Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Routes: Pinned Accounts connect/reconnect in some cases #4602

Merged
merged 1 commit into from Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3470,9 +3470,7 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
}()
defer close(qch)

s.mu.RLock()
gacc := s.gacc
s.mu.RUnlock()
gacc := s.GlobalAccount()
if gacc == nil {
t.Fatalf("No global account")
}
Expand Down
63 changes: 50 additions & 13 deletions server/route.go
Expand Up @@ -653,11 +653,14 @@ func (c *client) processRouteInfo(info *Info) {
// We receive an INFO from a server that informs us about another server,
// so the info.ID in the INFO protocol does not match the ID of this route.
if remoteID != _EMPTY_ && remoteID != info.ID {
// We want to know if the existing route supports pooling/pinned-account
// or not when processing the implicit route.
noPool := c.route.noPool
c.mu.Unlock()

// Process this implicit route. We will check that it is not an explicit
// route and/or that it has not been connected already.
s.processImplicitRoute(info)
s.processImplicitRoute(info, noPool)
return
}

Expand Down Expand Up @@ -812,10 +815,14 @@ func (c *client) processRouteInfo(info *Info) {
}
}
// For accounts that are configured to have their own route:
// If this is a solicit route, we already have c.route.accName set in createRoute.
// If this is a solicited route, we already have c.route.accName set in createRoute.
// For non solicited route (the accept side), we will set the account name that
// is present in the INFO protocol.
if !didSolicit {
if didSolicit && len(c.route.accName) > 0 {
// Set it in the info.RouteAccount so that addRoute can use that
// and we properly gossip that this is a route for an account.
info.RouteAccount = string(c.route.accName)
} else if !didSolicit && info.RouteAccount != _EMPTY_ {
c.route.accName = []byte(info.RouteAccount)
}
accName := string(c.route.accName)
Expand Down Expand Up @@ -1002,7 +1009,7 @@ func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) {
// This will process implicit route information received from another server.
// We will check to see if we have configured or are already connected,
// and if so we will ignore. Otherwise we will attempt to connect.
func (s *Server) processImplicitRoute(info *Info) {
func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) {
remoteID := info.ID

s.mu.Lock()
Expand All @@ -1012,8 +1019,16 @@ func (s *Server) processImplicitRoute(info *Info) {
if remoteID == s.info.ID {
return
}

// Snapshot server options.
opts := s.getOpts()

// Check if this route already exists
if accName := info.RouteAccount; accName != _EMPTY_ {
// If we don't support pooling/pinned account, bail.
if opts.Cluster.PoolSize <= 0 {
return
}
if remotes, ok := s.accRoutes[accName]; ok {
if r := remotes[remoteID]; r != nil {
return
Expand All @@ -1034,13 +1049,22 @@ func (s *Server) processImplicitRoute(info *Info) {
return
}

// Snapshot server options.
opts := s.getOpts()

if info.AuthRequired {
r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password)
}
s.startGoRoutine(func() { s.connectToRoute(r, false, true, info.RouteAccount) })
// If we are processing an implicit route from a route that does not
// support pooling/pinned-accounts, we won't receive an INFO for each of
// the pinned-accounts that we would normally receive. In that case, just
// initiate routes for all our configured pinned accounts.
if routeNoPool && info.RouteAccount == _EMPTY_ && len(opts.Cluster.PinnedAccounts) > 0 {
// Copy since we are going to pass as closure to a go routine.
rURL := r
for _, an := range opts.Cluster.PinnedAccounts {
accName := an
s.startGoRoutine(func() { s.connectToRoute(rURL, false, true, accName) })
}
}
}

// hasThisRouteConfigured returns true if info.Host:info.Port is present
Expand Down Expand Up @@ -1071,7 +1095,10 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {

s.forEachRemote(func(r *client) {
r.mu.Lock()
if r.route.remoteID != info.ID {
// If this is a new route for a given account, do not send to a server
// that does not support pooling/pinned-accounts.
if r.route.remoteID != info.ID &&
(info.RouteAccount == _EMPTY_ || (info.RouteAccount != _EMPTY_ && !r.route.noPool)) {
r.enqueueProto(infoJSON)
}
r.mu.Unlock()
Expand Down Expand Up @@ -1855,7 +1882,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
// server and need to handle things differently.
if info.RoutePoolSize <= 0 || opts.Cluster.PoolSize < 0 {
if accName != _EMPTY_ {
invProtoErr = fmt.Sprintf("Not possible to have a dedicate route for account %q between those servers", accName)
invProtoErr = fmt.Sprintf("Not possible to have a dedicated route for account %q between those servers", accName)
// In this case, make sure this route does not attempt to reconnect
c.setNoReconnect()
} else {
Expand Down Expand Up @@ -2731,6 +2758,7 @@ func (s *Server) removeRoute(c *client) {
opts = s.getOpts()
rURL *url.URL
noPool bool
didSolicit bool
)
c.mu.Lock()
cid := c.cid
Expand All @@ -2749,6 +2777,7 @@ func (s *Server) removeRoute(c *client) {
connectURLs = r.connectURLs
wsConnectURLs = r.wsConnURLs
rURL = r.url
didSolicit = r.didSolicit
}
c.mu.Unlock()
if accName != _EMPTY_ {
Expand Down Expand Up @@ -2807,10 +2836,18 @@ func (s *Server) removeRoute(c *client) {
if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) {
s.sendAsyncLeafNodeInfo()
}
// If this server has pooling and the route for this remote
// was a "no pool" route, attempt to reconnect.
if s.routesPoolSize > 1 && noPool {
s.startGoRoutine(func() { s.connectToRoute(rURL, true, true, _EMPTY_) })
// If this server has pooling/pinned accounts and the route for
// this remote was a "no pool" route, attempt to reconnect.
if noPool {
if s.routesPoolSize > 1 {
s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, _EMPTY_) })
}
if len(opts.Cluster.PinnedAccounts) > 0 {
for _, an := range opts.Cluster.PinnedAccounts {
accName := an
s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, accName) })
}
}
}
}
// This is for gateway code. Remove this route from a map that uses
Expand Down