Skip to content

Commit

Permalink
Merge pull request #100 from Cerebellum-Network/feature/events-listen…
Browse files Browse the repository at this point in the history
…er-error

Events listener error
  • Loading branch information
khssnv committed Jul 10, 2024
2 parents bdad6f0 + 3660cba commit 364e45c
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 129 deletions.
215 changes: 89 additions & 126 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,28 @@ package blockchain

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff"
gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/exec"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/state"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"golang.org/x/sync/errgroup"

"github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets"
)

var errCancelled = errors.New("cancelled")

type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash)
type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash) error

type Client struct {
*gsrpc.SubstrateAPI

mu sync.Mutex
eventsListeners map[*EventsListener]struct{}

isListening uint32
cancelListening func()
errsListening chan error

DdcClusters pallets.DdcClustersApi
DdcCustomers pallets.DdcCustomersApi
DdcNodes pallets.DdcNodesApi
Expand All @@ -60,22 +50,20 @@ func NewClient(url string) (*Client, error) {
}, nil
}

// StartEventsListening subscribes for blockchain events and passes events starting from the
// 'begin' block to registered events listeners. Listeners registered after this call will only
// receive live events meaning all listeners which need historical events from 'begin' block
// should be registered at the moment of calling this function. The 'afterBlock' callback is
// invoked after all registered events listeners are already invoked.
func (c *Client) StartEventsListening(
// ListenEvents listens for blockchain events and sequentially calls registered events listeners to
// process incoming events. It starts from the block begin and calls callback after when all events
// listeners already called on a block events.
//
// ListenEvents always returns a non-nil error from a registered events listener or a callback
// after.
func (c *Client) ListenEvents(
ctx context.Context,
begin types.BlockNumber,
after func(blockNumber types.BlockNumber, blockHash types.Hash),
) (context.CancelFunc, <-chan error, error) {
if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) {
return c.cancelListening, c.errsListening, nil
}

after func(blockNumber types.BlockNumber, blockHash types.Hash) error,
) error {
sub, err := c.RPC.Chain.SubscribeNewHeads()
if err != nil {
return nil, nil, fmt.Errorf("subscribe new heads: %w", err)
return err
}

retriever, err := retriever.NewEventRetriever(
Expand All @@ -87,142 +75,134 @@ func (c *Client) StartEventsListening(
exec.NewRetryableExecutor[[]*parser.Event](exec.WithMaxRetryCount(0)),
)
if err != nil {
return nil, nil, fmt.Errorf("event retriever: %w", err)
return err
}

c.errsListening = make(chan error)
g, ctx := errgroup.WithContext(ctx)

liveHeadersC := sub.Chan()
histHeadersC := make(chan types.Header)
var wg sync.WaitGroup
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()

// Query historical headers.
var cancelled atomic.Value
cancelled.Store(false)
wg.Add(1)
go func(beginBlock types.BlockNumber, live <-chan types.Header, hist chan types.Header) {
defer wg.Done()
defer close(hist)

firstLiveHeader := <-live // the first live header is the last historical header

for block := beginBlock; block < firstLiveHeader.Number; {
var header *types.Header
err := retryUntilCancelled(func() error {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block))
if err != nil {
c.errsListening <- fmt.Errorf("get historical block hash: %w", err)
return err
}
histHeadersC := make(chan types.Header)
g.Go(func() error {
defer close(histHeadersC)

header, err = c.RPC.Chain.GetHeader(blockHash)
if err != nil {
c.errsListening <- fmt.Errorf("get historical header: %w", err)
return err
}
firstLiveHeader, ok := <-liveHeadersC // first live header will be the last historical
if !ok {
return ctx.Err()
}

return nil
}, &cancelled)
for block := begin; block < firstLiveHeader.Number; block++ {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block))
if err != nil {
if err == errCancelled {
return
}
continue
return err
}

header, err := c.RPC.Chain.GetHeader(blockHash)
if err != nil {
return err
}

hist <- *header
select {
case <-ctx.Done():
return ctx.Err()
case histHeadersC <- *header:
}
}

block++
select {
case <-ctx.Done():
return ctx.Err()
case histHeadersC <- firstLiveHeader:
}

hist <- firstLiveHeader
}(begin, liveHeadersC, histHeadersC)
return nil
})

