From 6c39b4c7bd2e0fdde31053aed038a0057d54a164 Mon Sep 17 00:00:00 2001 From: gexiao Date: Mon, 14 Nov 2022 22:18:09 +0800 Subject: [PATCH] Support ErrCh() for client --- jsonrpc/client.go | 5 +++++ jsonrpc/transport/http.go | 5 +++++ jsonrpc/transport/transport.go | 2 ++ jsonrpc/transport/websocket.go | 11 +++++++++++ 4 files changed, 23 insertions(+) diff --git a/jsonrpc/client.go b/jsonrpc/client.go index 1aeb6d7c..73b4f3a5 100644 --- a/jsonrpc/client.go +++ b/jsonrpc/client.go @@ -56,6 +56,11 @@ func (c *Client) Close() error { return c.transport.Close() } +// ErrCh returns a chan to send errors that occurred in the client +func (c *Client) ErrCh() chan error { + return c.transport.ErrCh() +} + // Call makes a jsonrpc call func (c *Client) Call(method string, out interface{}, params ...interface{}) error { return c.transport.Call(method, out, params...) diff --git a/jsonrpc/transport/http.go b/jsonrpc/transport/http.go index d328543c..63d2721a 100644 --- a/jsonrpc/transport/http.go +++ b/jsonrpc/transport/http.go @@ -27,6 +27,11 @@ func (h *HTTP) Close() error { return nil } +// ErrCh implements the transport interface +func (h *HTTP) ErrCh() chan error { + return nil +} + // Call implements the transport interface func (h *HTTP) Call(method string, out interface{}, params ...interface{}) error { // Encode json-rpc request diff --git a/jsonrpc/transport/transport.go b/jsonrpc/transport/transport.go index 10e7390e..df3b4c2f 100644 --- a/jsonrpc/transport/transport.go +++ b/jsonrpc/transport/transport.go @@ -15,6 +15,8 @@ type Transport interface { // Close closes the transport connection if necessary Close() error + + ErrCh() chan error } // PubSubTransport is a transport that allows subscriptions diff --git a/jsonrpc/transport/websocket.go b/jsonrpc/transport/websocket.go index 267e6067..5b3f0b5a 100644 --- a/jsonrpc/transport/websocket.go +++ b/jsonrpc/transport/websocket.go @@ -49,6 +49,7 @@ type stream struct { subsLock sync.Mutex subs map[string]func(b []byte) + errCh chan error closeCh chan struct{} timer *time.Timer } @@ -56,6 +57,7 @@ type stream struct { func newStream(codec Codec) (*stream, error) { w := &stream{ codec: codec, + errCh: make(chan error, 1), closeCh: make(chan struct{}), handler: map[uint64]callback{}, subs: map[string]func(b []byte){}, @@ -71,6 +73,11 @@ func (s *stream) Close() error { return s.codec.Close() } +// ErrCh implements the transport interface +func (s *stream) ErrCh() chan error { + return s.errCh +} + func (s *stream) incSeq() uint64 { return atomic.AddUint64(&s.seq, 1) } @@ -94,6 +101,10 @@ func (s *stream) listen() { if !s.isClosed() { // log error } + select { + case s.errCh <- err: + default: + } return }