Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribe for events from a given block #83

Merged
merged 25 commits into from
Mar 28, 2024
Merged
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c4e2ecf
Events extraction helper function
khssnv Mar 15, 2024
a3c6e06
Allow events listener to get older blocks events
khssnv Mar 15, 2024
7134adb
Lock mutex for events listeners map iteration
khssnv Mar 15, 2024
5a86589
Add `RegisterEventsListener` method doc comment
khssnv Mar 15, 2024
afbf476
Stop old blocks processing on event listener stop
khssnv Mar 15, 2024
9e3b9a4
Fix old blocks events passing to all listeners
khssnv Mar 15, 2024
43ccc6b
Blocking invocation of callback on old blocks
khssnv Mar 15, 2024
d554e1d
Complete with old blocks, then subscribe for new
khssnv Mar 15, 2024
752970c
Sequencing historical and new events
khssnv Mar 18, 2024
1d9bc1e
Refactor events sequencing into a type
khssnv Mar 18, 2024
ce79d6e
Read old events up to the first subscription block
khssnv Mar 18, 2024
ecbdf82
Update todo comment
khssnv Mar 18, 2024
c8bce02
Replace custom stop func with context.CancelFunc
khssnv Mar 18, 2024
6cb83df
Use safe RPC method to query historical events
khssnv Mar 20, 2024
aca5bfb
Mark pending events as processed in all branches
khssnv Mar 21, 2024
1d4d723
Thread safe new listener index selection
khssnv Mar 25, 2024
9988c64
More general events handler
khssnv Mar 25, 2024
f9b1edc
More optimal events listeners registry
khssnv Mar 27, 2024
cc38b43
Synchronous call of events listener callback
khssnv Mar 27, 2024
192a395
Accept events listening block completion callback
khssnv Mar 27, 2024
23a6067
Remove begin param from events listener reg method
khssnv Mar 28, 2024
2b0ce76
Remove unused pending events type
khssnv Mar 28, 2024
ac38f49
Allow to start events listening from an old block
khssnv Mar 28, 2024
17f86a8
Skip events from blocks before the `begin` block
khssnv Mar 28, 2024
0a486b3
Add `StartEventsListening` doc comment
khssnv Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 176 additions & 29 deletions blockchain/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blockchain

import (
"context"
"fmt"
"math"
"sync"
Expand All @@ -21,7 +22,7 @@ type Client struct {
eventsListeners map[int]EventsListener
mu sync.Mutex
isListening uint32
stopListening func()
cancelListening func()
errsListening chan error

DdcClusters pallets.DdcClustersApi
Expand Down Expand Up @@ -50,9 +51,9 @@ func NewClient(url string) (*Client, error) {
}, nil
}

func (c *Client) StartEventsListening() (func(), <-chan error, error) {
func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error) {
if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) {
return c.stopListening, c.errsListening, nil
return c.cancelListening, c.errsListening, nil
}

meta, err := c.RPC.State.GetMetadataLatest()
Expand All @@ -77,10 +78,105 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) {
case <-done:
return
case set := <-sub.Chan():
c.processSystemEventsStorageChanges(
set.Changes,
meta,
key,
set.Block,
)
}
}
}()

once := sync.Once{}
c.cancelListening = func() {
once.Do(func() {
done <- struct{}{}
sub.Unsubscribe()
c.isListening = 0
})
}

return c.cancelListening, c.errsListening, nil
}

// RegisterEventsListener subscribes given callback to blockchain events. There is a begin parameter which
// can be used to get events from blocks older than the latest block. If begin is greater than the latest
// block number, the listener will start from the latest block.
func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (context.CancelFunc, error) {
upalinski marked this conversation as resolved.
Show resolved Hide resolved
var idx int
for i := 0; i <= math.MaxInt; i++ {
upalinski marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := c.eventsListeners[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("too many events listeners")
}
}

// Collect events starting from the latest block to process them after completion with old blocks.
pendingEvents := &pendingEvents{}
subscriptionStartBlock := uint32(0)
subscriptionStarted := make(chan struct{})
callbackWrapper := func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) {
if atomic.CompareAndSwapUint32(&subscriptionStartBlock, 0, uint32(blockNumber)) {
close(subscriptionStarted)
}

if pendingEvents.TryPush(events, blockHash, blockNumber) {
return
}

callback(events, blockNumber, blockHash)
}

