Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance client to do reconnect on max connections from server #935

Merged
merged 4 commits into from Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions nats.go
Expand Up @@ -83,6 +83,9 @@ const (

// ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired.
ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired"

// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit
MAX_CONNECTIONS_ERR = "maximum connections exceeded"
)

// Errors
Expand Down Expand Up @@ -161,6 +164,7 @@ var (
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
)

func init() {
Expand Down Expand Up @@ -3213,6 +3217,8 @@ func (nc *Conn) processErr(ie string) {
// FIXME(dlc) - process Slow Consumer signals special.
if e == STALE_CONNECTION {
nc.processOpErr(ErrStaleConnection)
} else if e == MAX_CONNECTIONS_ERR {
nc.processOpErr(ErrMaxConnectionsExceeded)
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
nc.processPermissionsViolation(ne)
} else if authErr := checkAuthError(e); authErr != nil {
Expand Down
57 changes: 57 additions & 0 deletions nats_test.go
Expand Up @@ -276,6 +276,63 @@ var testServers = []string{
"nats://localhost:1228",
}

func TestMaxConnectionsReconnect(t *testing.T) {

// Start first server
s1Opts := natsserver.DefaultTestOptions
s1Opts.Port = -1
s1Opts.MaxConn = 2
s1Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: -1}
s1 := RunServerWithOptions(&s1Opts)
defer s1.Shutdown()

// Start second server
s2Opts := natsserver.DefaultTestOptions
s2Opts.Port = -1
s2Opts.MaxConn = 2
s2Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: -1}
s2Opts.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", s1Opts.Cluster.Port))
s2 := RunServerWithOptions(&s2Opts)
defer s2.Shutdown()

opts := []Option{
MaxReconnects(2),
ReconnectWait(10 * time.Millisecond),
Timeout(200 * time.Millisecond),
}

// Create two connections (the current max) to first server
nc1, _ := Connect(s1.ClientURL(), opts...)
defer nc1.Close()
nc1.Flush()

nc2, _ := Connect(s1.ClientURL(), opts...)
defer nc2.Close()
nc2.Flush()

if s1.NumClients() != 2 {
t.Fatalf("Expected 2 client connections to first server. Got %d", s1.NumClients())
}

if s2.NumClients() > 0 {
t.Fatalf("Expected 0 client connections to second server. Got %d", s2.NumClients())
}

// Kick one of our two server connections off first server. One client should reconnect to second server
newS1Opts := s1Opts
newS1Opts.MaxConn = 1
err := s1.ReloadOptions(&newS1Opts)
if err != nil {
t.Fatalf("Unexpected error changing max_connections [%s]", err)
}

time.Sleep(200 * time.Millisecond)
tbeets marked this conversation as resolved.
Show resolved Hide resolved

if s2.NumClients() <= 0 || s1.NumClients() > 1 {
t.Fatalf("Expected client reconnection to second server")
}
}

func TestSimplifiedURLs(t *testing.T) {
for _, test := range []struct {
name string
Expand Down