// Sequence historical and live headers.
headersC := make(chan types.Header)
wg.Add(1)
go func(hist, live <-chan types.Header, headersC chan types.Header) {
defer wg.Done()
g.Go(func() error {
defer close(headersC)

for header := range hist {
headersC <- header
for header := range histHeadersC {
select {
case <-ctx.Done():
return ctx.Err()
case headersC <- header:
}
}

for header := range live {
headersC <- header
for header := range liveHeadersC {
select {
case <-ctx.Done():
return ctx.Err()
case headersC <- header:
}
}
}(histHeadersC, liveHeadersC, headersC)

return nil
})

// Retrieve events skipping blocks before 'begin'.
eventsC := make(chan blockEvents)
wg.Add(1)
go func(headersC <-chan types.Header, eventsC chan blockEvents) {
defer wg.Done()
g.Go(func() error {
defer close(eventsC)

for header := range headersC {
if header.Number < begin {
continue
}

var hash types.Hash
var events []*parser.Event
err := retryUntilCancelled(func() error {
var err error
hash, err = c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
c.errsListening <- fmt.Errorf("get block hash: %w", err)
return err
}

events, err = retriever.GetEvents(hash)
if err != nil {
c.errsListening <- fmt.Errorf("events retriever: %w", err)
return err
}
hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
return err
}

return nil
}, &cancelled)
events, err := retriever.GetEvents(hash)
if err != nil {
continue
return err
}

eventsC <- blockEvents{
select {
case <-ctx.Done():
return ctx.Err()
case eventsC <- blockEvents{
Events: events,
Hash: hash,
Number: header.Number,
}:
}
}
}(headersC, eventsC)

return nil
})

// Invoke listeners.
wg.Add(1)
go func(eventsC <-chan blockEvents) {
defer wg.Done()
g.Go(func() error {
for blockEvents := range eventsC {
for callback := range c.eventsListeners {
(*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
err := (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
if err != nil {
return err
}
}

if after != nil {
after(blockEvents.Number, blockEvents.Hash)
err := after(blockEvents.Number, blockEvents.Hash)
if err != nil {
return err
}
}
}
}(eventsC)

once := sync.Once{}
c.cancelListening = func() {
once.Do(func() {
sub.Unsubscribe()
cancelled.Store(true)
wg.Wait()
close(c.errsListening)
c.isListening = 0
})
}
return ctx.Err()
})

return c.cancelListening, c.errsListening, nil
return g.Wait()
}

// RegisterEventsListener subscribes given callback to blockchain events.
Expand All @@ -246,20 +226,3 @@ type blockEvents struct {
Hash types.Hash
Number types.BlockNumber
}

func retryUntilCancelled(f func() error, cancelled *atomic.Value) error {
expbackoff := backoff.NewExponentialBackOff()
expbackoff.MaxElapsedTime = 0 // never stop
expbackoff.InitialInterval = 10 * time.Second
expbackoff.Multiplier = 2
expbackoff.MaxInterval = 10 * time.Minute

ff := func() error {
if cancelled.Load().(bool) {
return backoff.Permanent(errCancelled)
}
return f()
}

return backoff.Retry(ff, expbackoff)
}
2 changes: 1 addition & 1 deletion blockchain/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/cerebellum-network/cere-ddc-sdk-go/blockchain
go 1.18

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1
golang.org/x/sync v0.7.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions blockchain/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY=
github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -64,6 +62,8 @@ golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
Expand Down

0 comments on commit 364e45c

Please sign in to comment.