Skip to content

Commit

Permalink
Merge pull request #935 from tbeets/main
Browse files Browse the repository at this point in the history
Enhance client to do reconnect on max connections from server
  • Loading branch information
tbeets committed Mar 25, 2022
2 parents 31782c0 + 06441fe commit 5753a5f
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
6 changes: 6 additions & 0 deletions nats.go
Expand Up @@ -83,6 +83,9 @@ const (

// ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired.
ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired"

// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit
MAX_CONNECTIONS_ERR = "maximum connections exceeded"
)

// Errors
Expand Down Expand Up @@ -161,6 +164,7 @@ var (
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
)

func init() {
Expand Down Expand Up @@ -3213,6 +3217,8 @@ func (nc *Conn) processErr(ie string) {
// FIXME(dlc) - process Slow Consumer signals special.
if e == STALE_CONNECTION {
nc.processOpErr(ErrStaleConnection)
} else if e == MAX_CONNECTIONS_ERR {
nc.processOpErr(ErrMaxConnectionsExceeded)
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
nc.processPermissionsViolation(ne)
} else if authErr := checkAuthError(e); authErr != nil {
Expand Down
80 changes: 80 additions & 0 deletions nats_test.go
Expand Up @@ -276,6 +276,86 @@ var testServers = []string{
"nats://localhost:1228",
}

func TestMaxConnectionsReconnect(t *testing.T) {

// Start first server
s1Opts := natsserver.DefaultTestOptions
s1Opts.Port = -1
s1Opts.MaxConn = 2
s1Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: -1}
s1 := RunServerWithOptions(&s1Opts)
defer s1.Shutdown()

// Start second server
s2Opts := natsserver.DefaultTestOptions
s2Opts.Port = -1
s2Opts.MaxConn = 2
s2Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: -1}
s2Opts.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", s1Opts.Cluster.Port))
s2 := RunServerWithOptions(&s2Opts)
defer s2.Shutdown()

errCh := make(chan error, 2)
reconnectCh := make(chan struct{})
opts := []Option{
MaxReconnects(2),
ReconnectWait(10 * time.Millisecond),
Timeout(200 * time.Millisecond),
DisconnectErrHandler(func(_ *Conn, err error) {
if err != nil {
errCh <- err
}
}),
ReconnectHandler(func(_ *Conn) {
reconnectCh <- struct{}{}
}),
}

// Create two connections (the current max) to first server
nc1, _ := Connect(s1.ClientURL(), opts...)
defer nc1.Close()
nc1.Flush()

nc2, _ := Connect(s1.ClientURL(), opts...)
defer nc2.Close()
nc2.Flush()

if s1.NumClients() != 2 {
t.Fatalf("Expected 2 client connections to first server. Got %d", s1.NumClients())
}

if s2.NumClients() > 0 {
t.Fatalf("Expected 0 client connections to second server. Got %d", s2.NumClients())
}

// Kick one of our two server connections off first server. One client should reconnect to second server
newS1Opts := s1Opts
newS1Opts.MaxConn = 1
err := s1.ReloadOptions(&newS1Opts)
if err != nil {
t.Fatalf("Unexpected error changing max_connections [%s]", err)
}

select {
case err := <-errCh:
if err != ErrMaxConnectionsExceeded {
t.Fatalf("Unexpected error %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for disconnect event")
}

select {
case <-reconnectCh:
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for reconnect event")
}

if s2.NumClients() <= 0 || s1.NumClients() > 1 {
t.Fatalf("Expected client reconnection to second server")
}
}

func TestSimplifiedURLs(t *testing.T) {
for _, test := range []struct {
name string
Expand Down

0 comments on commit 5753a5f

Please sign in to comment.