Skip to content

Commit

Permalink
Use GSRPC Registry for events listening
Browse files Browse the repository at this point in the history
  • Loading branch information
khssnv committed May 10, 2024
1 parent 912cf0f commit 26558b9
Showing 1 changed file with 52 additions and 74 deletions.
126 changes: 52 additions & 74 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"sync/atomic"

gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/state"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/centrifuge/go-substrate-rpc-client/v4/types/codec"

"github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets"
)

type EventsListener func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash)
type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash)

type Client struct {
*gsrpc.SubstrateAPI
Expand Down Expand Up @@ -64,127 +66,103 @@ func (c *Client) StartEventsListening(
return c.cancelListening, c.errsListening, nil
}

c.errsListening = make(chan error)

meta, err := c.RPC.State.GetMetadataLatest()
sub, err := c.RPC.Chain.SubscribeNewHeads()
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("subscribe new heads: %w", err)
}
key, err := types.CreateStorageKey(meta, "System", "Events", nil)
if err != nil {
return nil, nil, err
}
sub, err := c.RPC.State.SubscribeStorageRaw([]types.StorageKey{key})

retriever, err := retriever.NewDefaultEventRetriever(state.NewEventProvider(c.RPC.State), c.RPC.State)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("event retriever: %w", err)
}

liveChangesC := sub.Chan()
histChangesC := make(chan types.StorageChangeSet)
c.errsListening = make(chan error)

liveHeadersC := sub.Chan()
histHeadersC := make(chan types.Header)
var wg sync.WaitGroup

// Query historical changes.
// Query historical headers.
var cancelled atomic.Value
cancelled.Store(false)
wg.Add(1)
go func(begin types.BlockNumber, liveChanges <-chan types.StorageChangeSet, histChangesC chan types.StorageChangeSet) {
go func(beginBlock types.BlockNumber, live <-chan types.Header, hist chan types.Header) {
defer wg.Done()
defer close(histChangesC)
defer close(hist)

set := <-liveChanges // first live changes set block is the last historical block
firstLiveHeader := <-live // the first live header is the last historical header

header, err := c.RPC.Chain.GetHeader(set.Block)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
return
}

for currentBlock := begin; currentBlock < header.Number; currentBlock++ {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(currentBlock))
for block := beginBlock; block < firstLiveHeader.Number; block++ {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block))
if err != nil {
c.errsListening <- fmt.Errorf("get block hash: %w", err)
c.errsListening <- fmt.Errorf("get historical block hash: %w", err)
return
}

blockChangesSets, err := c.RPC.State.QueryStorageAt([]types.StorageKey{key}, blockHash)
header, err := c.RPC.Chain.GetHeader(blockHash)
if err != nil {
c.errsListening <- fmt.Errorf("query storage: %w", err)
c.errsListening <- fmt.Errorf("get historical header: %w", err)
return
}

for _, set := range blockChangesSets {
histChangesC <- set
}
hist <- *header

// Graceful stop must finish the block before exiting.
// Graceful stop finishes with the block before exiting.
if cancelled.Load().(bool) {
return
}
}

histChangesC <- set
}(begin, liveChangesC, histChangesC)
hist <- firstLiveHeader
}(begin, liveHeadersC, histHeadersC)

// Sequence historical and live changes.
changesC := make(chan types.StorageChangeSet)
// Sequence historical and live headers.
headersC := make(chan types.Header)
wg.Add(1)
go func(histChangesC, liveChangesC <-chan types.StorageChangeSet, changesC chan types.StorageChangeSet) {
go func(hist, live <-chan types.Header, headersC chan types.Header) {
defer wg.Done()
defer close(changesC)
defer close(headersC)

for set := range histChangesC {
changesC <- set
for set := range hist {
headersC <- set
}

for set := range liveChangesC {
changesC <- set
for set := range live {
headersC <- set
}
}(histChangesC, liveChangesC, changesC)
}(histHeadersC, liveHeadersC, headersC)

// Decode events from changes skipping blocks before 'begin'.
// Retrieve events skipping blocks before 'begin'.
eventsC := make(chan blockEvents)
wg.Add(1)
go func(changesC <-chan types.StorageChangeSet, eventsC chan blockEvents) {
go func(headersC <-chan types.Header, eventsC chan blockEvents) {
defer wg.Done()
defer close(eventsC)

for set := range changesC {
header, err := c.RPC.Chain.GetHeader(set.Block)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
return
}

for header := range headersC {
if header.Number < begin {
continue
}

meta, err := c.RPC.State.GetMetadata(set.Block)
hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
c.errsListening <- fmt.Errorf("get metadata: %w", err)
c.errsListening <- fmt.Errorf("get block hash: %w", err)
return
}

for _, change := range set.Changes {
if !codec.Eq(change.StorageKey, key) || !change.HasStorageData {
continue
}

events := &pallets.Events{}
err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events)
if err != nil {
c.errsListening <- fmt.Errorf("events decoder: %w", err)
continue
}

eventsC <- blockEvents{
Events: events,
Number: header.Number,
Hash: set.Block,
}
events, err := retriever.GetEvents(hash)
if err != nil {
c.errsListening <- fmt.Errorf("events retriever: %w", err)
continue
}

eventsC <- blockEvents{
Events: events,
Hash: hash,
Number: header.Number,
}
}
}(changesC, eventsC)
}(headersC, eventsC)

// Invoke listeners.
go func(eventsC <-chan blockEvents) {
Expand Down Expand Up @@ -230,7 +208,7 @@ func (c *Client) RegisterEventsListener(callback EventsListener) context.CancelF
}

type blockEvents struct {
Events *pallets.Events
Events []*parser.Event
Hash types.Hash
Number types.BlockNumber
}

0 comments on commit 26558b9

Please sign in to comment.