From c4e2ecfab6c0c6c1f1577868dec9d5ebe13b1634 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Fri, 15 Mar 2024 13:18:49 +0500 Subject: [PATCH 01/25] Events extraction helper function --- blockchain/client.go | 58 +++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 6c5fe20..8d5fd53 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -77,28 +77,12 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) { case <-done: return case set := <-sub.Chan(): - header, err := c.RPC.Chain.GetHeader(set.Block) - if err != nil { - c.errsListening <- fmt.Errorf("get header: %w", err) - continue - } - - for _, change := range set.Changes { - if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { - continue - } - - events := &pallets.Events{} - err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) - if err != nil { - c.errsListening <- fmt.Errorf("events decoder: %w", err) - continue - } - - for _, callback := range c.eventsListeners { - go callback(events, header.Number, set.Block) - } - } + c.processSystemEventsStorageChanges( + set.Changes, + meta, + key, + set.Block, + ) } } }() @@ -142,3 +126,33 @@ func (c *Client) RegisterEventsListener(callback EventsListener) (func(), error) return stop, nil } + +func (c *Client) processSystemEventsStorageChanges( + changes []types.KeyValueOption, + meta *types.Metadata, + storageKey types.StorageKey, + blockHash types.Hash, +) { + header, err := c.RPC.Chain.GetHeader(blockHash) + if err != nil { + c.errsListening <- fmt.Errorf("get header: %w", err) + return + } + + for _, change := range changes { + if !codec.Eq(change.StorageKey, storageKey) || !change.HasStorageData { + continue + } + + events := &pallets.Events{} + err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) + if err != nil { + c.errsListening <- fmt.Errorf("events decoder: %w", err) + continue + } + + for _, callback := range c.eventsListeners { + go callback(events, header.Number, blockHash) + } + } +} From a3c6e060e80dea47b77435a4c36cecff5901b086 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Fri, 15 Mar 2024 13:22:05 +0500 Subject: [PATCH 02/25] Allow events listener to get older blocks events --- blockchain/client.go | 53 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/blockchain/client.go b/blockchain/client.go index 8d5fd53..edc2659 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -99,7 +99,7 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) { return c.stopListening, c.errsListening, nil } -func (c *Client) RegisterEventsListener(callback EventsListener) (func(), error) { +func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (func(), error) { var idx int for i := 0; i <= math.MaxInt; i++ { if _, ok := c.eventsListeners[i]; !ok { @@ -115,6 +115,57 @@ func (c *Client) RegisterEventsListener(callback EventsListener) (func(), error) c.eventsListeners[idx] = callback c.mu.Unlock() + go func() { + latestHeader, err := c.RPC.Chain.GetHeaderLatest() + if err != nil { + c.errsListening <- fmt.Errorf("get latest header: %w", err) + return + } + + if begin >= latestHeader.Number { + return + } + + meta, err := c.RPC.State.GetMetadataLatest() // TODO: update each runtime upgrade + if err != nil { + c.errsListening <- fmt.Errorf("get metadata: %w", err) + return + } + + key, err := types.CreateStorageKey(meta, "System", "Events") + if err != nil { + c.errsListening <- fmt.Errorf("create storage key: %w", err) + return + } + + beginHash, err := c.RPC.Chain.GetBlockHash(uint64(begin)) + if err != nil { + c.errsListening <- fmt.Errorf("get block hash: %w", err) + return + } + + latestHash, err := c.RPC.Chain.GetBlockHashLatest() + if err != nil { + c.errsListening <- fmt.Errorf("get latest block hash: %w", err) + return + } + + changesSets, err := c.RPC.State.QueryStorage([]types.StorageKey{key}, beginHash, latestHash) + if err != nil { + c.errsListening <- fmt.Errorf("storage changes query: %w", err) + return + } + + for _, set := range changesSets { + c.processSystemEventsStorageChanges( + set.Changes, + meta, + key, + set.Block, + ) + } + }() + once := sync.Once{} stop := func() { once.Do(func() { From 7134adb3bb03f764b1ee36fba2f1ac4d09dc5f88 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Fri, 15 Mar 2024 13:37:48 +0500 Subject: [PATCH 03/25] Lock mutex for events listeners map iteration --- blockchain/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/blockchain/client.go b/blockchain/client.go index edc2659..62cc3cc 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -202,8 +202,10 @@ func (c *Client) processSystemEventsStorageChanges( continue } + c.mu.Lock() for _, callback := range c.eventsListeners { go callback(events, header.Number, blockHash) } + c.mu.Unlock() } } From 5a8658928ddf5943950f72353623c3f33cd3274c Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Fri, 15 Mar 2024 14:14:22 +0500 Subject: [PATCH 04/25] Add `RegisterEventsListener` method doc comment --- blockchain/client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/blockchain/client.go b/blockchain/client.go index 62cc3cc..bf087a5 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -99,6 +99,11 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) { return c.stopListening, c.errsListening, nil } +// RegisterEventsListener subscribes given callback to blockchain events. There is a begin parameter which +// can be used to get events from blocks older than the latest block. If begin is greater than the latest +// block number, the listener will start from the latest block. Subscription on new events starts +// immediately and does not wait until the older blocks events are processed. Rare cases of events +// duplication are possible. func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (func(), error) { var idx int for i := 0; i <= math.MaxInt; i++ { From afbf476cd17a854489d2ca7b2162d6f7e26260e5 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Fri, 15 Mar 2024 16:46:24 +0500 Subject: [PATCH 05/25] Stop old blocks processing on event listener stop --- blockchain/client.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/blockchain/client.go b/blockchain/client.go index bf087a5..5b63ecd 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -120,6 +120,8 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events c.eventsListeners[idx] = callback c.mu.Unlock() + stopped := false + go func() { latestHeader, err := c.RPC.Chain.GetHeaderLatest() if err != nil { @@ -162,6 +164,10 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } for _, set := range changesSets { + if stopped { + return + } + c.processSystemEventsStorageChanges( set.Changes, meta, @@ -175,6 +181,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events stop := func() { once.Do(func() { c.mu.Lock() + stopped = true delete(c.eventsListeners, idx) c.mu.Unlock() }) From 9e3b9a4cb1b67e0b4ffe5b6ce6e4d98d337cce4b Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Fri, 15 Mar 2024 16:56:04 +0500 Subject: [PATCH 06/25] Fix old blocks events passing to all listeners --- blockchain/client.go | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 5b63ecd..cc1d5f9 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -164,16 +164,30 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } for _, set := range changesSets { - if stopped { + header, err := c.RPC.Chain.GetHeader(set.Block) + if err != nil { + c.errsListening <- fmt.Errorf("get header: %w", err) return } - c.processSystemEventsStorageChanges( - set.Changes, - meta, - key, - set.Block, - ) + for _, change := range set.Changes { + if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { + continue + } + + events := &pallets.Events{} + err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) + if err != nil { + c.errsListening <- fmt.Errorf("events decoder: %w", err) + continue + } + + if stopped { + return + } + + go callback(events, header.Number, set.Block) + } } }() From 43ccc6b4d877615622a0d8aca4db53e250ec685a Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Fri, 15 Mar 2024 17:00:24 +0500 Subject: [PATCH 07/25] Blocking invocation of callback on old blocks --- blockchain/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockchain/client.go b/blockchain/client.go index cc1d5f9..e2bf145 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -186,7 +186,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events return } - go callback(events, header.Number, set.Block) + callback(events, header.Number, set.Block) } } }() From d554e1d048ee89ada16b7cfa9cbe87cbb2b29fdf Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Fri, 15 Mar 2024 17:03:40 +0500 Subject: [PATCH 08/25] Complete with old blocks, then subscribe for new --- blockchain/client.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index e2bf145..7158004 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -116,10 +116,6 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } } - c.mu.Lock() - c.eventsListeners[idx] = callback - c.mu.Unlock() - stopped := false go func() { @@ -189,6 +185,12 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events callback(events, header.Number, set.Block) } } + + c.mu.Lock() + if !stopped { + c.eventsListeners[idx] = callback + } + c.mu.Unlock() }() once := sync.Once{} From 752970c82352b650df16b329453619d7725144f7 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Mon, 18 Mar 2024 14:54:41 +0500 Subject: [PATCH 09/25] Sequencing historical and new events --- blockchain/client.go | 51 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 7158004..b4b50fa 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -101,9 +101,7 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) { // RegisterEventsListener subscribes given callback to blockchain events. There is a begin parameter which // can be used to get events from blocks older than the latest block. If begin is greater than the latest -// block number, the listener will start from the latest block. Subscription on new events starts -// immediately and does not wait until the older blocks events are processed. Rare cases of events -// duplication are possible. +// block number, the listener will start from the latest block. func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (func(), error) { var idx int for i := 0; i <= math.MaxInt; i++ { @@ -116,6 +114,31 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } } + // Start events collection from the latest block to process them after completion with old blocks. + var pendingEvents []*blockEvents + var pendingEventsMu sync.Mutex + var pendingEventsDone bool + + callbackWrapper := func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { + pendingEventsMu.Lock() + defer pendingEventsMu.Unlock() + + if !pendingEventsDone { + pendingEvents = append(pendingEvents, &blockEvents{ + Events: events, + Hash: blockHash, + Number: blockNumber, + }) + return + } + + callback(events, blockNumber, blockHash) + } + + c.mu.Lock() + c.eventsListeners[idx] = callbackWrapper + c.mu.Unlock() + stopped := false go func() { @@ -186,11 +209,19 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } } - c.mu.Lock() - if !stopped { - c.eventsListeners[idx] = callback + for { + pendingEventsMu.Lock() + if len(pendingEvents) == 0 { + pendingEventsDone = true + pendingEventsMu.Unlock() + break + } + + callback(pendingEvents[0].Events, pendingEvents[0].Number, pendingEvents[0].Hash) + + pendingEvents = pendingEvents[1:] + pendingEventsMu.Unlock() } - c.mu.Unlock() }() once := sync.Once{} @@ -237,3 +268,9 @@ func (c *Client) processSystemEventsStorageChanges( c.mu.Unlock() } } + +type blockEvents struct { + Events *pallets.Events + Hash types.Hash + Number types.BlockNumber +} From 1d9bc1ef46ae8a45837c88a6ade3bde0e0c7fe39 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Mon, 18 Mar 2024 15:05:55 +0500 Subject: [PATCH 10/25] Refactor events sequencing into a type --- blockchain/client.go | 69 +++++++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index b4b50fa..f715c68 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -114,21 +114,10 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } } - // Start events collection from the latest block to process them after completion with old blocks. - var pendingEvents []*blockEvents - var pendingEventsMu sync.Mutex - var pendingEventsDone bool - + // Collect events starting from the latest block to process them after completion with old blocks. + pendingEvents := &pendingEvents{} callbackWrapper := func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { - pendingEventsMu.Lock() - defer pendingEventsMu.Unlock() - - if !pendingEventsDone { - pendingEvents = append(pendingEvents, &blockEvents{ - Events: events, - Hash: blockHash, - Number: blockNumber, - }) + if pendingEvents.TryPush(events, blockHash, blockNumber) { return } @@ -209,19 +198,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } } - for { - pendingEventsMu.Lock() - if len(pendingEvents) == 0 { - pendingEventsDone = true - pendingEventsMu.Unlock() - break - } - - callback(pendingEvents[0].Events, pendingEvents[0].Number, pendingEvents[0].Hash) - - pendingEvents = pendingEvents[1:] - pendingEventsMu.Unlock() - } + pendingEvents.Do(callback) }() once := sync.Once{} @@ -274,3 +251,41 @@ type blockEvents struct { Hash types.Hash Number types.BlockNumber } + +type pendingEvents struct { + list []*blockEvents + mu sync.Mutex + done bool +} + +func (pe *pendingEvents) TryPush(events *pallets.Events, hash types.Hash, number types.BlockNumber) bool { + pe.mu.Lock() + if !pe.done { + pe.list = append(pe.list, &blockEvents{ + Events: events, + Hash: hash, + Number: number, + }) + pe.mu.Unlock() + return true + } + pe.mu.Unlock() + return false +} + +func (pe *pendingEvents) Do(callback EventsListener) { + for { + pe.mu.Lock() + + if len(pe.list) == 0 { + pe.done = true + pe.mu.Unlock() + break + } + + callback(pe.list[0].Events, pe.list[0].Number, pe.list[0].Hash) + + pe.list = pe.list[1:] + pe.mu.Unlock() + } +} From ce79d6effca7af380ae8e8736345a4878a94a5ee Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Mon, 18 Mar 2024 15:21:18 +0500 Subject: [PATCH 11/25] Read old events up to the first subscription block --- blockchain/client.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index f715c68..c31011e 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -116,7 +116,13 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events // Collect events starting from the latest block to process them after completion with old blocks. pendingEvents := &pendingEvents{} + subscriptionStartBlock := uint32(0) + subscriptionStarted := make(chan struct{}) callbackWrapper := func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { + if atomic.CompareAndSwapUint32(&subscriptionStartBlock, 0, uint32(blockNumber)) { + close(subscriptionStarted) + } + if pendingEvents.TryPush(events, blockHash, blockNumber) { return } @@ -131,13 +137,9 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events stopped := false go func() { - latestHeader, err := c.RPC.Chain.GetHeaderLatest() - if err != nil { - c.errsListening <- fmt.Errorf("get latest header: %w", err) - return - } + <-subscriptionStarted - if begin >= latestHeader.Number { + if begin >= types.BlockNumber(subscriptionStartBlock) { return } @@ -159,13 +161,13 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events return } - latestHash, err := c.RPC.Chain.GetBlockHashLatest() + endHash, err := c.RPC.Chain.GetBlockHash(uint64(subscriptionStartBlock - 1)) if err != nil { - c.errsListening <- fmt.Errorf("get latest block hash: %w", err) + c.errsListening <- fmt.Errorf("get block hash: %w", err) return } - changesSets, err := c.RPC.State.QueryStorage([]types.StorageKey{key}, beginHash, latestHash) + changesSets, err := c.RPC.State.QueryStorage([]types.StorageKey{key}, beginHash, endHash) if err != nil { c.errsListening <- fmt.Errorf("storage changes query: %w", err) return From ecbdf8279e9afde903255daa82627a7b82634b8d Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Mon, 18 Mar 2024 15:27:52 +0500 Subject: [PATCH 12/25] Update todo comment --- blockchain/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/blockchain/client.go b/blockchain/client.go index c31011e..7caadd4 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -143,7 +143,8 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events return } - meta, err := c.RPC.State.GetMetadataLatest() // TODO: update each runtime upgrade + // TODO: get for begin block and update each runtime upgrade + meta, err := c.RPC.State.GetMetadataLatest() if err != nil { c.errsListening <- fmt.Errorf("get metadata: %w", err) return From c8bce022d251ca0c8e3635b74d6675ca0228826e Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Mon, 18 Mar 2024 16:44:02 +0500 Subject: [PATCH 13/25] Replace custom stop func with context.CancelFunc --- blockchain/client.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 7caadd4..6d0d036 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -1,6 +1,7 @@ package blockchain import ( + "context" "fmt" "math" "sync" @@ -21,7 +22,7 @@ type Client struct { eventsListeners map[int]EventsListener mu sync.Mutex isListening uint32 - stopListening func() + cancelListening func() errsListening chan error DdcClusters pallets.DdcClustersApi @@ -50,9 +51,9 @@ func NewClient(url string) (*Client, error) { }, nil } -func (c *Client) StartEventsListening() (func(), <-chan error, error) { +func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error) { if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) { - return c.stopListening, c.errsListening, nil + return c.cancelListening, c.errsListening, nil } meta, err := c.RPC.State.GetMetadataLatest() @@ -88,7 +89,7 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) { }() once := sync.Once{} - c.stopListening = func() { + c.cancelListening = func() { once.Do(func() { done <- struct{}{} sub.Unsubscribe() @@ -96,13 +97,13 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) { }) } - return c.stopListening, c.errsListening, nil + return c.cancelListening, c.errsListening, nil } // RegisterEventsListener subscribes given callback to blockchain events. There is a begin parameter which // can be used to get events from blocks older than the latest block. If begin is greater than the latest // block number, the listener will start from the latest block. -func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (func(), error) { +func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (context.CancelFunc, error) { var idx int for i := 0; i <= math.MaxInt; i++ { if _, ok := c.eventsListeners[i]; !ok { @@ -134,7 +135,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events c.eventsListeners[idx] = callbackWrapper c.mu.Unlock() - stopped := false + cancelled := false go func() { <-subscriptionStarted @@ -193,7 +194,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events continue } - if stopped { + if cancelled { return } @@ -205,16 +206,16 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events }() once := sync.Once{} - stop := func() { + cancel := func() { once.Do(func() { c.mu.Lock() - stopped = true + cancelled = true delete(c.eventsListeners, idx) c.mu.Unlock() }) } - return stop, nil + return cancel, nil } func (c *Client) processSystemEventsStorageChanges( From 6cb83df63c6bc58949a5e399351ff028df76d59f Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Wed, 20 Mar 2024 18:19:41 +0500 Subject: [PATCH 14/25] Use safe RPC method to query historical events --- blockchain/client.go | 62 +++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 6d0d036..5a6c7ba 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -157,48 +157,44 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events return } - beginHash, err := c.RPC.Chain.GetBlockHash(uint64(begin)) - if err != nil { - c.errsListening <- fmt.Errorf("get block hash: %w", err) - return - } - - endHash, err := c.RPC.Chain.GetBlockHash(uint64(subscriptionStartBlock - 1)) - if err != nil { - c.errsListening <- fmt.Errorf("get block hash: %w", err) - return - } - - changesSets, err := c.RPC.State.QueryStorage([]types.StorageKey{key}, beginHash, endHash) - if err != nil { - c.errsListening <- fmt.Errorf("storage changes query: %w", err) - return - } - - for _, set := range changesSets { - header, err := c.RPC.Chain.GetHeader(set.Block) + for currentBlock := uint32(begin); currentBlock < subscriptionStartBlock; currentBlock++ { + bHash, err := c.RPC.Chain.GetBlockHash(uint64(currentBlock)) if err != nil { - c.errsListening <- fmt.Errorf("get header: %w", err) + c.errsListening <- fmt.Errorf("get block hash: %w", err) return } - for _, change := range set.Changes { - if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { - continue - } + blockChangesSets, err := c.RPC.State.QueryStorageAt([]types.StorageKey{key}, bHash) + if err != nil { + c.errsListening <- fmt.Errorf("query storage: %w", err) + return + } - events := &pallets.Events{} - err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) + for _, set := range blockChangesSets { + header, err := c.RPC.Chain.GetHeader(set.Block) if err != nil { - c.errsListening <- fmt.Errorf("events decoder: %w", err) - continue - } - - if cancelled { + c.errsListening <- fmt.Errorf("get header: %w", err) return } - callback(events, header.Number, set.Block) + for _, change := range set.Changes { + if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { + continue + } + + events := &pallets.Events{} + err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) + if err != nil { + c.errsListening <- fmt.Errorf("events decoder: %w", err) + continue + } + + if cancelled { + return + } + + callback(events, header.Number, set.Block) + } } } From aca5bfbe605bac1bcdfa8d4c74379a869694c061 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Thu, 21 Mar 2024 12:24:19 +0500 Subject: [PATCH 15/25] Mark pending events as processed in all branches --- blockchain/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 5a6c7ba..052a606 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -138,6 +138,8 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events cancelled := false go func() { + defer pendingEvents.Do(callback) + <-subscriptionStarted if begin >= types.BlockNumber(subscriptionStartBlock) { @@ -197,8 +199,6 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } } } - - pendingEvents.Do(callback) }() once := sync.Once{} From 1d4d72343ad300c8b6986dcd6c7bafd3d9da1e0b Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Mon, 25 Mar 2024 10:06:35 +0500 Subject: [PATCH 16/25] Thread safe new listener index selection --- blockchain/client.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 052a606..5b250fe 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -104,17 +104,6 @@ func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error // can be used to get events from blocks older than the latest block. If begin is greater than the latest // block number, the listener will start from the latest block. func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (context.CancelFunc, error) { - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := c.eventsListeners[i]; !ok { - idx = i - break - } - if i == math.MaxInt { - return nil, fmt.Errorf("too many events listeners") - } - } - // Collect events starting from the latest block to process them after completion with old blocks. pendingEvents := &pendingEvents{} subscriptionStartBlock := uint32(0) @@ -132,6 +121,17 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } c.mu.Lock() + // Find the smallest available key for the new listener. + var idx int + for i := 0; i <= math.MaxInt; i++ { + if _, ok := c.eventsListeners[i]; !ok { + idx = i + break + } + if i == math.MaxInt { + return nil, fmt.Errorf("too many events listeners") + } + } c.eventsListeners[idx] = callbackWrapper c.mu.Unlock() From 9988c642e2bc87b60fe13866dd0674581844c90a Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Mon, 25 Mar 2024 10:48:15 +0500 Subject: [PATCH 17/25] More general events handler --- blockchain/client.go | 61 ++++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 5b250fe..d4ad6a9 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -78,11 +78,18 @@ func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error case <-done: return case set := <-sub.Chan(): - c.processSystemEventsStorageChanges( - set.Changes, + c.onChanges( meta, key, + set.Changes, set.Block, + func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { + c.mu.Lock() + for _, callback := range c.eventsListeners { + go callback(events, blockNumber, blockHash) + } + c.mu.Unlock() + }, ) } } @@ -173,30 +180,19 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } for _, set := range blockChangesSets { - header, err := c.RPC.Chain.GetHeader(set.Block) - if err != nil { - c.errsListening <- fmt.Errorf("get header: %w", err) + if cancelled { return } - for _, change := range set.Changes { - if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { - continue - } - - events := &pallets.Events{} - err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) - if err != nil { - c.errsListening <- fmt.Errorf("events decoder: %w", err) - continue - } - - if cancelled { - return - } - - callback(events, header.Number, set.Block) - } + c.onChanges( + meta, + key, + set.Changes, + set.Block, + func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { + callback(events, blockNumber, blockHash) + }, + ) } } }() @@ -214,20 +210,21 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events return cancel, nil } -func (c *Client) processSystemEventsStorageChanges( - changes []types.KeyValueOption, +func (c *Client) onChanges( meta *types.Metadata, - storageKey types.StorageKey, - blockHash types.Hash, + key types.StorageKey, + changes []types.KeyValueOption, + block types.Hash, + callback EventsListener, ) { - header, err := c.RPC.Chain.GetHeader(blockHash) + header, err := c.RPC.Chain.GetHeader(block) if err != nil { c.errsListening <- fmt.Errorf("get header: %w", err) return } for _, change := range changes { - if !codec.Eq(change.StorageKey, storageKey) || !change.HasStorageData { + if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { continue } @@ -238,11 +235,7 @@ func (c *Client) processSystemEventsStorageChanges( continue } - c.mu.Lock() - for _, callback := range c.eventsListeners { - go callback(events, header.Number, blockHash) - } - c.mu.Unlock() + callback(events, header.Number, block) } } From f9b1edc42bbb4cfe8558314a0d9f9419bb1dc904 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Wed, 27 Mar 2024 13:58:07 +0500 Subject: [PATCH 18/25] More optimal events listeners registry --- blockchain/client.go | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index d4ad6a9..224bc7c 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -3,7 +3,6 @@ package blockchain import ( "context" "fmt" - "math" "sync" "sync/atomic" @@ -19,7 +18,7 @@ type EventsListener func(events *pallets.Events, blockNumber types.BlockNumber, type Client struct { *gsrpc.SubstrateAPI - eventsListeners map[int]EventsListener + eventsListeners map[*EventsListener]struct{} mu sync.Mutex isListening uint32 cancelListening func() @@ -43,7 +42,7 @@ func NewClient(url string) (*Client, error) { return &Client{ SubstrateAPI: substrateApi, - eventsListeners: make(map[int]EventsListener), + eventsListeners: make(map[*EventsListener]struct{}), DdcClusters: pallets.NewDdcClustersApi(substrateApi), DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta), DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta), @@ -85,8 +84,8 @@ func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error set.Block, func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { c.mu.Lock() - for _, callback := range c.eventsListeners { - go callback(events, blockNumber, blockHash) + for callback := range c.eventsListeners { + go (*callback)(events, blockNumber, blockHash) } c.mu.Unlock() }, @@ -115,7 +114,8 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events pendingEvents := &pendingEvents{} subscriptionStartBlock := uint32(0) subscriptionStarted := make(chan struct{}) - callbackWrapper := func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { + + callbackWrapper := EventsListener(func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { if atomic.CompareAndSwapUint32(&subscriptionStartBlock, 0, uint32(blockNumber)) { close(subscriptionStarted) } @@ -125,21 +125,10 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } callback(events, blockNumber, blockHash) - } + }) c.mu.Lock() - // Find the smallest available key for the new listener. - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := c.eventsListeners[i]; !ok { - idx = i - break - } - if i == math.MaxInt { - return nil, fmt.Errorf("too many events listeners") - } - } - c.eventsListeners[idx] = callbackWrapper + c.eventsListeners[&callbackWrapper] = struct{}{} c.mu.Unlock() cancelled := false @@ -202,7 +191,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events once.Do(func() { c.mu.Lock() cancelled = true - delete(c.eventsListeners, idx) + delete(c.eventsListeners, &callbackWrapper) c.mu.Unlock() }) } From cc38b430c920ce0f871937938a04545cceb1b675 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Wed, 27 Mar 2024 15:47:30 +0500 Subject: [PATCH 19/25] Synchronous call of events listener callback --- blockchain/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockchain/client.go b/blockchain/client.go index 224bc7c..bbdd210 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -85,7 +85,7 @@ func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { c.mu.Lock() for callback := range c.eventsListeners { - go (*callback)(events, blockNumber, blockHash) + (*callback)(events, blockNumber, blockHash) } c.mu.Unlock() }, From 192a39580d847771dd3f17e61dfc3c2c0cc68153 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Wed, 27 Mar 2024 16:18:17 +0500 Subject: [PATCH 20/25] Accept events listening block completion callback --- blockchain/client.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/blockchain/client.go b/blockchain/client.go index bbdd210..2ebe533 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -50,7 +50,9 @@ func NewClient(url string) (*Client, error) { }, nil } -func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error) { +func (c *Client) StartEventsListening( + afterBlock func(blockNumber types.BlockNumber, blockHash types.Hash), +) (context.CancelFunc, <-chan error, error) { if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) { return c.cancelListening, c.errsListening, nil } @@ -90,6 +92,16 @@ func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error c.mu.Unlock() }, ) + + header, err := c.RPC.Chain.GetHeader(set.Block) + if err != nil { + c.errsListening <- fmt.Errorf("get header: %w", err) + return + } + + if afterBlock != nil { + afterBlock(header.Number, set.Block) + } } } }() From 23a6067185540a389281cb50a8625b0e17f54ec1 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Thu, 28 Mar 2024 12:25:39 +0500 Subject: [PATCH 21/25] Remove begin param from events listener reg method --- blockchain/client.go | 87 +++----------------------------------------- 1 file changed, 5 insertions(+), 82 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 2ebe533..2ac6dd3 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -118,97 +118,20 @@ func (c *Client) StartEventsListening( return c.cancelListening, c.errsListening, nil } -// RegisterEventsListener subscribes given callback to blockchain events. There is a begin parameter which -// can be used to get events from blocks older than the latest block. If begin is greater than the latest -// block number, the listener will start from the latest block. -func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (context.CancelFunc, error) { - // Collect events starting from the latest block to process them after completion with old blocks. - pendingEvents := &pendingEvents{} - subscriptionStartBlock := uint32(0) - subscriptionStarted := make(chan struct{}) - - callbackWrapper := EventsListener(func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { - if atomic.CompareAndSwapUint32(&subscriptionStartBlock, 0, uint32(blockNumber)) { - close(subscriptionStarted) - } - - if pendingEvents.TryPush(events, blockHash, blockNumber) { - return - } - - callback(events, blockNumber, blockHash) - }) - +// RegisterEventsListener subscribes given callback to blockchain events. +func (c *Client) RegisterEventsListener(callback EventsListener) context.CancelFunc { c.mu.Lock() - c.eventsListeners[&callbackWrapper] = struct{}{} + c.eventsListeners[&callback] = struct{}{} c.mu.Unlock() - cancelled := false - - go func() { - defer pendingEvents.Do(callback) - - <-subscriptionStarted - - if begin >= types.BlockNumber(subscriptionStartBlock) { - return - } - - // TODO: get for begin block and update each runtime upgrade - meta, err := c.RPC.State.GetMetadataLatest() - if err != nil { - c.errsListening <- fmt.Errorf("get metadata: %w", err) - return - } - - key, err := types.CreateStorageKey(meta, "System", "Events") - if err != nil { - c.errsListening <- fmt.Errorf("create storage key: %w", err) - return - } - - for currentBlock := uint32(begin); currentBlock < subscriptionStartBlock; currentBlock++ { - bHash, err := c.RPC.Chain.GetBlockHash(uint64(currentBlock)) - if err != nil { - c.errsListening <- fmt.Errorf("get block hash: %w", err) - return - } - - blockChangesSets, err := c.RPC.State.QueryStorageAt([]types.StorageKey{key}, bHash) - if err != nil { - c.errsListening <- fmt.Errorf("query storage: %w", err) - return - } - - for _, set := range blockChangesSets { - if cancelled { - return - } - - c.onChanges( - meta, - key, - set.Changes, - set.Block, - func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { - callback(events, blockNumber, blockHash) - }, - ) - } - } - }() - once := sync.Once{} - cancel := func() { + return func() { once.Do(func() { c.mu.Lock() - cancelled = true - delete(c.eventsListeners, &callbackWrapper) + delete(c.eventsListeners, &callback) c.mu.Unlock() }) } - - return cancel, nil } func (c *Client) onChanges( From 2b0ce76ffdcc5a453e80e93f93179b7035204d8a Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Thu, 28 Mar 2024 12:27:08 +0500 Subject: [PATCH 22/25] Remove unused pending events type --- blockchain/client.go | 38 -------------------------------------- 1 file changed, 38 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 2ac6dd3..c31fb45 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -168,41 +168,3 @@ type blockEvents struct { Hash types.Hash Number types.BlockNumber } - -type pendingEvents struct { - list []*blockEvents - mu sync.Mutex - done bool -} - -func (pe *pendingEvents) TryPush(events *pallets.Events, hash types.Hash, number types.BlockNumber) bool { - pe.mu.Lock() - if !pe.done { - pe.list = append(pe.list, &blockEvents{ - Events: events, - Hash: hash, - Number: number, - }) - pe.mu.Unlock() - return true - } - pe.mu.Unlock() - return false -} - -func (pe *pendingEvents) Do(callback EventsListener) { - for { - pe.mu.Lock() - - if len(pe.list) == 0 { - pe.done = true - pe.mu.Unlock() - break - } - - callback(pe.list[0].Events, pe.list[0].Number, pe.list[0].Hash) - - pe.list = pe.list[1:] - pe.mu.Unlock() - } -} From ac38f4927543b0de2c90cec719d17fc50b4ab226 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Thu, 28 Mar 2024 13:10:57 +0500 Subject: [PATCH 23/25] Allow to start events listening from an old block --- blockchain/client.go | 155 +++++++++++++++++++++++++++---------------- 1 file changed, 98 insertions(+), 57 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index c31fb45..dc9e632 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -51,12 +51,15 @@ func NewClient(url string) (*Client, error) { } func (c *Client) StartEventsListening( + begin types.BlockNumber, afterBlock func(blockNumber types.BlockNumber, blockHash types.Hash), ) (context.CancelFunc, <-chan error, error) { if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) { return c.cancelListening, c.errsListening, nil } + c.errsListening = make(chan error) + meta, err := c.RPC.State.GetMetadataLatest() if err != nil { return nil, nil, err @@ -70,47 +73,114 @@ func (c *Client) StartEventsListening( return nil, nil, err } - done := make(chan struct{}) - c.errsListening = make(chan error) + liveChangesC := sub.Chan() + histChangesC := make(chan types.StorageChangeSet) + + // Query historical changes. + var cancelled atomic.Value + cancelled.Store(false) + go func(begin types.BlockNumber, liveChanges <-chan types.StorageChangeSet, histChangesC chan types.StorageChangeSet) { + defer close(histChangesC) + + set := <-liveChanges // first live changes set block is the last historical block + + header, err := c.RPC.Chain.GetHeader(set.Block) + if err != nil { + c.errsListening <- fmt.Errorf("get header: %w", err) + return + } + + for currentBlock := begin; currentBlock < header.Number; currentBlock++ { + blockHash, err := c.RPC.Chain.GetBlockHash(uint64(currentBlock)) + if err != nil { + c.errsListening <- fmt.Errorf("get block hash: %w", err) + return + } + + blockChangesSets, err := c.RPC.State.QueryStorageAt([]types.StorageKey{key}, blockHash) + if err != nil { + c.errsListening <- fmt.Errorf("query storage: %w", err) + return + } + + for _, set := range blockChangesSets { + histChangesC <- set + } - go func() { - for { - select { - case <-done: + // Graceful stop must finish the block before exiting. + if cancelled.Load().(bool) { return - case set := <-sub.Chan(): - c.onChanges( - meta, - key, - set.Changes, - set.Block, - func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { - c.mu.Lock() - for callback := range c.eventsListeners { - (*callback)(events, blockNumber, blockHash) - } - c.mu.Unlock() - }, - ) - - header, err := c.RPC.Chain.GetHeader(set.Block) + } + } + + histChangesC <- set + }(begin, liveChangesC, histChangesC) + + // Sequence historical and live changes. + changesC := make(chan types.StorageChangeSet) + go func(histChangesC, liveChangesC <-chan types.StorageChangeSet, changesC chan types.StorageChangeSet) { + defer close(changesC) + + for set := range histChangesC { + changesC <- set + } + + for set := range liveChangesC { + changesC <- set + } + }(histChangesC, liveChangesC, changesC) + + // Decode events from changes. + eventsC := make(chan blockEvents) + go func(changesC <-chan types.StorageChangeSet, eventsC chan blockEvents) { + defer close(eventsC) + + for set := range changesC { + header, err := c.RPC.Chain.GetHeader(set.Block) + if err != nil { + c.errsListening <- fmt.Errorf("get header: %w", err) + return + } + + for _, change := range set.Changes { + if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { + continue + } + + events := &pallets.Events{} + err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) if err != nil { - c.errsListening <- fmt.Errorf("get header: %w", err) - return + c.errsListening <- fmt.Errorf("events decoder: %w", err) + continue } - if afterBlock != nil { - afterBlock(header.Number, set.Block) + eventsC <- blockEvents{ + Events: events, + Number: header.Number, + Hash: set.Block, } } } - }() + }(changesC, eventsC) + + // Invoke listeners. + go func(eventsC <-chan blockEvents) { + for blockEvents := range eventsC { + for callback := range c.eventsListeners { + (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash) + } + + if afterBlock != nil { + afterBlock(blockEvents.Number, blockEvents.Hash) + } + } + }(eventsC) once := sync.Once{} c.cancelListening = func() { once.Do(func() { - done <- struct{}{} sub.Unsubscribe() + cancelled.Store(true) c.isListening = 0 }) } @@ -134,35 +204,6 @@ func (c *Client) RegisterEventsListener(callback EventsListener) context.CancelF } } -func (c *Client) onChanges( - meta *types.Metadata, - key types.StorageKey, - changes []types.KeyValueOption, - block types.Hash, - callback EventsListener, -) { - header, err := c.RPC.Chain.GetHeader(block) - if err != nil { - c.errsListening <- fmt.Errorf("get header: %w", err) - return - } - - for _, change := range changes { - if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { - continue - } - - events := &pallets.Events{} - err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) - if err != nil { - c.errsListening <- fmt.Errorf("events decoder: %w", err) - continue - } - - callback(events, header.Number, block) - } -} - type blockEvents struct { Events *pallets.Events Hash types.Hash From 17f86a84ac42c39dd3fbe3a1a6b35ae12ae5d747 Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Thu, 28 Mar 2024 13:27:36 +0500 Subject: [PATCH 24/25] Skip events from blocks before the `begin` block --- blockchain/client.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/blockchain/client.go b/blockchain/client.go index dc9e632..6e2b8c9 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -130,7 +130,7 @@ func (c *Client) StartEventsListening( } }(histChangesC, liveChangesC, changesC) - // Decode events from changes. + // Decode events from changes skipping blocks before 'begin'. eventsC := make(chan blockEvents) go func(changesC <-chan types.StorageChangeSet, eventsC chan blockEvents) { defer close(eventsC) @@ -142,6 +142,10 @@ func (c *Client) StartEventsListening( return } + if header.Number < begin { + continue + } + for _, change := range set.Changes { if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { continue From 0a486b30bbe17f8de4e937e6fe52001c03de1f3c Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Thu, 28 Mar 2024 13:19:09 +0500 Subject: [PATCH 25/25] Add `StartEventsListening` doc comment --- blockchain/client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/blockchain/client.go b/blockchain/client.go index 6e2b8c9..6412b47 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -50,6 +50,11 @@ func NewClient(url string) (*Client, error) { }, nil } +// StartEventsListening subscribes for blockchain events and passes events starting from the +// 'begin' block to registered events listeners. Listeners registered after this call will only +// receive live events meaning all listeners which need historical events from 'begin' block +// should be registered at the moment of calling this function. The 'afterBlock' callback is +// invoked after all registered events listeners are already invoked. func (c *Client) StartEventsListening( begin types.BlockNumber, afterBlock func(blockNumber types.BlockNumber, blockHash types.Hash),