Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribe for events from a given block #83

Merged
merged 25 commits into from
Mar 28, 2024
Merged
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c4e2ecf
Events extraction helper function
khssnv Mar 15, 2024
a3c6e06
Allow events listener to get older blocks events
khssnv Mar 15, 2024
7134adb
Lock mutex for events listeners map iteration
khssnv Mar 15, 2024
5a86589
Add `RegisterEventsListener` method doc comment
khssnv Mar 15, 2024
afbf476
Stop old blocks processing on event listener stop
khssnv Mar 15, 2024
9e3b9a4
Fix old blocks events passing to all listeners
khssnv Mar 15, 2024
43ccc6b
Blocking invocation of callback on old blocks
khssnv Mar 15, 2024
d554e1d
Complete with old blocks, then subscribe for new
khssnv Mar 15, 2024
752970c
Sequencing historical and new events
khssnv Mar 18, 2024
1d9bc1e
Refactor events sequencing into a type
khssnv Mar 18, 2024
ce79d6e
Read old events up to the first subscription block
khssnv Mar 18, 2024
ecbdf82
Update todo comment
khssnv Mar 18, 2024
c8bce02
Replace custom stop func with context.CancelFunc
khssnv Mar 18, 2024
6cb83df
Use safe RPC method to query historical events
khssnv Mar 20, 2024
aca5bfb
Mark pending events as processed in all branches
khssnv Mar 21, 2024
1d4d723
Thread safe new listener index selection
khssnv Mar 25, 2024
9988c64
More general events handler
khssnv Mar 25, 2024
f9b1edc
More optimal events listeners registry
khssnv Mar 27, 2024
cc38b43
Synchronous call of events listener callback
khssnv Mar 27, 2024
192a395
Accept events listening block completion callback
khssnv Mar 27, 2024
23a6067
Remove begin param from events listener reg method
khssnv Mar 28, 2024
2b0ce76
Remove unused pending events type
khssnv Mar 28, 2024
ac38f49
Allow to start events listening from an old block
khssnv Mar 28, 2024
17f86a8
Skip events from blocks before the `begin` block
khssnv Mar 28, 2024
0a486b3
Add `StartEventsListening` doc comment
khssnv Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 126 additions & 50 deletions blockchain/client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package blockchain

import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"

Expand All @@ -18,10 +18,10 @@ type EventsListener func(events *pallets.Events, blockNumber types.BlockNumber,
type Client struct {
*gsrpc.SubstrateAPI

eventsListeners map[int]EventsListener
eventsListeners map[*EventsListener]struct{}
mu sync.Mutex
isListening uint32
stopListening func()
cancelListening func()
errsListening chan error

DdcClusters pallets.DdcClustersApi
Expand All @@ -42,19 +42,29 @@ func NewClient(url string) (*Client, error) {

return &Client{
SubstrateAPI: substrateApi,
eventsListeners: make(map[int]EventsListener),
eventsListeners: make(map[*EventsListener]struct{}),
DdcClusters: pallets.NewDdcClustersApi(substrateApi),
DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta),
DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta),
DdcPayouts: pallets.NewDdcPayoutsApi(substrateApi, meta),
}, nil
}

