diff --git a/config/default.go b/config/default.go index dd1726b31d..33947b4159 100644 --- a/config/default.go +++ b/config/default.go @@ -55,6 +55,7 @@ WriteTimeout = "60s" MaxRequestsPerIPAndSecond = 500 SequencerNodeURI = "" EnableL2SuggestedGasPricePolling = true +TraceBatchUseHTTPS = true [RPC.WebSockets] Enabled = true Host = "0.0.0.0" diff --git a/jsonrpc/config.go b/jsonrpc/config.go index 5284c67152..d347ba41d2 100644 --- a/jsonrpc/config.go +++ b/jsonrpc/config.go @@ -34,6 +34,10 @@ type Config struct { // EnableL2SuggestedGasPricePolling enables polling of the L2 gas price to block tx in the RPC with lower gas price. EnableL2SuggestedGasPricePolling bool `mapstructure:"EnableL2SuggestedGasPricePolling"` + + // TraceBatchUseHTTPS enables, in the debug_traceBatchByNum endpoint, the use of the HTTPS protocol (instead of HTTP) + // to do the parallel requests to RPC.debug_traceTransaction endpoint + TraceBatchUseHTTPS bool `mapstructure:"TraceBatchUseHTTPS"` } // WebSocketsConfig has parameters to config the rpc websocket support diff --git a/jsonrpc/endpoints_debug.go b/jsonrpc/endpoints_debug.go index a6878e3b44..576273d640 100644 --- a/jsonrpc/endpoints_debug.go +++ b/jsonrpc/endpoints_debug.go @@ -6,8 +6,14 @@ import ( "encoding/json" "errors" "fmt" + "net/http" + "net/url" + "sort" "strings" + "sync" + "time" + "github.com/0xPolygonHermez/zkevm-node/jsonrpc/client" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/types" "github.com/0xPolygonHermez/zkevm-node/log" "github.com/0xPolygonHermez/zkevm-node/state" @@ -28,6 +34,7 @@ var defaultTraceConfig = &traceConfig{ // DebugEndpoints is the debug jsonrpc endpoint type DebugEndpoints struct { + cfg Config state types.StateInterface txMan dbTxManager } @@ -66,6 +73,11 @@ type traceBlockTransactionResponse struct { Result interface{} `json:"result"` } +type traceBatchTransactionResponse struct { + TxHash common.Hash `json:"txHash"` + Result interface{} `json:"result"` +} + // TraceTransaction creates a 
response for debug_traceTransaction request. // See https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-debug#debugtracetransaction func (d *DebugEndpoints) TraceTransaction(hash types.ArgHash, cfg *traceConfig) (interface{}, types.Error) { @@ -86,7 +98,7 @@ func (d *DebugEndpoints) TraceBlockByNumber(number types.BlockNumber, cfg *trace block, err := d.state.GetL2BlockByNumber(ctx, blockNumber, dbTx) if errors.Is(err, state.ErrNotFound) { return nil, types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("block #%d not found", blockNumber)) - } else if err == state.ErrNotFound { + } else if err != nil { return rpcErrorResponse(types.DefaultErrorCode, "failed to get block by number", err) } @@ -106,7 +118,7 @@ func (d *DebugEndpoints) TraceBlockByHash(hash types.ArgHash, cfg *traceConfig) block, err := d.state.GetL2BlockByHash(ctx, hash.Hash(), dbTx) if errors.Is(err, state.ErrNotFound) { return nil, types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("block %s not found", hash.Hash().String())) - } else if err == state.ErrNotFound { + } else if err != nil { return rpcErrorResponse(types.DefaultErrorCode, "failed to get block by hash", err) } @@ -119,6 +131,159 @@ func (d *DebugEndpoints) TraceBlockByHash(hash types.ArgHash, cfg *traceConfig) }) } +// TraceBatchByNumber creates a response for debug_traceBatchByNumber request. +// this endpoint tries to help clients to get traces at once for all the transactions +// attached to the same batch. +// +// IMPORTANT: in order to take advantage of the infrastructure automatically scaling, +// instead of parallelizing the trace transaction internally and pushing all the load +// to a single jRPC and Executor instance, the code will redirect the trace transaction +// requests to the same url, making them external calls, so we can process in parallel +// with multiple jRPC and Executor instances. 
+// +// the request flow will work as follows: +// -> user do a trace batch request +// -> jRPC balancer picks a jRPC server to handle the trace batch request +// -> picked jRPC sends parallel trace transaction requests for each transaction in the batch +// -> jRPC balancer sends each request to a different jRPC to handle the trace transaction requests +// -> picked jRPC server group trace transaction responses from other jRPC servers +// -> picked jRPC respond the initial request to the user with all the tx traces +func (d *DebugEndpoints) TraceBatchByNumber(httpRequest *http.Request, number types.BatchNumber, cfg *traceConfig) (interface{}, types.Error) { + type traceResponse struct { + blockNumber uint64 + txIndex uint64 + txHash common.Hash + trace interface{} + err error + } + + // the size of the buffer defines + // how many txs it will process in parallel. + const bufferSize = 10 + + // checks and load the request scheme to build the url for the remote requests + // scheme, err := getHttpScheme(httpRequest) + // if err != nil { + // return rpcErrorResponse(types.DefaultErrorCode, err.Error(), nil) + // } + + // builds the url of the remote jRPC server + scheme := "http" + if d.cfg.TraceBatchUseHTTPS { + scheme = "https" + } + u := url.URL{ + Scheme: scheme, + Host: httpRequest.Host, + Path: httpRequest.URL.Path, + } + rpcURL := u.String() + + return d.txMan.NewDbTxScope(d.state, func(ctx context.Context, dbTx pgx.Tx) (interface{}, types.Error) { + batchNumber, rpcErr := number.GetNumericBatchNumber(ctx, d.state, dbTx) + if rpcErr != nil { + return nil, rpcErr + } + + batch, err := d.state.GetBatchByNumber(ctx, batchNumber, dbTx) + if errors.Is(err, state.ErrStateNotSynchronized) { + return nil, types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("batch #%d not found", batchNumber)) + } else if err != nil { + return rpcErrorResponse(types.DefaultErrorCode, "failed to get batch by number", err) + } + + txs, err := d.state.GetTransactionsByBatchNumber(ctx, 
batch.BatchNumber, dbTx) + if !errors.Is(err, state.ErrNotFound) && err != nil { + return rpcErrorResponse(types.DefaultErrorCode, fmt.Sprintf("couldn't load batch txs from state by number %v to create the traces", batchNumber), err) + } + + receipts := make([]ethTypes.Receipt, 0, len(txs)) + for _, tx := range txs { + receipt, err := d.state.GetTransactionReceipt(ctx, tx.Hash(), dbTx) + if err != nil { + return rpcErrorResponse(types.DefaultErrorCode, fmt.Sprintf("couldn't load receipt for tx %v to get trace", tx.Hash().String()), err) + } + receipts = append(receipts, *receipt) + } + + buffer := make(chan byte, bufferSize) + + mu := sync.Mutex{} + wg := sync.WaitGroup{} + wg.Add(len(receipts)) + responses := make([]traceResponse, 0, len(receipts)) + + // gets the trace from the jRPC and adds it to the responses + loadTraceByTxHash := func(receipt ethTypes.Receipt) { + defer func() { + <-buffer // make buffer slot free + wg.Done() + }() + buffer <- 1 // use buffer free slot or wait for a free slot + + response := traceResponse{ + blockNumber: receipt.BlockNumber.Uint64(), + txIndex: uint64(receipt.TransactionIndex), + txHash: receipt.TxHash, + } + + res, err := client.JSONRPCCall(rpcURL, "debug_traceTransaction", receipt.TxHash.String(), cfg) + if err != nil { + err := fmt.Errorf("failed to get tx trace from remote jRPC server %v for tx %v, err: %w", rpcURL, receipt.TxHash.String(), err) + log.Errorf(err.Error()) + response.err = err + } else if res.Error != nil { + err := fmt.Errorf("tx trace error returned from remote jRPC server %v for tx %v, err: %v - %v", rpcURL, receipt.TxHash.String(), res.Error.Code, res.Error.Message) + log.Errorf(err.Error()) + response.err = err + } else { + response.trace = res.Result + } + + // add to the responses + mu.Lock() + defer mu.Unlock() + responses = append(responses, response) + } + + // load traces for each transaction + for _, receipt := range receipts { + go loadTraceByTxHash(receipt) + } + + // wait the traces to be 
loaded
+		if waitTimeout(&wg, d.cfg.ReadTimeout.Duration) {
+			return rpcErrorResponse(types.DefaultErrorCode, fmt.Sprintf("failed to get traces for batch %v: timeout reached", batchNumber), nil)
+		}
+
+		// since the txs are attached to a L2 Block and the L2 Block is
+		// the struct attached to the Batch, in order to always respond
+		// the traces in the same order, we need to order the transactions
+		// first by block number and then by tx index, so we can have something
+		// close to the txs being sorted by a tx index related to the batch
+		sort.Slice(responses, func(i, j int) bool {
+			if responses[i].blockNumber != responses[j].blockNumber {
+				return responses[i].blockNumber < responses[j].blockNumber
+			}
+			return responses[i].txIndex < responses[j].txIndex
+		})
+
+		// build the batch trace response array
+		traces := make([]traceBatchTransactionResponse, 0, len(receipts))
+		for _, response := range responses {
+			if response.err != nil {
+				return rpcErrorResponse(types.DefaultErrorCode, fmt.Sprintf("failed to get traces for batch %v: failed to get trace for tx: %v, err: %v", batchNumber, response.txHash.String(), response.err.Error()), nil)
+			}
+
+			traces = append(traces, traceBatchTransactionResponse{
+				TxHash: response.txHash,
+				Result: response.trace,
+			})
+		}
+		return traces, nil
+	})
+}
+
 func (d *DebugEndpoints) buildTraceBlock(ctx context.Context, txs []*ethTypes.Transaction, cfg *traceConfig, dbTx pgx.Tx) (interface{}, types.Error) {
 	traces := []traceBlockTransactionResponse{}
 	for _, tx := range txs {
@@ -293,3 +458,53 @@ func isBuiltInTracer(tracer string) bool {
 func isJSCustomTracer(tracer string) bool {
 	return strings.Contains(tracer, "result") && strings.Contains(tracer, "fault")
 }
+
+// // getHttpScheme tries to get the scheme from the http request in different ways
+// func getHttpScheme(r *http.Request) (string, error) {
+// 	// scheme headers
+// 	headers := []string{"X-Forwarded-Proto", "X-Forwarded-Protocol", "X-Url-Scheme"}
+// 	for _, header := range 
headers { +// value := r.Header.Get(header) +// if value == "http" || value == "https" { +// return value, nil +// } else if value != "" { +// return "", fmt.Errorf("header %v must be set to HTTP or HTTPS, value found: %s", header, value) +// } +// } + +// // https on/off headers +// headers = []string{"X-Forwarded-Ssl", "Front-End-Https"} +// for _, header := range headers { +// value := r.Header.Get(header) +// if value == "on" { +// return "https", nil +// } else if value == "off" { +// return "http", nil +// } else if value != "" { +// return "", fmt.Errorf("header %v must be set to ON or OFF, value found: %s", header, value) +// } +// } + +// // httpRequest TLS check +// scheme := "http" +// if r.TLS != nil { +// scheme = "https" +// } +// return scheme, nil +// } + +// waitTimeout waits for the waitGroup for the specified max timeout. +// Returns true if waiting timed out. +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return false // completed normally + case <-time.After(timeout): + return true // timed out + } +} diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 2b029083f3..099affbc50 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -31,8 +31,8 @@ const ( // EthEndpoints contains implementations for the "eth" RPC endpoints type EthEndpoints struct { - cfg Config chainID uint64 + cfg Config pool types.PoolInterface state types.StateInterface storage storageInterface diff --git a/jsonrpc/server.go b/jsonrpc/server.go index 4e67cefea4..b76e34fd01 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -79,7 +79,7 @@ func NewServer( } if _, ok := apis[APIDebug]; ok { - debugEndpoints := &DebugEndpoints{state: s} + debugEndpoints := &DebugEndpoints{cfg: cfg, state: s} handler.registerService(APIDebug, debugEndpoints) } diff --git a/pool/pool.go b/pool/pool.go index 51634ada1d..9d3ef6e57c 100644 
--- a/pool/pool.go +++ b/pool/pool.go @@ -310,7 +310,7 @@ func (p *Pool) validateTx(ctx context.Context, poolTx Transaction) error { // check if sender has reached the limit of transactions in the pool if p.cfg.AccountQueue > 0 { - // txCount, err := p.storage.CountTransactionsByFromAndStatus(ctx, from, TxStatusPending, TxStatusFailed) + // txCount, err := p.storage.CountTransactionsByFromAndStatus(ctx, from, TxStatusPending) // if err != nil { // return err // } @@ -326,7 +326,7 @@ func (p *Pool) validateTx(ctx context.Context, poolTx Transaction) error { // check if the pool is full if p.cfg.GlobalQueue > 0 { - txCount, err := p.storage.CountTransactionsByStatus(ctx, TxStatusPending, TxStatusFailed) + txCount, err := p.storage.CountTransactionsByStatus(ctx, TxStatusPending) if err != nil { return err }