Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockchain events subscription #77

Merged
merged 32 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8da4629
Add `pallet-ddc-clusters` events type definitions
khssnv Dec 15, 2023
91aef79
Add event retriever to the client
khssnv Dec 15, 2023
c9bd637
Store pallets' subscription to events in client
khssnv Dec 15, 2023
8212a4f
New client method to start listening to events
khssnv Dec 15, 2023
796555f
New generic events subscription type
khssnv Dec 15, 2023
d9b9d14
New events subscriber type
khssnv Dec 15, 2023
4a6ef91
New events publisher interface
khssnv Dec 15, 2023
1501773
A func to add buffered subscription to Publisher
khssnv Dec 15, 2023
0bbdfe2
Impl Publisher interface on ddcClustersApi
khssnv Dec 15, 2023
f53d0cf
Subscribe ddcClustersApi to block events
khssnv Dec 15, 2023
7b9cd80
Event DdcClusters.ClusterNodeAdded subscription
khssnv Dec 15, 2023
caa807b
Reshape events to remove events retriever
khssnv Jan 2, 2024
d2fa6dc
Create chain custom event records type
khssnv Jan 2, 2024
75bdbb1
Add event unsubscription callback
khssnv Jan 2, 2024
1cc6353
Remove Publisher interface
khssnv Jan 2, 2024
9a29fcc
Generic events subscriber
khssnv Jan 2, 2024
a646067
Filter specific block events for subscribers
khssnv Jan 2, 2024
d1158d0
Remove events retriever, listen to storage changes
khssnv Jan 2, 2024
ee5d949
Upgrade `go-substrate-rpc-client` to `v4.2.1`
khssnv Jan 2, 2024
67a3abc
Add `DdcClusters_ClusterCreated` events subscriber
khssnv Jan 2, 2024
c344094
Add `pallet-ddc-customers` event types
khssnv Jan 3, 2024
d3bcb3a
Track `ddc-customers` events subscribers
khssnv Jan 3, 2024
0214d43
Add `ddc-customers` events subscription
khssnv Jan 3, 2024
daae22b
Pass block events to `ddc-customers` subscribers
khssnv Jan 3, 2024
ccc8ddc
Add `pallet-ddc-payouts` event types
khssnv Jan 3, 2024
edda198
Add `ddc-payouts` events subscription
khssnv Jan 4, 2024
4d6a045
Pass block events to `ddc-payouts` subscribers
khssnv Jan 4, 2024
40ca238
Unify parameter naming
khssnv Jan 4, 2024
c0f64b7
Implement remaining events subs for `ddc-clusters`
khssnv Jan 4, 2024
3abdee1
Callbacks based blockchain events listener
khssnv Jan 5, 2024
2966920
Revert multiple commits with older events sub impl
khssnv Jan 5, 2024
19c6706
Ignore duplicate attempt to start event listening
khssnv Jan 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 107 additions & 5 deletions blockchain/client.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
package blockchain

import (
"fmt"
"math"
"sync"
"sync/atomic"

gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/centrifuge/go-substrate-rpc-client/v4/types/codec"

"github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets"
)

type EventsListener func(*pallets.Events)

type Client struct {
*gsrpc.SubstrateAPI

eventsListeners map[int]EventsListener
mu sync.Mutex
isListening uint32
stopListening func()
errsListening chan error

DdcClusters pallets.DdcClustersApi
DdcCustomers pallets.DdcCustomersApi
DdcNodes pallets.DdcNodesApi
Expand All @@ -26,10 +41,97 @@ func NewClient(url string) (*Client, error) {
}

return &Client{
SubstrateAPI: substrateApi,
DdcClusters: pallets.NewDdcClustersApi(substrateApi),
DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta),
DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta),
DdcPayouts: pallets.NewDdcPayoutsApi(substrateApi, meta),
SubstrateAPI: substrateApi,
eventsListeners: make(map[int]EventsListener),
DdcClusters: pallets.NewDdcClustersApi(substrateApi),
DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta),
DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta),
DdcPayouts: pallets.NewDdcPayoutsApi(substrateApi, meta),
}, nil
}