func (c *Client) StartEventsListening() (func(), <-chan error, error) {
// StartEventsListening subscribes for blockchain events and passes events starting from the
// 'begin' block to registered events listeners. Listeners registered after this call will only
// receive live events meaning all listeners which need historical events from 'begin' block
// should be registered at the moment of calling this function. The 'afterBlock' callback is
// invoked after all registered events listeners are already invoked.
func (c *Client) StartEventsListening(
begin types.BlockNumber,
afterBlock func(blockNumber types.BlockNumber, blockHash types.Hash),
) (context.CancelFunc, <-chan error, error) {
if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) {
return c.stopListening, c.errsListening, nil
return c.cancelListening, c.errsListening, nil
}

c.errsListening = make(chan error)

meta, err := c.RPC.State.GetMetadataLatest()
if err != nil {
return nil, nil, err
Expand All @@ -68,77 +78,143 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) {
return nil, nil, err
}

done := make(chan struct{})
c.errsListening = make(chan error)
liveChangesC := sub.Chan()
histChangesC := make(chan types.StorageChangeSet)

// Query historical changes.
var cancelled atomic.Value
cancelled.Store(false)
go func(begin types.BlockNumber, liveChanges <-chan types.StorageChangeSet, histChangesC chan types.StorageChangeSet) {
defer close(histChangesC)

set := <-liveChanges // first live changes set block is the last historical block

header, err := c.RPC.Chain.GetHeader(set.Block)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
return
}

for currentBlock := begin; currentBlock < header.Number; currentBlock++ {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(currentBlock))
if err != nil {
c.errsListening <- fmt.Errorf("get block hash: %w", err)
return
}

blockChangesSets, err := c.RPC.State.QueryStorageAt([]types.StorageKey{key}, blockHash)
if err != nil {
c.errsListening <- fmt.Errorf("query storage: %w", err)
return
}

for _, set := range blockChangesSets {
histChangesC <- set
}

go func() {
for {
select {
case <-done:
// Graceful stop must finish the block before exiting.
if cancelled.Load().(bool) {
return
case set := <-sub.Chan():
header, err := c.RPC.Chain.GetHeader(set.Block)
}
}

histChangesC <- set
}(begin, liveChangesC, histChangesC)

// Sequence historical and live changes.
changesC := make(chan types.StorageChangeSet)
go func(histChangesC, liveChangesC <-chan types.StorageChangeSet, changesC chan types.StorageChangeSet) {
defer close(changesC)

for set := range histChangesC {
changesC <- set
}

for set := range liveChangesC {
changesC <- set
}
}(histChangesC, liveChangesC, changesC)

// Decode events from changes skipping blocks before 'begin'.
eventsC := make(chan blockEvents)
go func(changesC <-chan types.StorageChangeSet, eventsC chan blockEvents) {
defer close(eventsC)

for set := range changesC {
header, err := c.RPC.Chain.GetHeader(set.Block)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
return
}

if header.Number < begin {
continue
}

for _, change := range set.Changes {
if !codec.Eq(change.StorageKey, key) || !change.HasStorageData {
continue
}

events := &pallets.Events{}
err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
c.errsListening <- fmt.Errorf("events decoder: %w", err)
continue
}

for _, change := range set.Changes {
if !codec.Eq(change.StorageKey, key) || !change.HasStorageData {
continue
}

events := &pallets.Events{}
err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events)
if err != nil {
c.errsListening <- fmt.Errorf("events decoder: %w", err)
continue
}

for _, callback := range c.eventsListeners {
go callback(events, header.Number, set.Block)
}
eventsC <- blockEvents{
Events: events,
Number: header.Number,
Hash: set.Block,
}
}
}
}()
}(changesC, eventsC)

// Invoke listeners.
go func(eventsC <-chan blockEvents) {
for blockEvents := range eventsC {
for callback := range c.eventsListeners {
(*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
}

if afterBlock != nil {
afterBlock(blockEvents.Number, blockEvents.Hash)
}
}
}(eventsC)

once := sync.Once{}
c.stopListening = func() {
c.cancelListening = func() {
once.Do(func() {
done <- struct{}{}
sub.Unsubscribe()
cancelled.Store(true)
c.isListening = 0
})
}

return c.stopListening, c.errsListening, nil
return c.cancelListening, c.errsListening, nil
}

func (c *Client) RegisterEventsListener(callback EventsListener) (func(), error) {
var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := c.eventsListeners[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("too many events listeners")
}
}

// RegisterEventsListener subscribes given callback to blockchain events.
func (c *Client) RegisterEventsListener(callback EventsListener) context.CancelFunc {
c.mu.Lock()
c.eventsListeners[idx] = callback
c.eventsListeners[&callback] = struct{}{}
c.mu.Unlock()

once := sync.Once{}
stop := func() {
return func() {
once.Do(func() {
c.mu.Lock()
delete(c.eventsListeners, idx)
delete(c.eventsListeners, &callback)
c.mu.Unlock()
})
}
}

return stop, nil
type blockEvents struct {
Events *pallets.Events
Hash types.Hash
Number types.BlockNumber
}
Loading