c.mu.Lock()
c.eventsListeners[idx] = callbackWrapper
c.mu.Unlock()

cancelled := false

go func() {
defer pendingEvents.Do(callback)

<-subscriptionStarted

if begin >= types.BlockNumber(subscriptionStartBlock) {
return
}

// TODO: get for begin block and update each runtime upgrade
meta, err := c.RPC.State.GetMetadataLatest()
if err != nil {
c.errsListening <- fmt.Errorf("get metadata: %w", err)
return
}

key, err := types.CreateStorageKey(meta, "System", "Events")
if err != nil {
c.errsListening <- fmt.Errorf("create storage key: %w", err)
return
}

for currentBlock := uint32(begin); currentBlock < subscriptionStartBlock; currentBlock++ {
bHash, err := c.RPC.Chain.GetBlockHash(uint64(currentBlock))
if err != nil {
c.errsListening <- fmt.Errorf("get block hash: %w", err)
return
}

blockChangesSets, err := c.RPC.State.QueryStorageAt([]types.StorageKey{key}, bHash)
if err != nil {
c.errsListening <- fmt.Errorf("query storage: %w", err)
return
}

for _, set := range blockChangesSets {
header, err := c.RPC.Chain.GetHeader(set.Block)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
continue
return
}

for _, change := range set.Changes {
Expand All @@ -95,50 +191,101 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) {
continue
}

for _, callback := range c.eventsListeners {
go callback(events, header.Number, set.Block)
if cancelled {
return
}

callback(events, header.Number, set.Block)
}
}
}
}()

once := sync.Once{}
c.stopListening = func() {
cancel := func() {
once.Do(func() {
done <- struct{}{}
sub.Unsubscribe()
c.isListening = 0
c.mu.Lock()
cancelled = true
delete(c.eventsListeners, idx)
c.mu.Unlock()
})
}

return c.stopListening, c.errsListening, nil
return cancel, nil
}

func (c *Client) RegisterEventsListener(callback EventsListener) (func(), error) {
var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := c.eventsListeners[i]; !ok {
idx = i
break
func (c *Client) processSystemEventsStorageChanges(
upalinski marked this conversation as resolved.
Show resolved Hide resolved
changes []types.KeyValueOption,
meta *types.Metadata,
storageKey types.StorageKey,
blockHash types.Hash,
) {
header, err := c.RPC.Chain.GetHeader(blockHash)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
return
}

for _, change := range changes {
if !codec.Eq(change.StorageKey, storageKey) || !change.HasStorageData {
continue
}
if i == math.MaxInt {
return nil, fmt.Errorf("too many events listeners")

events := &pallets.Events{}
err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events)
if err != nil {
c.errsListening <- fmt.Errorf("events decoder: %w", err)
continue
}

c.mu.Lock()
for _, callback := range c.eventsListeners {
go callback(events, header.Number, blockHash)
}
c.mu.Unlock()
}
}

c.mu.Lock()
c.eventsListeners[idx] = callback
c.mu.Unlock()
type blockEvents struct {
Events *pallets.Events
Hash types.Hash
Number types.BlockNumber
}

once := sync.Once{}
stop := func() {
once.Do(func() {
c.mu.Lock()
delete(c.eventsListeners, idx)
c.mu.Unlock()
type pendingEvents struct {
list []*blockEvents
mu sync.Mutex
done bool
}

func (pe *pendingEvents) TryPush(events *pallets.Events, hash types.Hash, number types.BlockNumber) bool {
pe.mu.Lock()
if !pe.done {
pe.list = append(pe.list, &blockEvents{
Events: events,
Hash: hash,
Number: number,
})
pe.mu.Unlock()
return true
}
pe.mu.Unlock()
return false
}

func (pe *pendingEvents) Do(callback EventsListener) {
for {
pe.mu.Lock()

return stop, nil
if len(pe.list) == 0 {
pe.done = true
pe.mu.Unlock()
break
}

callback(pe.list[0].Events, pe.list[0].Number, pe.list[0].Hash)

pe.list = pe.list[1:]
pe.mu.Unlock()
}
}
Loading