Skip to content

Commit

Permalink
Experiment with iss
Browse files Browse the repository at this point in the history
  • Loading branch information
ferranbt committed Sep 5, 2024
1 parent 08d366f commit 14f1a72
Show file tree
Hide file tree
Showing 10 changed files with 515 additions and 1 deletion.
2 changes: 1 addition & 1 deletion builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, attrs *builderTypes.Pa
log.Info("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash, "payloadTimestamp", uint64(attrs.Timestamp), "txs", attrs.Transactions)

// retry build block every builderBlockRetryInterval
runRetryLoop(ctx, b.builderRetryInterval, func() {
runRetryLoop(ctx, 100*time.Second, func() {
log.Info("retrying BuildBlock",
"slot", attrs.Slot,
"parent", attrs.HeadHash,
Expand Down
2 changes: 2 additions & 0 deletions builder/eth_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package builder

import (
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/beacon/engine"
Expand Down Expand Up @@ -34,6 +35,7 @@ func NewEthereumService(eth *eth.Ethereum, config *Config) *EthereumService {
}

func (s *EthereumService) BuildBlock(attrs *builderTypes.PayloadAttributes) (*engine.ExecutionPayloadEnvelope, error) {
fmt.Println("- build block -")
// Send a request to generate a full block in the background.
// The result can be obtained via the returned channel.
args := &miner.BuildPayloadArgs{
Expand Down
2 changes: 2 additions & 0 deletions builder/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func respondError(w http.ResponseWriter, code int, message string) {

// runRetryLoop calls retry periodically with the provided interval respecting context cancellation
func runRetryLoop(ctx context.Context, interval time.Duration, retry func()) {
retry() // Call the callback function at the beginning

t := time.NewTicker(interval)
defer t.Stop()
for {
Expand Down
8 changes: 8 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, config.GPO, config.Miner.GasPrice)

// FB Specific

// This is a weird recursion, miner uses iss and iss uses miner, Fix
issBuilder, _ := miner.NewISSBuilder(eth.miner, 250*time.Millisecond, 2*time.Second)
issBuilder.AddSSEStream(1122)
eth.miner.SetISSBuilder(issBuilder)
// End FB Specific

// Setup DNS discovery iterators.
dnsclient := dnsdisc.NewClient(dnsdisc.Config{})
eth.ethDialCandidates, err = dnsclient.NewIterator(eth.config.EthDiscoveryURLs...)
Expand Down
1 change: 1 addition & 0 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
return valid(nil), engine.InvalidPayloadAttributes.With(err)
}
}
fmt.Println("_ FROM HERE _")
payload, err := api.eth.Miner().BuildPayload(args)
if err != nil {
log.Error("Failed to build payload", "err", err)
Expand Down
238 changes: 238 additions & 0 deletions miner/iss_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package miner

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/r3labs/sse"
)

type ISSSink func(receipts types.Receipts)

type ISSBuilder struct {
miner *Miner
issTime time.Duration // TODO: Maybe add some config for this
blockTime time.Duration

sink []ISSSink

// current execution context
payload *Payload
cancelFn context.CancelFunc

notifyCh chan struct{}
}

func NewISSBuilder(miner *Miner, issTime time.Duration, blockTime time.Duration) (*ISSBuilder, error) {
// Ensure that issTime is lower than blockTime
if issTime >= blockTime {
return nil, fmt.Errorf("issTime must be lower than blockTime")
}

builder := &ISSBuilder{
miner: miner,
issTime: issTime,
blockTime: blockTime,
sink: []ISSSink{},
notifyCh: make(chan struct{}),
}
return builder, nil
}

func (i *ISSBuilder) AddSink(sink ISSSink) *ISSBuilder {
i.sink = append(i.sink, sink)
return i
}

func (i *ISSBuilder) Build(args *BuildPayloadArgs) (*Payload, error) {
// cancel any previous builds
if i.cancelFn != nil {
i.cancelFn()
}

i.payload = newPayload(nil, args.Id())

ctx, cancelFn := context.WithCancel(context.Background())
i.cancelFn = cancelFn

go func() {
i.buildPayload(ctx, args)
}()

return i.payload, nil
}

// very very conservative estimate
var timeToHash = 150 * time.Millisecond

func (i *ISSBuilder) buildPayload(ctx context.Context, args *BuildPayloadArgs) {
// TODO: Use the context
params := &generateParams{
timestamp: args.Timestamp,
forceTime: true,
parentHash: args.Parent,
coinbase: args.FeeRecipient,
random: args.Random,
withdrawals: args.Withdrawals,
beaconRoot: args.BeaconRoot,
noTxs: false,
txs: args.Transactions,
gasLimit: args.GasLimit,
}

blockTime, err := i.miner.validateParams(params)
if err != nil {
// TODO: Fail at the point of Build, and only if this works start the async process
panic(err)
}

work, err := i.miner.prepareWork(params)
if err != nil {
// Same as above, fail at the point of Build
panic(err)
}

fmt.Printf("(%d) Building block %s \n", work.header.Number, blockTime)

if work.gasPool == nil {
gasLimit := i.miner.config.EffectiveGasCeil
if gasLimit == 0 || gasLimit > work.header.GasLimit {
gasLimit = work.header.GasLimit
}
work.gasPool = new(core.GasPool).AddGas(gasLimit)
}

misc.EnsureCreate2Deployer(i.miner.chainConfig, work.header.Time, work.state)

// Apply the op payload transactions
for _, tx := range params.txs {
work.state.SetTxContext(tx.Hash(), work.tcount)
err = i.miner.commitTransaction(work, tx)
if err != nil {
// Same as above, fail at the point of Build
panic(err)
}
work.tcount++
}

gasPool := new(core.GasPool).AddGas(work.header.GasLimit)

// initial estimation of how much gas to put on each batch
gasPerBatch := work.header.GasLimit / 4

// we are not including the time it takes to hash in the time to build the ISS batches
blockBuildingTimeLeft := blockTime - timeToHash
initialTime := time.Now()

initialGasLimit := work.header.GasLimit

// work is your snapshot reference to the whole state being built
for {
// check if there is enough time left to make another chunk of the block
// - is there enough gas?
// - is there enough time?
// We are going to keep some wiggle room of n milliseconds for hashing
if gasPool.Gas() == 0 {
break
}
if blockBuildingTimeLeft <= 0 {
break
}

timeForBatch := i.issTime
if blockBuildingTimeLeft < timeForBatch {
timeForBatch = blockBuildingTimeLeft
}

now := time.Now()

// I have to use both a context and the timer because I can't pass the context to fillTransactions
// and I cannot use timer.C to know if the timer is over
// TODO: Transport the interrupt to the fillTransactions into a context
ctx, cancel := context.WithCancel(context.Background())
interrupt := &atomic.Int32{}
time.AfterFunc(timeForBatch, func() {
cancel()
interrupt.Store(commitInterruptTimeout)
})

// we must override this because it is what fillTransactions uses to determine if it should keep filling
// take the snapshot of the work here for each iteration
work.header.GasLimit = gasPerBatch
if err := i.miner.fillTransactions(interrupt, work); err != nil {
if errors.Is(err, errBlockInterruptedByTimeout) {
// If the error is a timeout, break out of the inner loop
break
} else {
panic(err)
}
}

// wait for the timer to fire even if the bb ends sooner
<-ctx.Done()

for _, sink := range i.sink {
sink(work.receipts)
}

// notify that a new batch is ready
select {
case i.notifyCh <- struct{}{}:
default:
}

// wait for the timer to fire even if the bb ends sooner
blockBuildingTimeLeft -= time.Since(now)
fmt.Printf("(%d) Time since %s\n", work.header.Number, time.Since(now))
}

fmt.Printf("(%d) Total block time (no hash) %s\n", work.header.Number, time.Since(initialTime))

work.header.GasLimit = initialGasLimit

body := types.Body{Transactions: work.txs, Withdrawals: params.withdrawals}
block, err := i.miner.engine.FinalizeAndAssemble(i.miner.chain, work.header, work.state, &body, work.receipts)
if err != nil {
panic(err)
}
res := &newPayloadResult{
block: block,
fees: totalFees(block, work.receipts),
sidecars: work.sidecars,
stateDB: work.state,
receipts: work.receipts,
}
i.payload.update(res, 2*time.Second)
}

func (i *ISSBuilder) AddSSEStream(port uint64) {
eventStream := sse.New()
eventStream.AutoReplay = false
eventStream.CreateStream("iss")

// Create a new Mux and set the handler
mux := http.NewServeMux()
mux.HandleFunc("/events", eventStream.HTTPHandler)

sink := func(receipts types.Receipts) {
raw, err := json.Marshal(receipts)
if err != nil {
panic(err)
}

eventStream.Publish("iss", &sse.Event{
Data: raw,
})
}
i.AddSink(sink)

go http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", port), mux)
}
76 changes: 76 additions & 0 deletions miner/iss_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package miner

import (
"fmt"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)

func TestISSBuilder_ExecutionInBatches(t *testing.T) {
w, _ := newTestWorker(t, params.TestChainConfig, ethash.NewFaker(), rawdb.NewMemoryDatabase(), 0)

builder, _ := NewISSBuilder(w, 250*time.Millisecond, 2*time.Second)
args := &BuildPayloadArgs{
Timestamp: 1,
}

doneCh := make(chan error)
go func() {
// 250/2 is 8 batches
for i := 0; i < 8; i++ {
select {
case <-builder.notifyCh:
case <-time.After(275 * time.Millisecond):
doneCh <- fmt.Errorf("timeout")
}
}
doneCh <- nil
}()

builder.Build(args)

err := <-doneCh
require.NoError(t, err)
}

func TestISSBuilder_MultipleTransactions(t *testing.T) {
w, b := newTestWorker(t, params.TestChainConfig, ethash.NewFaker(), rawdb.NewMemoryDatabase(), 0)
b.pushNewTxnToPool(t)

builder, _ := NewISSBuilder(w, 250*time.Millisecond, 2*time.Second)
args := &BuildPayloadArgs{
Timestamp: 1,
}
payload, err := builder.Build(args)
require.NoError(t, err)

envelope := payload.Resolve()
require.Equal(t, 2, len(envelope.ExecutionPayload.Transactions))
}

func (tB *testWorkerBackend) pushNewTxnToPool(t *testing.T) {
if tB.nonce == 0 {
tB.nonce = 1
} else {
tB.nonce++
}

signer := types.LatestSigner(params.TestChainConfig)

tx2 := types.MustSignNewTx(testBankKey, signer, &types.LegacyTx{
Nonce: tB.nonce,
To: &testUserAddress,
Value: big.NewInt(1000),
Gas: params.TxGas,
GasPrice: big.NewInt(params.InitialBaseFee),
})
errArr := tB.txPool.Add([]*types.Transaction{tx2}, false, false)
require.NoError(t, errArr[0])
}
Loading

0 comments on commit 14f1a72

Please sign in to comment.