diff --git a/blockchain/client.go b/blockchain/client.go index 0d57eae..e90b3e5 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -2,13 +2,8 @@ package blockchain import ( "context" - "errors" - "fmt" "sync" - "sync/atomic" - "time" - "github.com/cenkalti/backoff" gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4" "github.com/centrifuge/go-substrate-rpc-client/v4/registry" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/exec" @@ -16,13 +11,12 @@ import ( "github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/state" "github.com/centrifuge/go-substrate-rpc-client/v4/types" + "golang.org/x/sync/errgroup" "github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets" ) -var errCancelled = errors.New("cancelled") - -type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash) +type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash) error type Client struct { *gsrpc.SubstrateAPI @@ -30,10 +24,6 @@ type Client struct { mu sync.Mutex eventsListeners map[*EventsListener]struct{} - isListening uint32 - cancelListening func() - errsListening chan error - DdcClusters pallets.DdcClustersApi DdcCustomers pallets.DdcCustomersApi DdcNodes pallets.DdcNodesApi @@ -60,22 +50,20 @@ func NewClient(url string) (*Client, error) { }, nil } -// StartEventsListening subscribes for blockchain events and passes events starting from the -// 'begin' block to registered events listeners. Listeners registered after this call will only -// receive live events meaning all listeners which need historical events from 'begin' block -// should be registered at the moment of calling this function. The 'afterBlock' callback is -// invoked after all registered events listeners are already invoked. -func (c *Client) StartEventsListening( +// ListenEvents listens for blockchain events and sequentially calls registered events listeners to +// process incoming events. It starts from the block begin and calls callback after when all events +// listeners already called on a block events. +// +// ListenEvents always returns a non-nil error from a registered events listener or a callback +// after. +func (c *Client) ListenEvents( + ctx context.Context, begin types.BlockNumber, - after func(blockNumber types.BlockNumber, blockHash types.Hash), -) (context.CancelFunc, <-chan error, error) { - if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) { - return c.cancelListening, c.errsListening, nil - } - + after func(blockNumber types.BlockNumber, blockHash types.Hash) error, +) error { sub, err := c.RPC.Chain.SubscribeNewHeads() if err != nil { - return nil, nil, fmt.Errorf("subscribe new heads: %w", err) + return err } retriever, err := retriever.NewEventRetriever( @@ -87,78 +75,81 @@ func (c *Client) StartEventsListening( exec.NewRetryableExecutor[[]*parser.Event](exec.WithMaxRetryCount(0)), ) if err != nil { - return nil, nil, fmt.Errorf("event retriever: %w", err) + return err } - c.errsListening = make(chan error) + g, ctx := errgroup.WithContext(ctx) liveHeadersC := sub.Chan() - histHeadersC := make(chan types.Header) - var wg sync.WaitGroup + go func() { + <-ctx.Done() + sub.Unsubscribe() + }() // Query historical headers. - var cancelled atomic.Value - cancelled.Store(false) - wg.Add(1) - go func(beginBlock types.BlockNumber, live <-chan types.Header, hist chan types.Header) { - defer wg.Done() - defer close(hist) - - firstLiveHeader := <-live // the first live header is the last historical header - - for block := beginBlock; block < firstLiveHeader.Number; { - var header *types.Header - err := retryUntilCancelled(func() error { - blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block)) - if err != nil { - c.errsListening <- fmt.Errorf("get historical block hash: %w", err) - return err - } + histHeadersC := make(chan types.Header) + g.Go(func() error { + defer close(histHeadersC) - header, err = c.RPC.Chain.GetHeader(blockHash) - if err != nil { - c.errsListening <- fmt.Errorf("get historical header: %w", err) - return err - } + firstLiveHeader, ok := <-liveHeadersC // first live header will be the last historical + if !ok { + return ctx.Err() + } - return nil - }, &cancelled) + for block := begin; block < firstLiveHeader.Number; block++ { + blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block)) if err != nil { - if err == errCancelled { - return - } - continue + return err + } + + header, err := c.RPC.Chain.GetHeader(blockHash) + if err != nil { + return err } - hist <- *header + select { + case <-ctx.Done(): + return ctx.Err() + case histHeadersC <- *header: + } + } - block++ + select { + case <-ctx.Done(): + return ctx.Err() + case histHeadersC <- firstLiveHeader: } - hist <- firstLiveHeader - }(begin, liveHeadersC, histHeadersC) + return nil + }) // Sequence historical and live headers. headersC := make(chan types.Header) - wg.Add(1) - go func(hist, live <-chan types.Header, headersC chan types.Header) { - defer wg.Done() + g.Go(func() error { defer close(headersC) - for header := range hist { - headersC <- header + for header := range histHeadersC { + select { + case <-ctx.Done(): + return ctx.Err() + case headersC <- header: + } } - for header := range live { - headersC <- header + for header := range liveHeadersC { + select { + case <-ctx.Done(): + return ctx.Err() + case headersC <- header: + } } - }(histHeadersC, liveHeadersC, headersC) + + return nil + }) // Retrieve events skipping blocks before 'begin'. eventsC := make(chan blockEvents) - wg.Add(1) - go func(headersC <-chan types.Header, eventsC chan blockEvents) { - defer wg.Done() + g.Go(func() error { defer close(eventsC) for header := range headersC { @@ -166,63 +157,52 @@ func (c *Client) StartEventsListening( continue } - var hash types.Hash - var events []*parser.Event - err := retryUntilCancelled(func() error { - var err error - hash, err = c.RPC.Chain.GetBlockHash(uint64(header.Number)) - if err != nil { - c.errsListening <- fmt.Errorf("get block hash: %w", err) - return err - } - - events, err = retriever.GetEvents(hash) - if err != nil { - c.errsListening <- fmt.Errorf("events retriever: %w", err) - return err - } + hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number)) + if err != nil { + return err + } - return nil - }, &cancelled) + events, err := retriever.GetEvents(hash) if err != nil { - continue + return err } - eventsC <- blockEvents{ + select { + case <-ctx.Done(): + return ctx.Err() + case eventsC <- blockEvents{ Events: events, Hash: hash, Number: header.Number, + }: } } - }(headersC, eventsC) + + return nil + }) // Invoke listeners. - wg.Add(1) - go func(eventsC <-chan blockEvents) { - defer wg.Done() + g.Go(func() error { for blockEvents := range eventsC { for callback := range c.eventsListeners { - (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash) + err := (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash) + if err != nil { + return err + } } if after != nil { - after(blockEvents.Number, blockEvents.Hash) + err := after(blockEvents.Number, blockEvents.Hash) + if err != nil { + return err + } } } - }(eventsC) - once := sync.Once{} - c.cancelListening = func() { - once.Do(func() { - sub.Unsubscribe() - cancelled.Store(true) - wg.Wait() - close(c.errsListening) - c.isListening = 0 - }) - } + return ctx.Err() + }) - return c.cancelListening, c.errsListening, nil + return g.Wait() } // RegisterEventsListener subscribes given callback to blockchain events. @@ -246,20 +226,3 @@ type blockEvents struct { Hash types.Hash Number types.BlockNumber } - -func retryUntilCancelled(f func() error, cancelled *atomic.Value) error { - expbackoff := backoff.NewExponentialBackOff() - expbackoff.MaxElapsedTime = 0 // never stop - expbackoff.InitialInterval = 10 * time.Second - expbackoff.Multiplier = 2 - expbackoff.MaxInterval = 10 * time.Minute - - ff := func() error { - if cancelled.Load().(bool) { - return backoff.Permanent(errCancelled) - } - return f() - } - - return backoff.Retry(ff, expbackoff) -} diff --git a/blockchain/go.mod b/blockchain/go.mod index f75bd98..5b11c93 100644 --- a/blockchain/go.mod +++ b/blockchain/go.mod @@ -3,8 +3,8 @@ module github.com/cerebellum-network/cere-ddc-sdk-go/blockchain go 1.18 require ( - github.com/cenkalti/backoff v2.2.1+incompatible github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1 + golang.org/x/sync v0.7.0 ) require ( diff --git a/blockchain/go.sum b/blockchain/go.sum index 43188f8..462485b 100644 --- a/blockchain/go.sum +++ b/blockchain/go.sum @@ -6,8 +6,6 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY= github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -64,6 +62,8 @@ golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=