Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance client to do reconnect on max connections from server #935

Merged
merged 4 commits into from Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions nats.go
Expand Up @@ -83,6 +83,9 @@ const (

// ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired.
ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired"

// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit
MAX_CONNECTIONS_ERR = "maximum connections exceeded"
)

// Errors
Expand Down Expand Up @@ -161,6 +164,7 @@ var (
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
)

func init() {
Expand Down Expand Up @@ -3213,6 +3217,8 @@ func (nc *Conn) processErr(ie string) {
// FIXME(dlc) - process Slow Consumer signals special.
if e == STALE_CONNECTION {
nc.processOpErr(ErrStaleConnection)
} else if e == MAX_CONNECTIONS_ERR {
nc.processOpErr(ErrMaxConnectionsExceeded)
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
nc.processPermissionsViolation(ne)
} else if authErr := checkAuthError(e); authErr != nil {
Expand Down
63 changes: 63 additions & 0 deletions nats_test.go
Expand Up @@ -276,6 +276,69 @@ var testServers = []string{
"nats://localhost:1228",
}

func TestMaxConnectionsReconnect(t *testing.T) {
// Join cluster on first server
routes := []*url.URL{&url.URL{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", TEST_PORT)}}
tbeets marked this conversation as resolved.
Show resolved Hide resolved

// Start first server
s1Opts := natsserver.DefaultTestOptions
s1Opts.Port = -1
s1Opts.MaxConn = 2
s1Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: TEST_PORT}
tbeets marked this conversation as resolved.
Show resolved Hide resolved
s1Opts.Routes = routes
tbeets marked this conversation as resolved.
Show resolved Hide resolved
s1 := RunServerWithOptions(&s1Opts)
defer s1.Shutdown()

// Start second server
s2Opts := natsserver.DefaultTestOptions
s2Opts.Port = -1
s2Opts.MaxConn = 2
s2Opts.Cluster = server.ClusterOpts{Name: "test", Host: "127.0.0.1", Port: TEST_PORT + 1}
tbeets marked this conversation as resolved.
Show resolved Hide resolved
s2Opts.Routes = routes
tbeets marked this conversation as resolved.
Show resolved Hide resolved
s2 := RunServerWithOptions(&s2Opts)
defer s2.Shutdown()

// Only explicitly connect to first server
var opts = Options{
tbeets marked this conversation as resolved.
Show resolved Hide resolved
Url: s1.ClientURL(),
AllowReconnect: true,
MaxReconnect: 2,
ReconnectWait: 10 * time.Millisecond,
Timeout: 200 * time.Millisecond,
}

// Create two connections (the current max) for first server
nc1, _ := opts.Connect()
defer nc1.Close()
nc1.Flush()

nc2, _ := opts.Connect()
defer nc2.Close()
nc2.Flush()

if s1.NumClients() != 2 {
t.Fatalf("Expected 2 client connections to first server. Got %d\n", s1.NumClients())
tbeets marked this conversation as resolved.
Show resolved Hide resolved
}

if s2.NumClients() > 0 {
t.Fatalf("Expected 0 client connections to second server. Got %d\n", s2.NumClients())
tbeets marked this conversation as resolved.
Show resolved Hide resolved
}

// Kick one of our two server connections off first server. One client should reconnect to second server.
newS1Opts := s1Opts
newS1Opts.MaxConn = 1
err := s1.ReloadOptions(&newS1Opts)
if err != nil {
t.Fatalf("Unexpected error changing max_connections [%s]", err)
}

time.Sleep(200 * time.Millisecond)
tbeets marked this conversation as resolved.
Show resolved Hide resolved

if s2.NumClients() <= 0 || s1.NumClients() > 1 {
t.Fatalf("Expected client reconnection to second server.\n")
tbeets marked this conversation as resolved.
Show resolved Hide resolved
}
}

func TestSimplifiedURLs(t *testing.T) {
for _, test := range []struct {
name string
Expand Down