Fixed issues with leafnode compression negotiation #4230

Merged
merged 1 commit on Jun 10, 2023
7 changes: 2 additions & 5 deletions server/jetstream_cluster_3_test.go
@@ -3999,10 +3999,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: CLOUD, store_dir: '%s'}

leaf {
listen: 127.0.0.1:-1
compression: off
}
leaf { listen: 127.0.0.1:-1 }

cluster {
name: %s
@@ -4059,7 +4056,7 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
var leafFrag = `
leaf {
listen: 127.0.0.1:-1
remotes [ { urls: [ %s ], account: "T", compression:off }, { urls: [ %s ], account: "F", compression: off } ]
remotes [ { urls: [ %s ], account: "T" }, { urls: [ %s ], account: "F" } ]
}`

genLeafTmpl := func(tmpl string, c *cluster) string {
37 changes: 21 additions & 16 deletions server/leafnode.go
@@ -696,6 +696,8 @@ func (s *Server) startLeafNodeAcceptLoop() {

tlsRequired := opts.LeafNode.TLSConfig != nil
tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
// Do not set compression in this Info object, it would possibly cause
// issues when sending asynchronous INFO to the remote.
info := Info{
ID: s.info.ID,
Name: s.info.Name,
@@ -712,11 +714,6 @@
Proto: 1, // Fixed for now.
InfoOnConnect: true,
}
// For tests that want to simulate old servers, do not set the compression
// on the INFO protocol if configured with CompressionNotSupported.
if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
info.Compression = cm
}
// If we have selected a random port...
if port == 0 {
// Write resolved port back to options.
@@ -987,7 +984,11 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
// Grab server variables
s.mu.Lock()
info = s.copyLeafNodeInfo()
info.Compression = opts.LeafNode.Compression.Mode
// For tests that want to simulate old servers, do not set the compression
// on the INFO protocol if configured with CompressionNotSupported.
if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
Member commented:

Is there any value in checking that the proposed compression mode is one we understand?

@kozlovic (Member, Author) replied on Jun 9, 2023:
Not sure I understand the question. If it is "are we sure that opts.LeafNode.Compression.Mode is a valid value here?", then yes, it has already been validated on startup (or reload). The point of this check is that if we configure "CompressionNotSupported" (likely in tests), then we don't include the Compression field in the INFO protocol, which makes this server behave as an old server (protocol wise), since this field won't be set/present in old servers.

info.Compression = cm
}
s.generateNonce(nonce[:])
s.mu.Unlock()
}
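
To make the review exchange above concrete: an INFO protocol whose Compression field is empty is indistinguishable from one sent by an old server, so configuring CompressionNotSupported (a test-only mode) simply leaves the field unset. Below is a minimal sketch of that rule; the helper name and the constant's string value are assumptions for illustration, not the server's actual API.

package main

import "fmt"

// compressionForInfo mirrors the diff's conditional: when the configured mode
// is CompressionNotSupported, the INFO protocol's Compression field is left
// empty, exactly as an old server that predates compression would send it.
func compressionForInfo(configured string) string {
	const compressionNotSupported = "not supported" // assumed constant value
	if configured == compressionNotSupported {
		return "" // field omitted from INFO; the peer treats us as an old server
	}
	return configured
}

func main() {
	fmt.Println(compressionForInfo("s2_auto"))       // passed through as-is
	fmt.Println(compressionForInfo("not supported")) // "" (old-server behavior)
}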
@@ -1201,14 +1202,21 @@ func (c *client) processLeafnodeInfo(info *Info) {
c.leaf.compression = CompressionOff
}
}
// Accepting side does not normally process an INFO protocol during
// initial connection handshake. So we keep it consistent by returning
// if we are not soliciting.
if !didSolicit {
// If we had created the ping timer instead of the auth timer, we will
// clear the ping timer and set the auth timer now that the compression
// negotiation is done.
if info.Compression != _EMPTY_ && c.ping.tmr != nil {
clearTimer(&c.ping.tmr)
c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
}
c.mu.Unlock()
return
}
// Fall through and process the INFO protocol as usual.
} else if firstINFO && !didSolicit && needsCompression(opts.LeafNode.Compression.Mode) {
// We used the ping timer instead of auth timer when accepting a remote
// connection so that we can exchange INFO protocols and not have the
// parser return a protocol violation. Now that the negotiation is over
// stop the ping timer and set the auth timer.
clearTimer(&c.ping.tmr)
c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
}

// Note: For now, only the initial INFO has a nonce. We
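
The comments in this hunk describe a timer hand-off: while the accept side exchanges INFO protocols for compression negotiation, a ping timer stands in for the auth timer (so the parser does not report a protocol violation), and once negotiation completes the ping timer is cleared and the auth timer installed. A self-contained sketch of that hand-off, using plain time.Timer values rather than the server's client struct:

package main

import (
	"fmt"
	"time"
)

// finishNegotiation swaps the temporary ping timer used during the INFO
// exchange for the authentication timer, analogous to the diff's
// clearTimer(&c.ping.tmr) followed by c.setAuthTimer(...). The signature is
// illustrative; the real server mutates its client struct under lock.
func finishNegotiation(pingTmr **time.Timer, authTimeout time.Duration, onTimeout func()) *time.Timer {
	if *pingTmr != nil {
		(*pingTmr).Stop()
		*pingTmr = nil
	}
	// From here on, the normal authentication timeout applies.
	return time.AfterFunc(authTimeout, onTimeout)
}

func main() {
	ping := time.AfterFunc(time.Second, func() { fmt.Println("ping fired") })
	auth := finishNegotiation(&ping, 750*time.Millisecond, func() {
		fmt.Println("auth timeout fired")
	})
	defer auth.Stop()
	time.Sleep(time.Second) // only "auth timeout fired" prints
}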
@@ -2859,9 +2867,6 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) {
// Spin up the write loop.
s.startGoRoutine(func() { c.writeLoop() })

// In case there was compression negotiation, the timer could have been
// already created. Destroy and recreate with different callback.
clearTimer(&c.ping.tmr)
// timeout leafNodeFinishConnectProcess
c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
c.mu.Lock()
69 changes: 69 additions & 0 deletions server/leafnode_test.go
@@ -5880,3 +5880,72 @@ func TestLeafNodeCompressionWithWSGetNeedsData(t *testing.T) {
require_True(t, len(msg.Data) == 156)
require_Equal(t, string(msg.Data), payload)
}

func TestLeafNodeCompressionAuthTimeout(t *testing.T) {
hconf := createConfFile(t, []byte(`
port: -1
server_name: "hub"
leafnodes {
port: -1
authorization {
timeout: 0.75
}
}
`))
sh, oh := RunServerWithConfig(hconf)
defer sh.Shutdown()

sconfTmpl := `
port: -1
server_name: "%s"
cluster {
port: -1
name: "spoke"
%s
}
leafnodes {
port: -1
remotes [
{ url: "nats://127.0.0.1:%d" }
]
}
`
s1conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP1", _EMPTY_, oh.LeafNode.Port)))
s1, o1 := RunServerWithConfig(s1conf)
defer s1.Shutdown()

s2conf := createConfFile(t, []byte(fmt.Sprintf(sconfTmpl, "SP2", fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port), oh.LeafNode.Port)))
s2, _ := RunServerWithConfig(s2conf)
defer s2.Shutdown()

checkClusterFormed(t, s1, s2)

checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s2)

getCID := func(s *Server) uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
var cid uint64
for _, l := range s.leafs {
l.mu.Lock()
cid = l.cid
l.mu.Unlock()
}
return cid
}
leaf1 := getCID(s1)
leaf2 := getCID(s2)

// Wait for more than auth timeout
time.Sleep(time.Second)

checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s2)
if l1 := getCID(s1); l1 != leaf1 {
t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf1, l1)
}
if l2 := getCID(s2); l2 != leaf2 {
t.Fatalf("Leaf connection first connection had CID %v, now %v", leaf2, l2)
}
}
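
The test above exercises exactly the regression fixed in this PR: with a 0.75-second auth timeout on the hub, both spokes' leafnode connections must survive past that timeout with unchanged CIDs, proving the auth timer was re-armed correctly after compression negotiation instead of firing and dropping the connections. Assuming the usual nats-server repository layout, it can be run in isolation with:

go test -run TestLeafNodeCompressionAuthTimeout ./server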