Skip to content

Commit

Permalink
Fixed issues with leafnode compression negotiation (#4230)
Browse files Browse the repository at this point in the history
When a server would send an asynchronous INFO to a remote server it
would incorrectly contain compression information that could cause
issues with one side thinking that the connection should be compressed
while the other side was not.

It also caused the authentication timer to be incorrectly set which
would cause a disconnect.

Signed-off-by: Ivan Kozlovic <ijkozlovic@gmail.com>
  • Loading branch information
derekcollison committed Jun 10, 2023
2 parents a1f0351 + 7ff0ea4 commit 975f004
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 21 deletions.
7 changes: 2 additions & 5 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3999,10 +3999,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: CLOUD, store_dir: '%s'}
leaf {
listen: 127.0.0.1:-1
compression: off
}
leaf { listen: 127.0.0.1:-1 }
cluster {
name: %s
Expand Down Expand Up @@ -4059,7 +4056,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
var leafFrag = `
leaf {
listen: 127.0.0.1:-1
remotes [ { urls: [ %s ], account: "T", compression:off }, { urls: [ %s ], account: "F", compression: off } ]
remotes [ { urls: [ %s ], account: "T" }, { urls: [ %s ], account: "F" } ]
}`

genLeafTmpl := func(tmpl string, c *cluster) string {
Expand Down
37 changes: 21 additions & 16 deletions server/leafnode.go
Expand Up @@ -696,6 +696,8 @@ func (s *Server) startLeafNodeAcceptLoop() {

tlsRequired := opts.LeafNode.TLSConfig != nil
tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
// Do not set compression in this Info object, it would possibly cause
// issues when sending asynchronous INFO to the remote.
info := Info{
ID: s.info.ID,
Name: s.info.Name,
Expand All @@ -712,11 +714,6 @@ func (s *Server) startLeafNodeAcceptLoop() {
Proto: 1, // Fixed for now.
InfoOnConnect: true,
}
// For tests that want to simulate old servers, do not set the compression
// on the INFO protocol if configured with CompressionNotSupported.
if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
info.Compression = cm
}
// If we have selected a random port...
if port == 0 {
// Write resolved port back to options.
Expand Down Expand Up @@ -987,7 +984,11 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
// Grab server variables
s.mu.Lock()
info = s.copyLeafNodeInfo()
info.Compression = opts.LeafNode.Compression.Mode
// For tests that want to simulate old servers, do not set the compression
// on the INFO protocol if configured with CompressionNotSupported.
if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
info.Compression = cm
}
s.generateNonce(nonce[:])
s.mu.Unlock()
}
Expand Down Expand Up @@ -1201,14 +1202,21 @@ func (c *client) processLeafnodeInfo(info *Info) {
c.leaf.compression = CompressionOff
}
}
// Accepting side does not normally process an INFO protocol during
// initial connection handshake. So we keep it consistent by returning
// if we are not soliciting.
if !didSolicit {
// If we had created the ping timer instead of the auth timer, we will
// clear the ping timer and set the auth timer now that the compression
// negotiation is done.
if info.Compression != _EMPTY_ && c.ping.tmr != nil {
clearTimer(&c.ping.tmr)
c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
}
c.mu.Unlock()
return
}
// Fall through and process the INFO protocol as usual.
} else if firstINFO && !didSolicit && needsCompression(opts.LeafNode.Compression.Mode) {
// We used the ping timer instead of auth timer when accepting a remote
// connection so that we can exchange INFO protocols and not have the
// parser return a protocol violation. Now that the negotiation is over
// stop the ping timer and set the auth timer.
clearTimer(&c.ping.tmr)
c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
}

// Note: For now, only the initial INFO has a nonce. We
Expand Down Expand Up @@ -2859,9 +2867,6 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) {
// Spin up the write loop.
s.startGoRoutine(func() { c.writeLoop() })

// In case there was compression negotiation, the timer could have been
// already created. Destroy and recreate with different callback.
clearTimer(&c.ping.tmr)
// timeout leafNodeFinishConnectProcess
c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
c.mu.Lock()
Expand Down
69 changes: 69 additions & 0 deletions server/leafnode_test.go
Expand Up @@ -5880,3 +5880,72 @@ func TestLeafNodeCompressionWithWSGetNeedsData(t *testing.T) {
require_True(t, len(msg.Data) == 156)
require_Equal(t, string(msg.Data), payload)
}

func TestLeafNodeCompressionAuthTimeout(t *testing.T) {
hconf := createConfFile(t, []byte(`
port: -1
server_name: "hub"
leafnodes {
port: -1
authorization {
timeout: 0.75
}
}
`))
sh, oh := RunServerWithConfig(hconf)
defer sh.Shutdown()

sconfTmpl := `
port: -1
server_name: "%s"
cluster {
port: -1
name: "spoke"
%s
}
leafnodes {
port: -1
remotes [
{ url: "nats://127.0.0.1:%d" }
]
}
`
s1conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP1", _EMPTY_, oh.LeafNode.Port)))
s1, o1 := RunServerWithConfig(s1conf)
defer s1.Shutdown()

s2conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP2", fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port), oh.LeafNode.Port)))
s2, _ := RunServerWithConfig(s2conf)
defer s2.Shutdown()

checkClusterFormed(t, s1, s2)

checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s2)

getCID := func(s *Server) uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
var cid uint64
for _, l := range s.leafs {
l.mu.Lock()
cid = l.cid
l.mu.Unlock()
}
return cid
}
leaf1 := getCID(s1)
leaf2 := getCID(s2)

// Wait for more than auth timeout
time.Sleep(time.Second)

checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s2)
if l1 := getCID(s1); l1 != leaf1 {
t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf1, l1)
}
if l2 := getCID(s2); l2 != leaf2 {
t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf2, l2)
}
}

0 comments on commit 975f004

Please sign in to comment.