diff --git a/blockchain/client.go b/blockchain/client.go index 6c5fe20..6412b47 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -1,8 +1,8 @@ package blockchain import ( + "context" "fmt" - "math" "sync" "sync/atomic" @@ -18,10 +18,10 @@ type EventsListener func(events *pallets.Events, blockNumber types.BlockNumber, type Client struct { *gsrpc.SubstrateAPI - eventsListeners map[int]EventsListener + eventsListeners map[*EventsListener]struct{} mu sync.Mutex isListening uint32 - stopListening func() + cancelListening func() errsListening chan error DdcClusters pallets.DdcClustersApi @@ -42,7 +42,7 @@ func NewClient(url string) (*Client, error) { return &Client{ SubstrateAPI: substrateApi, - eventsListeners: make(map[int]EventsListener), + eventsListeners: make(map[*EventsListener]struct{}), DdcClusters: pallets.NewDdcClustersApi(substrateApi), DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta), DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta), @@ -50,11 +50,21 @@ func NewClient(url string) (*Client, error) { }, nil } -func (c *Client) StartEventsListening() (func(), <-chan error, error) { +// StartEventsListening subscribes for blockchain events and passes events starting from the +// 'begin' block to registered events listeners. Listeners registered after this call will only +// receive live events meaning all listeners which need historical events from 'begin' block +// should be registered at the moment of calling this function. The 'afterBlock' callback is +// invoked after all registered events listeners are already invoked. +func (c *Client) StartEventsListening( + begin types.BlockNumber, + afterBlock func(blockNumber types.BlockNumber, blockHash types.Hash), +) (context.CancelFunc, <-chan error, error) { if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) { - return c.stopListening, c.errsListening, nil + return c.cancelListening, c.errsListening, nil } + c.errsListening = make(chan error) + meta, err := c.RPC.State.GetMetadataLatest() if err != nil { return nil, nil, err @@ -68,77 +78,143 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) { return nil, nil, err } - done := make(chan struct{}) - c.errsListening = make(chan error) + liveChangesC := sub.Chan() + histChangesC := make(chan types.StorageChangeSet) + + // Query historical changes. + var cancelled atomic.Value + cancelled.Store(false) + go func(begin types.BlockNumber, liveChanges <-chan types.StorageChangeSet, histChangesC chan types.StorageChangeSet) { + defer close(histChangesC) + + set := <-liveChanges // first live changes set block is the last historical block + + header, err := c.RPC.Chain.GetHeader(set.Block) + if err != nil { + c.errsListening <- fmt.Errorf("get header: %w", err) + return + } + + for currentBlock := begin; currentBlock < header.Number; currentBlock++ { + blockHash, err := c.RPC.Chain.GetBlockHash(uint64(currentBlock)) + if err != nil { + c.errsListening <- fmt.Errorf("get block hash: %w", err) + return + } + + blockChangesSets, err := c.RPC.State.QueryStorageAt([]types.StorageKey{key}, blockHash) + if err != nil { + c.errsListening <- fmt.Errorf("query storage: %w", err) + return + } + + for _, set := range blockChangesSets { + histChangesC <- set + } - go func() { - for { - select { - case <-done: + // Graceful stop must finish the block before exiting. + if cancelled.Load().(bool) { return - case set := <-sub.Chan(): - header, err := c.RPC.Chain.GetHeader(set.Block) + } + } + + histChangesC <- set + }(begin, liveChangesC, histChangesC) + + // Sequence historical and live changes. + changesC := make(chan types.StorageChangeSet) + go func(histChangesC, liveChangesC <-chan types.StorageChangeSet, changesC chan types.StorageChangeSet) { + defer close(changesC) + + for set := range histChangesC { + changesC <- set + } + + for set := range liveChangesC { + changesC <- set + } + }(histChangesC, liveChangesC, changesC) + + // Decode events from changes skipping blocks before 'begin'. + eventsC := make(chan blockEvents) + go func(changesC <-chan types.StorageChangeSet, eventsC chan blockEvents) { + defer close(eventsC) + + for set := range changesC { + header, err := c.RPC.Chain.GetHeader(set.Block) + if err != nil { + c.errsListening <- fmt.Errorf("get header: %w", err) + return + } + + if header.Number < begin { + continue + } + + for _, change := range set.Changes { + if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { + continue + } + + events := &pallets.Events{} + err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) if err != nil { - c.errsListening <- fmt.Errorf("get header: %w", err) + c.errsListening <- fmt.Errorf("events decoder: %w", err) continue } - for _, change := range set.Changes { - if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { - continue - } - - events := &pallets.Events{} - err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) - if err != nil { - c.errsListening <- fmt.Errorf("events decoder: %w", err) - continue - } - - for _, callback := range c.eventsListeners { - go callback(events, header.Number, set.Block) - } + eventsC <- blockEvents{ + Events: events, + Number: header.Number, + Hash: set.Block, } } } - }() + }(changesC, eventsC) + + // Invoke listeners. + go func(eventsC <-chan blockEvents) { + for blockEvents := range eventsC { + for callback := range c.eventsListeners { + (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash) + } + + if afterBlock != nil { + afterBlock(blockEvents.Number, blockEvents.Hash) + } + } + }(eventsC) once := sync.Once{} - c.stopListening = func() { + c.cancelListening = func() { once.Do(func() { - done <- struct{}{} sub.Unsubscribe() + cancelled.Store(true) c.isListening = 0 }) } - return c.stopListening, c.errsListening, nil + return c.cancelListening, c.errsListening, nil } -func (c *Client) RegisterEventsListener(callback EventsListener) (func(), error) { - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := c.eventsListeners[i]; !ok { - idx = i - break - } - if i == math.MaxInt { - return nil, fmt.Errorf("too many events listeners") - } - } - +// RegisterEventsListener subscribes given callback to blockchain events. +func (c *Client) RegisterEventsListener(callback EventsListener) context.CancelFunc { c.mu.Lock() - c.eventsListeners[idx] = callback + c.eventsListeners[&callback] = struct{}{} c.mu.Unlock() once := sync.Once{} - stop := func() { + return func() { once.Do(func() { c.mu.Lock() - delete(c.eventsListeners, idx) + delete(c.eventsListeners, &callback) c.mu.Unlock() }) } +} - return stop, nil +type blockEvents struct { + Events *pallets.Events + Hash types.Hash + Number types.BlockNumber }