Skip to content

Commit

Permalink
Merge pull request #83 from Cerebellum-Network/feature/subscribe-for-…
Browse files Browse the repository at this point in the history
…events-from-block

Subscribe for events from a given block
  • Loading branch information
khssnv committed Mar 28, 2024
2 parents 6c6ffd6 + 0a486b3 commit 1ebedce
Showing 1 changed file with 126 additions and 50 deletions.
176 changes: 126 additions & 50 deletions blockchain/client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package blockchain

import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"

Expand All @@ -18,10 +18,10 @@ type EventsListener func(events *pallets.Events, blockNumber types.BlockNumber,
type Client struct {
*gsrpc.SubstrateAPI

eventsListeners map[int]EventsListener
eventsListeners map[*EventsListener]struct{}
mu sync.Mutex
isListening uint32
stopListening func()
cancelListening func()
errsListening chan error

DdcClusters pallets.DdcClustersApi
Expand All @@ -42,19 +42,29 @@ func NewClient(url string) (*Client, error) {

return &Client{
SubstrateAPI: substrateApi,
eventsListeners: make(map[int]EventsListener),
eventsListeners: make(map[*EventsListener]struct{}),
DdcClusters: pallets.NewDdcClustersApi(substrateApi),
DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta),
DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta),
DdcPayouts: pallets.NewDdcPayoutsApi(substrateApi, meta),
}, nil
}

func (c *Client) StartEventsListening() (func(), <-chan error, error) {
// StartEventsListening subscribes for blockchain events and passes events starting from the
// 'begin' block to registered events listeners. Listeners registered after this call will only
// receive live events meaning all listeners which need historical events from 'begin' block
// should be registered at the moment of calling this function. The 'afterBlock' callback is
// invoked after all registered events listeners are already invoked.
func (c *Client) StartEventsListening(
begin types.BlockNumber,
afterBlock func(blockNumber types.BlockNumber, blockHash types.Hash),
) (context.CancelFunc, <-chan error, error) {
if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) {
return c.stopListening, c.errsListening, nil
return c.cancelListening, c.errsListening, nil
}

c.errsListening = make(chan error)

meta, err := c.RPC.State.GetMetadataLatest()
if err != nil {
return nil, nil, err
Expand All @@ -68,77 +78,143 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) {
return nil, nil, err
}

done := make(chan struct{})
c.errsListening = make(chan error)
liveChangesC := sub.Chan()
histChangesC := make(chan types.StorageChangeSet)

// Query historical changes.
var cancelled atomic.Value
cancelled.Store(false)
go func(begin types.BlockNumber, liveChanges <-chan types.StorageChangeSet, histChangesC chan types.StorageChangeSet) {
defer close(histChangesC)

set := <-liveChanges // first live changes set block is the last historical block

header, err := c.RPC.Chain.GetHeader(set.Block)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
return
}

for currentBlock := begin; currentBlock < header.Number; currentBlock++ {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(currentBlock))
if err != nil {
c.errsListening <- fmt.Errorf("get block hash: %w", err)
return
}

blockChangesSets, err := c.RPC.State.QueryStorageAt([]types.StorageKey{key}, blockHash)
if err != nil {
c.errsListening <- fmt.Errorf("query storage: %w", err)
return
}

for _, set := range blockChangesSets {
histChangesC <- set
}

go func() {
for {
select {
case <-done:
// Graceful stop must finish the block before exiting.
if cancelled.Load().(bool) {
return
case set := <-sub.Chan():
header, err := c.RPC.Chain.GetHeader(set.Block)
}
}

histChangesC <- set
}(begin, liveChangesC, histChangesC)

// Sequence historical and live changes.
changesC := make(chan types.StorageChangeSet)
go func(histChangesC, liveChangesC <-chan types.StorageChangeSet, changesC chan types.StorageChangeSet) {
defer close(changesC)

for set := range histChangesC {
changesC <- set
}

for set := range liveChangesC {
changesC <- set
}
}(histChangesC, liveChangesC, changesC)

// Decode events from changes skipping blocks before 'begin'.
eventsC := make(chan blockEvents)
go func(changesC <-chan types.StorageChangeSet, eventsC chan blockEvents) {
defer close(eventsC)

for set := range changesC {
header, err := c.RPC.Chain.GetHeader(set.Block)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
return
}

if header.Number < begin {
continue
}

for _, change := range set.Changes {
if !codec.Eq(change.StorageKey, key) || !change.HasStorageData {
continue
}

events := &pallets.Events{}
err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
c.errsListening <- fmt.Errorf("events decoder: %w", err)
continue
}

for _, change := range set.Changes {
if !codec.Eq(change.StorageKey, key) || !change.HasStorageData {
continue
}

events := &pallets.Events{}
err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events)
if err != nil {
c.errsListening <- fmt.Errorf("events decoder: %w", err)
continue
}

for _, callback := range c.eventsListeners {
go callback(events, header.Number, set.Block)
}
eventsC <- blockEvents{
Events: events,
Number: header.Number,
Hash: set.Block,
}
}
}
}()
}(changesC, eventsC)

// Invoke listeners.
go func(eventsC <-chan blockEvents) {
for blockEvents := range eventsC {
for callback := range c.eventsListeners {
(*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
}

if afterBlock != nil {
afterBlock(blockEvents.Number, blockEvents.Hash)
}
}
}(eventsC)

once := sync.Once{}
c.stopListening = func() {
c.cancelListening = func() {
once.Do(func() {
done <- struct{}{}
sub.Unsubscribe()
cancelled.Store(true)
c.isListening = 0
})
}

return c.stopListening, c.errsListening, nil
return c.cancelListening, c.errsListening, nil
}

func (c *Client) RegisterEventsListener(callback EventsListener) (func(), error) {
var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := c.eventsListeners[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("too many events listeners")
}
}

// RegisterEventsListener subscribes given callback to blockchain events.
func (c *Client) RegisterEventsListener(callback EventsListener) context.CancelFunc {
c.mu.Lock()
c.eventsListeners[idx] = callback
c.eventsListeners[&callback] = struct{}{}
c.mu.Unlock()

once := sync.Once{}
stop := func() {
return func() {
once.Do(func() {
c.mu.Lock()
delete(c.eventsListeners, idx)
delete(c.eventsListeners, &callback)
c.mu.Unlock()
})
}
}

return stop, nil
type blockEvents struct {
Events *pallets.Events
Hash types.Hash
Number types.BlockNumber
}

0 comments on commit 1ebedce

Please sign in to comment.