From d8b839d1a84ebb0b2c1fc995136ac418c976e38a Mon Sep 17 00:00:00 2001 From: Todd Beets Date: Fri, 25 Mar 2022 09:25:43 -0700 Subject: [PATCH 1/4] Add server max connections exceeded error handling to attempt reconnect --- nats.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nats.go b/nats.go index 566fced52..98fbe9edb 100644 --- a/nats.go +++ b/nats.go @@ -83,6 +83,9 @@ const ( // ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired. ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired" + + // MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit + MAX_CONNECTIONS_ERR = "maximum connections exceeded" ) // Errors @@ -161,6 +164,7 @@ var ( ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged") ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed") ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use") + ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") ) func init() { @@ -3213,6 +3217,8 @@ func (nc *Conn) processErr(ie string) { // FIXME(dlc) - process Slow Consumer signals special. if e == STALE_CONNECTION { nc.processOpErr(ErrStaleConnection) + } else if e == MAX_CONNECTIONS_ERR { + nc.processOpErr(ErrMaxConnectionsExceeded) } else if strings.HasPrefix(e, PERMISSIONS_ERR) { nc.processPermissionsViolation(ne) } else if authErr := checkAuthError(e); authErr != nil { From 17302cdd235569ae7a2085cdc1bf5d0fe22a157e Mon Sep 17 00:00:00 2001 From: Todd Beets Date: Fri, 25 Mar 2022 14:00:14 -0700 Subject: [PATCH 2/4] Add test case for auto reconnect on max_connections exceeded from server. --- nats_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/nats_test.go b/nats_test.go index 123df9812..86d4a15a4 100644 --- a/nats_test.go +++ b/nats_test.go @@ -276,6 +276,69 @@ var testServers = []string{ "nats://localhost:1228", } +func TestMaxConnectionsReconnect(t *testing.T) { + // Join cluster on first server + routes := []*url.URL{&url.URL{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", TEST_PORT)}} + + // Start first server + s1Opts := natsserver.DefaultTestOptions + s1Opts.Port = -1 + s1Opts.MaxConn = 2 + s1Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: TEST_PORT} + s1Opts.Routes = routes + s1 := RunServerWithOptions(&s1Opts) + defer s1.Shutdown() + + // Start second server + s2Opts := natsserver.DefaultTestOptions + s2Opts.Port = -1 + s2Opts.MaxConn = 2 + s2Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: TEST_PORT + 1} + s2Opts.Routes = routes + s2 := RunServerWithOptions(&s2Opts) + defer s2.Shutdown() + + // Only explicitly connect to first server + var opts = Options{ + Url: s1.ClientURL(), + AllowReconnect: true, + MaxReconnect: 2, + ReconnectWait: 10 * time.Millisecond, + Timeout: 200 * time.Millisecond, + } + + // Create two connections (the current max) for first server + nc1, _ := opts.Connect() + defer nc1.Close() + nc1.Flush() + + nc2, _ := opts.Connect() + defer nc2.Close() + nc2.Flush() + + if s1.NumClients() != 2 { + t.Fatalf("Expected 2 client connections to first server. Got %d\n", s1.NumClients()) + } + + if s2.NumClients() > 0 { + t.Fatalf("Expected 0 client connections to second server. Got %d\n", s2.NumClients()) + } + + // Kick one of our two server connections off first server. One client should reconnect to second server. + newS1Opts := s1Opts + newS1Opts.MaxConn = 1 + err := s1.ReloadOptions(&newS1Opts) + if err != nil { + t.Fatalf("Unexpected error changing max_connections [%s]", err) + } + + time.Sleep(200 * time.Millisecond) + + if s2.NumClients() <= 0 || s1.NumClients() > 1 { + t.Fatalf("Expected client reconnection to second server.\n") + } +} + func TestSimplifiedURLs(t *testing.T) { for _, test := range []struct { name string From e74477b1aa0dac1fd50a7df8298c8b692e66e3aa Mon Sep 17 00:00:00 2001 From: Todd Beets Date: Fri, 25 Mar 2022 15:41:13 -0700 Subject: [PATCH 3/4] Streamline max connections reconnect test case --- nats_test.go | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/nats_test.go b/nats_test.go index 86d4a15a4..b1e1c8b51 100644 --- a/nats_test.go +++ b/nats_test.go @@ -277,15 +277,12 @@ var testServers = []string{ } func TestMaxConnectionsReconnect(t *testing.T) { - // Join cluster on first server - routes := []*url.URL{&url.URL{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", TEST_PORT)}} // Start first server s1Opts := natsserver.DefaultTestOptions s1Opts.Port = -1 s1Opts.MaxConn = 2 - s1Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: TEST_PORT} - s1Opts.Routes = routes + s1Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: -1} s1 := RunServerWithOptions(&s1Opts) defer s1.Shutdown() @@ -293,38 +290,35 @@ func TestMaxConnectionsReconnect(t *testing.T) { s2Opts := natsserver.DefaultTestOptions s2Opts.Port = -1 s2Opts.MaxConn = 2 - s2Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: TEST_PORT + 1} - s2Opts.Routes = routes + s2Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: -1} + s2Opts.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", s1Opts.Cluster.Port)) s2 := RunServerWithOptions(&s2Opts) defer s2.Shutdown() - // Only explicitly connect to first server - var opts = Options{ - Url: s1.ClientURL(), - AllowReconnect: true, - MaxReconnect: 2, - ReconnectWait: 10 * time.Millisecond, - Timeout: 200 * time.Millisecond, + opts := []Option{ + MaxReconnects(2), + ReconnectWait(10 * time.Millisecond), + Timeout(200 * time.Millisecond), } - // Create two connections (the current max) for first server - nc1, _ := opts.Connect() + // Create two connections (the current max) to first server + nc1, _ := Connect(s1.ClientURL(), opts...) defer nc1.Close() nc1.Flush() - nc2, _ := opts.Connect() + nc2, _ := Connect(s1.ClientURL(), opts...) defer nc2.Close() nc2.Flush() if s1.NumClients() != 2 { - t.Fatalf("Expected 2 client connections to first server. Got %d\n", s1.NumClients()) + t.Fatalf("Expected 2 client connections to first server. Got %d", s1.NumClients()) } if s2.NumClients() > 0 { - t.Fatalf("Expected 0 client connections to second server. Got %d\n", s2.NumClients()) + t.Fatalf("Expected 0 client connections to second server. Got %d", s2.NumClients()) } - // Kick one of our two server connections off first server. One client should reconnect to second server. + // Kick one of our two server connections off first server. One client should reconnect to second server newS1Opts := s1Opts newS1Opts.MaxConn = 1 err := s1.ReloadOptions(&newS1Opts) @@ -335,7 +329,7 @@ func TestMaxConnectionsReconnect(t *testing.T) { time.Sleep(200 * time.Millisecond) if s2.NumClients() <= 0 || s1.NumClients() > 1 { - t.Fatalf("Expected client reconnection to second server.\n") + t.Fatalf("Expected client reconnection to second server") } } From 06441fe061b56c9662c6defba3bed1782e405c31 Mon Sep 17 00:00:00 2001 From: Todd Beets Date: Fri, 25 Mar 2022 16:06:03 -0700 Subject: [PATCH 4/4] Replaced hard-coded Sleep with channels --- nats_test.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/nats_test.go b/nats_test.go index b1e1c8b51..8a81d49be 100644 --- a/nats_test.go +++ b/nats_test.go @@ -295,10 +295,20 @@ func TestMaxConnectionsReconnect(t *testing.T) { s2 := RunServerWithOptions(&s2Opts) defer s2.Shutdown() + errCh := make(chan error, 2) + reconnectCh := make(chan struct{}) opts := []Option{ MaxReconnects(2), ReconnectWait(10 * time.Millisecond), Timeout(200 * time.Millisecond), + DisconnectErrHandler(func(_ *Conn, err error) { + if err != nil { + errCh <- err + } + }), + ReconnectHandler(func(_ *Conn) { + reconnectCh <- struct{}{} + }), } // Create two connections (the current max) to first server @@ -326,7 +336,20 @@ func TestMaxConnectionsReconnect(t *testing.T) { t.Fatalf("Unexpected error changing max_connections [%s]", err) } - time.Sleep(200 * time.Millisecond) + select { + case err := <-errCh: + if err != ErrMaxConnectionsExceeded { + t.Fatalf("Unexpected error %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Timed out waiting for disconnect event") + } + + select { + case <-reconnectCh: + case <-time.After(2 * time.Second): + t.Fatal("Timed out waiting for reconnect event") + } if s2.NumClients() <= 0 || s1.NumClients() > 1 { t.Fatalf("Expected client reconnection to second server")