func (c *Client) StartEventsListening() (func(), <-chan error, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would happen in case this is called twice? shouldn't we rewrite it somehow so the client can be subscribed to changes only once?

Copy link
Member Author

@khssnv khssnv Jan 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Now we can freely call it twice and concurrently. It will ignore a duplicate attempt to start events listening (until its stopped) and just returns an existing stopper function and errors channel.

if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) {
return c.stopListening, c.errsListening, nil
}

meta, err := c.RPC.State.GetMetadataLatest()
if err != nil {
return nil, nil, err
}
key, err := types.CreateStorageKey(meta, "System", "Events", nil)
if err != nil {
return nil, nil, err
}
sub, err := c.RPC.State.SubscribeStorageRaw([]types.StorageKey{key})
if err != nil {
return nil, nil, err
}

done := make(chan struct{})
c.errsListening = make(chan error)

go func() {
for {
select {
case <-done:
return
case set := <-sub.Chan():
for _, change := range set.Changes {
if !codec.Eq(change.StorageKey, key) || !change.HasStorageData {
continue
}

events := &pallets.Events{}
err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events)
if err != nil {
c.errsListening <- fmt.Errorf("events decoder: %w", err)
}

for _, callback := range c.eventsListeners {
go callback(events)
}
}
}
}
}()

once := sync.Once{}
c.stopListening = func() {
once.Do(func() {
done <- struct{}{}
sub.Unsubscribe()
c.isListening = 0
})
}

return c.stopListening, c.errsListening, nil
}

func (c *Client) RegisterEventsListener(callback EventsListener) (func(), error) {
var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := c.eventsListeners[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("too many events listeners")
}
}

c.mu.Lock()
c.eventsListeners[idx] = callback
c.mu.Unlock()

once := sync.Once{}
stop := func() {
once.Do(func() {
c.mu.Lock()
delete(c.eventsListeners, idx)
c.mu.Unlock()
})
}

return stop, nil
}
4 changes: 2 additions & 2 deletions blockchain/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/cerebellum-network/cere-ddc-sdk-go/blockchain

go 1.18

require github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.8
require github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1

