Skip to content

Commit

Permalink
Replace custom stop func with context.CancelFunc
Browse files Browse the repository at this point in the history
  • Loading branch information
khssnv committed Mar 18, 2024
1 parent c00ca20 commit d30e293
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions blockchain/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blockchain

import (
"context"
"fmt"
"math"
"sync"
Expand All @@ -21,7 +22,7 @@ type Client struct {
eventsListeners map[int]EventsListener
mu sync.Mutex
isListening uint32
stopListening func()
cancelListening func()
errsListening chan error

DdcClusters pallets.DdcClustersApi
Expand Down Expand Up @@ -50,9 +51,9 @@ func NewClient(url string) (*Client, error) {
}, nil
}

func (c *Client) StartEventsListening() (func(), <-chan error, error) {
func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error) {
if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) {
return c.stopListening, c.errsListening, nil
return c.cancelListening, c.errsListening, nil
}

meta, err := c.RPC.State.GetMetadataLatest()
Expand Down Expand Up @@ -88,21 +89,21 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) {
}()

once := sync.Once{}
c.stopListening = func() {
c.cancelListening = func() {
once.Do(func() {
done <- struct{}{}
sub.Unsubscribe()
c.isListening = 0
})
}

return c.stopListening, c.errsListening, nil
return c.cancelListening, c.errsListening, nil
}

// RegisterEventsListener subscribes given callback to blockchain events. There is a begin parameter which
// can be used to get events from blocks older than the latest block. If begin is greater than the latest
// block number, the listener will start from the latest block.
func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (func(), error) {
func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback EventsListener) (context.CancelFunc, error) {
var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := c.eventsListeners[i]; !ok {
Expand Down Expand Up @@ -134,7 +135,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events
c.eventsListeners[idx] = callbackWrapper
c.mu.Unlock()

stopped := false
cancelled := false

go func() {
<-subscriptionStarted
Expand Down Expand Up @@ -193,7 +194,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events
continue
}

if stopped {
if cancelled {
return
}

Expand All @@ -205,16 +206,16 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events
}()

once := sync.Once{}
stop := func() {
cancel := func() {
once.Do(func() {
c.mu.Lock()
stopped = true
cancelled = true
delete(c.eventsListeners, idx)
c.mu.Unlock()
})
}

return stop, nil
return cancel, nil
}

func (c *Client) processSystemEventsStorageChanges(
Expand Down

0 comments on commit d30e293

Please sign in to comment.