From 70edacd33a5a86df247692be3b5b42aa326b9925 Mon Sep 17 00:00:00 2001 From: bonedaddy Date: Thu, 28 Jan 2021 21:17:41 -0800 Subject: [PATCH 1/5] start working on connection re-establishment --- client/client.go | 110 +++++++++++++++++++++++++++++++++-------- client/client_test.go | 4 ++ client/history.go | 9 ++++ client/history_test.go | 15 +++++- go.mod | 1 + go.sum | 2 + 6 files changed, 119 insertions(+), 22 deletions(-) diff --git a/client/client.go b/client/client.go index 6d0c916..feb38e2 100644 --- a/client/client.go +++ b/client/client.go @@ -2,11 +2,12 @@ package client import ( "context" - "errors" "log" "net/url" "sync" + "github.com/pkg/errors" + "github.com/gorilla/websocket" ) @@ -35,36 +36,28 @@ type Client struct { ctx context.Context cancel context.CancelFunc initMsg BaseMessage // used to resend the initialization msg if connection drops - apiKey string + opts Opts + history *MsgHistory } // New returns a new blocknative websocket client func New(ctx context.Context, opts Opts) (*Client, error) { ctx, cancel := context.WithCancel(ctx) - u := url.URL{ + client := &Client{ + ctx: ctx, + cancel: cancel, + opts: opts, + history: &MsgHistory{}, + } + if err := client.doConnect(ctx, url.URL{ Scheme: opts.Scheme, Host: opts.Host, Path: opts.Path, - } - c, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil) - if err != nil { - cancel() - return nil, err - } - // this checks out connection to blocknative's api and makes sure that we connected properly - var out ConnectResponse - if err := c.ReadJSON(&out); err != nil { + }); err != nil { cancel() return nil, err } - if out.Status != "ok" { - cancel() - return nil, errors.New("failed to initialize websockets api connection") - } - if opts.PrintConnectResponse { - log.Printf("%+v\n", out) - } - return &Client{conn: c, ctx: ctx, cancel: cancel, apiKey: opts.APIKey}, nil + return client, nil } // Initialize is used to handle blocknative websockets api initialization @@ -84,6 +77,8 @@ func (c *Client) Initialize(msg BaseMessage) error { } // ReadJSON is a wrapper around Conn:ReadJSON +// You should provide a pointer otherwise you will likely +// encounter a nil interface type as the returned value func (c *Client) ReadJSON(out interface{}) error { c.mx.RLock() defer c.mx.RUnlock() @@ -91,15 +86,20 @@ func (c *Client) ReadJSON(out interface{}) error { } // WriteJSON is a wrapper around Conn:WriteJSON +// Do not provide a pointer as this could cause problems +// with the message history buffer if the provided value +// becomes garbage collected func (c *Client) WriteJSON(out interface{}) error { c.mx.Lock() defer c.mx.Unlock() + // push the message into the history buffer + c.history.Push(out) return c.conn.WriteJSON(out) } // APIKey returns the api key being used by the client func (c *Client) APIKey() string { - return c.apiKey + return c.opts.APIKey } // Close is used to terminate our websocket client @@ -116,3 +116,71 @@ func (c *Client) Close() error { c.cancel() return err } + +// ReInit should only be used in the event that we receive an unexpected +// error and allows us to replay previous messages +func (c *Client) ReInit() error { + c.mx.Lock() + defer c.mx.Unlock() + c.doConnect(c.ctx, url.URL{ + Scheme: c.opts.Scheme, + Host: c.opts.Host, + Path: c.opts.Path, + }) + // dont empty the buffer such that future errors + // can reuse the message history + msgs := c.history.CopyAll() + // send the initialize messsage + if err := c.conn.WriteJSON(c.initMsg); err != nil { + return errors.Wrap(err, "fatal error received") + } + for _, msg := range msgs { + if err := c.conn.WriteJSON(&msg); err != nil { + // TODO(bonedaddy): figure out how to properly handle + log.Println("receive error during reinitialization: ", err) + return err + } + var out interface{} + _ = c.conn.ReadJSON(&out) + } + return nil +} + +// checkError is used to handle processing of errors +// and returns true if we were able to succesfully recover from the error +// returning false if we aren't indicating +func (c *Client) handleError(err error) bool { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + return false + } + return true +} + +// doConnect should only be used during creation of the initial client object or during reinitialization +// caller must take care of locking +func (c *Client) doConnect(ctx context.Context, u url.URL) error { + // close the previous connection if it exists + if c.conn != nil { + log.Println("closing") + c.conn.Close() + } + log.Println("dialing") + conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil) + if err != nil { + return err + } + c.conn = conn + log.Println("reading") + // this checks out connection to blocknative's api and makes sure that we connected properly + var out ConnectResponse + if err := c.conn.ReadJSON(&out); err != nil { + return err + } + if out.Status != "ok" { + return errors.New("failed to initialize websockets api connection") + } + if c.opts.PrintConnectResponse { + log.Printf("%+v\n", out) + } + return nil +} diff --git a/client/client_test.go b/client/client_test.go index a612730..723c2b6 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -24,5 +24,9 @@ func TestClient(t *testing.T) { var out interface{} require.NoError(t, client.ReadJSON(&out)) t.Log("message: ", out) + // test reinitialization + t.Log("testing reinit") + require.NoError(t, client.ReInit()) + t.Log("closing") require.NoError(t, client.Close()) } diff --git a/client/history.go b/client/history.go index 7e8effc..04b2529 100644 --- a/client/history.go +++ b/client/history.go @@ -44,6 +44,15 @@ func (mg *MsgHistory) PopAll() []interface{} { return copied } +// CopyAll is like PopAll except it does not clear the buffer +func (mg *MsgHistory) CopyAll() []interface{} { + mg.mx.Lock() + defer mg.mx.Unlock() + copied := make([]interface{}, len(mg.buffer)) + copy(copied, mg.buffer) + return copied +} + // Len returns the length of the msg history buffewr func (mg *MsgHistory) Len() int { mg.mx.RLock() diff --git a/client/history_test.go b/client/history_test.go index 51ba9c2..8959060 100644 --- a/client/history_test.go +++ b/client/history_test.go @@ -43,7 +43,20 @@ func TestMsgHistory(t *testing.T) { require.Equal(t, nil, hist.Pop()) // set msg history again set() - all := hist.PopAll() + all := hist.CopyAll() + require.Equal(t, 3, hist.Len()) + for i := 0; i < len(all); i++ { + item = all[i].(arg) + switch i { + case 0: + require.Equal(t, 1, item.num) + case 1: + require.Equal(t, 2, item.num) + case 2: + require.Equal(t, 3, item.num) + } + } + all = hist.PopAll() require.Equal(t, 0, hist.Len()) for i := 0; i < len(all); i++ { item = all[i].(arg) diff --git a/go.mod b/go.mod index 62523dc..2b83ab2 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( github.com/gorilla/websocket v1.4.2 + github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.7.0 github.com/urfave/cli/v2 v2.3.0 ) diff --git a/go.sum b/go.sum index 930750a..f61bc89 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= From 4a0402a9200ba540db8ac7467ec0087f6d7ae98e Mon Sep 17 00:00:00 2001 From: bonedaddy Date: Thu, 28 Jan 2021 21:32:42 -0800 Subject: [PATCH 2/5] remove logs and add comments --- client/client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index feb38e2..231ce04 100644 --- a/client/client.go +++ b/client/client.go @@ -40,7 +40,7 @@ type Client struct { history *MsgHistory } -// New returns a new blocknative websocket client +// New returns a new blocknative websocket client caller must make sure to initialize afterwards func New(ctx context.Context, opts Opts) (*Client, error) { ctx, cancel := context.WithCancel(ctx) client := &Client{ @@ -113,6 +113,8 @@ func (c *Client) Close() error { if err != nil { log.Println("failed to send close message: ", err) } + // close the underlying connection + c.conn.Close() c.cancel() return err } @@ -131,6 +133,7 @@ func (c *Client) ReInit() error { // can reuse the message history msgs := c.history.CopyAll() // send the initialize messsage + // we do not store this in the message history buffer if err := c.conn.WriteJSON(c.initMsg); err != nil { return errors.Wrap(err, "fatal error received") } @@ -161,16 +164,13 @@ func (c *Client) handleError(err error) bool { func (c *Client) doConnect(ctx context.Context, u url.URL) error { // close the previous connection if it exists if c.conn != nil { - log.Println("closing") c.conn.Close() } - log.Println("dialing") conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil) if err != nil { return err } c.conn = conn - log.Println("reading") // this checks out connection to blocknative's api and makes sure that we connected properly var out ConnectResponse if err := c.conn.ReadJSON(&out); err != nil { From b8273272787c986f4dee8725b04faae629d68411 Mon Sep 17 00:00:00 2001 From: bonedaddy Date: Thu, 28 Jan 2021 23:52:50 -0800 Subject: [PATCH 3/5] change handleError to public ShouldReInit --- client/client.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index 231ce04..b51f76b 100644 --- a/client/client.go +++ b/client/client.go @@ -149,10 +149,9 @@ func (c *Client) ReInit() error { return nil } -// checkError is used to handle processing of errors -// and returns true if we were able to succesfully recover from the error -// returning false if we aren't indicating -func (c *Client) handleError(err error) bool { +// ShouldReInit is used to check the given error +// and return whether or not we should reinitialize the connection +func (c *Client) ShouldReInit(err error) bool { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { return false } From c510083cd3b05b1c50647b96d9886a0aff7b6c6a Mon Sep 17 00:00:00 2001 From: bonedaddy Date: Sat, 30 Jan 2021 16:12:18 -0800 Subject: [PATCH 4/5] update go mod --- go.mod | 1 + go.sum | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index ec78629..756a6d9 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.15 require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gorilla/websocket v1.4.2 + github.com/kr/pretty v0.2.1 // indirect github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.7.0 github.com/urfave/cli/v2 v2.3.0 diff --git a/go.sum b/go.sum index c64cdd0..b005980 100644 --- a/go.sum +++ b/go.sum @@ -7,13 +7,13 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= From a9efbfcb7889a85fda9c95851be8afadee1217fb Mon Sep 17 00:00:00 2001 From: bonedaddy Date: Sat, 30 Jan 2021 16:43:11 -0800 Subject: [PATCH 5/5] properly drain all messages during reinit --- client/client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index b51f76b..b58a086 100644 --- a/client/client.go +++ b/client/client.go @@ -137,14 +137,16 @@ func (c *Client) ReInit() error { if err := c.conn.WriteJSON(c.initMsg); err != nil { return errors.Wrap(err, "fatal error received") } + // drain + _ = c.conn.ReadJSON(nil) for _, msg := range msgs { if err := c.conn.WriteJSON(&msg); err != nil { // TODO(bonedaddy): figure out how to properly handle log.Println("receive error during reinitialization: ", err) return err } - var out interface{} - _ = c.conn.ReadJSON(&out) + // drain + _ = c.conn.ReadJSON(nil) } return nil }