require (
github.com/ChainSafe/go-schnorrkel v1.0.0 // indirect
Expand All @@ -18,7 +18,7 @@ require (
github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect
github.com/pierrec/xxHash v0.1.5 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/vedhavyas/go-subkey v1.0.3 // indirect
github.com/vedhavyas/go-subkey/v2 v2.0.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
9 changes: 4 additions & 5 deletions blockchain/go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
github.com/ChainSafe/go-schnorrkel v1.0.0 h1:3aDA67lAykLaG1y3AOjs88dMxC88PgUuHRrLeDnvGIM=
github.com/ChainSafe/go-schnorrkel v1.0.0/go.mod h1:dpzHYVxLZcp8pjlV+O+UR8K0Hp/z7vcchBSbMBEhCw4=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
github.com/btcsuite/btcd v0.22.0-beta h1:LTDpDKUM5EeOFBPM8IXpinEcmZ6FWfNZbE3lfrfdnWo=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ=
github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.8 h1:gHLD5S81As9u5DbefLahw1enVO6OdBSS8gBI2R6KNEQ=
github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.8/go.mod h1:5g1oM4Zu3BOaLpsKQ+O8PAv2kNuq+kPcA1VzFbsSqxE=
github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1 h1:io49TJ8IOIlzipioJc9pJlrjgdJvqktpUWYxVY5AUjE=
github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1/go.mod h1:k61SBXqYmnZO4frAJyH3iuqjolYrYsq79r8EstmklDY=
github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d/go.mod h1:tSxLoYXyBmiFeKpvmq4dzayMdCjCnu8uqmCysIGBT2Y=
github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY=
github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw=
Expand Down Expand Up @@ -47,8 +46,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4=
github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=
github.com/vedhavyas/go-subkey v1.0.3 h1:iKR33BB/akKmcR2PMlXPBeeODjWLM90EL98OrOGs8CA=
github.com/vedhavyas/go-subkey v1.0.3/go.mod h1:CloUaFQSSTdWnINfBRFjVMkWXZANW+nd8+TI5jYcl6Y=
github.com/vedhavyas/go-subkey/v2 v2.0.0 h1:LemDIsrVtRSOkp0FA8HxP6ynfKjeOj3BY2U9UNfeDMA=
github.com/vedhavyas/go-subkey/v2 v2.0.0/go.mod h1:95aZ+XDCWAUUynjlmi7BtPExjXgXxByE0WfBwbmIRH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
35 changes: 35 additions & 0 deletions blockchain/pallets/ddcclusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,41 @@ type ClusterProps struct {
NodeProviderAuthContract types.AccountID
}

// Events
type (
EventDdcClustersClusterCreated struct {
Phase types.Phase
ClusterId ClusterId
Topics []types.Hash
}

EventDdcClustersClusterNodeAdded struct {
Phase types.Phase
ClusterId ClusterId
NodePubKey NodePubKey
Topics []types.Hash
}

EventDdcClustersClusterNodeRemoved struct {
Phase types.Phase
ClusterId ClusterId
NodePubKey NodePubKey
Topics []types.Hash
}

EventDdcClustersClusterParamsSet struct {
Phase types.Phase
ClusterId ClusterId
Topics []types.Hash
}

EventDdcClustersClusterGovParamsSet struct {
Phase types.Phase
ClusterId ClusterId
Topics []types.Hash
}
)

type DdcClustersApi interface {
GetClustersNodes(clusterId ClusterId) ([]NodePubKey, error)
}
Expand Down
42 changes: 40 additions & 2 deletions blockchain/pallets/ddccustomers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,44 @@ type UnlockChunk struct {
Block types.BlockNumber
}

// Events
type (
EventDdcCustomersDeposited struct {
Phase types.Phase
Owner types.AccountID
Amount types.U128
Topics []types.Hash
}
EventDdcCustomersInitialDepositUnlock struct {
Phase types.Phase
Owner types.AccountID
Amount types.U128
Topics []types.Hash
}
EventDdcCustomersWithdrawn struct {
Phase types.Phase
Owner types.AccountID
Amount types.U128
Topics []types.Hash
}
EventDdcCustomersCharged struct {
Phase types.Phase
Owner types.AccountID
Amount types.U128
Topics []types.Hash
}
EventDdcCustomersBucketCreated struct {
Phase types.Phase
BucketId BucketId
Topics []types.Hash
}
EventDdcCustomersBucketUpdated struct {
Phase types.Phase
BucketId BucketId
Topics []types.Hash
}
)

type DdcCustomersApi interface {
GetBuckets(bucketId BucketId) (types.Option[Bucket], error)
GetBucketsCount() (types.U64, error)
Expand All @@ -36,9 +74,9 @@ type ddcCustomersApi struct {
meta *types.Metadata
}

func NewDdcCustomersApi(substrateAPI *gsrpc.SubstrateAPI, meta *types.Metadata) DdcCustomersApi {
func NewDdcCustomersApi(substrateApi *gsrpc.SubstrateAPI, meta *types.Metadata) DdcCustomersApi {
return &ddcCustomersApi{
substrateAPI,
substrateApi,
meta,
}
}
Expand Down
4 changes: 2 additions & 2 deletions blockchain/pallets/ddcnodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ type ddcNodesApi struct {
meta *types.Metadata
}

func NewDdcNodesApi(substrateAPI *gsrpc.SubstrateAPI, meta *types.Metadata) DdcNodesApi {
func NewDdcNodesApi(substrateApi *gsrpc.SubstrateAPI, meta *types.Metadata) DdcNodesApi {
return &ddcNodesApi{
substrateAPI,
substrateApi,
meta,
}
}
Expand Down
Loading
Loading