Skip to content

Commit

Permalink
Set S2 writer concurrency to 1
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Sep 25, 2023
1 parent f3411f6 commit d4e8a44
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 3 deletions.
93 changes: 93 additions & 0 deletions server/leafnode_test.go
Expand Up @@ -5471,6 +5471,99 @@ func TestLeafNodeCompression(t *testing.T) {
}
}

func BenchmarkLeafNodeCompression(b *testing.B) {
conf1 := createConfFile(b, []byte(`
port: -1
server_name: "Hub"
accounts {
A { users: [{user: a, password: pwd}] }
B { users: [{user: b, password: pwd}] }
C { users: [{user: c, password: pwd}] }
D { users: [{user: d, password: pwd}] }
}
leafnodes {
port: -1
}
`))
s1, o1 := RunServerWithConfig(conf1)
defer s1.Shutdown()

port := o1.LeafNode.Port
conf2 := createConfFile(b, []byte(fmt.Sprintf(`
port: -1
server_name: "Spoke"
accounts {
A { users: [{user: a, password: pwd}] }
B { users: [{user: b, password: pwd}] }
C { users: [{user: c, password: pwd}] }
D { users: [{user: d, password: pwd}] }
}
leafnodes {
remotes [
{ url: "nats://a:pwd@127.0.0.1:%d", account: "A", compression: s2_better }
{ url: "nats://b:pwd@127.0.0.1:%d", account: "B", compression: s2_best }
{ url: "nats://c:pwd@127.0.0.1:%d", account: "C", compression: s2_fast }
{ url: "nats://d:pwd@127.0.0.1:%d", account: "D", compression: off }
]
}
`, port, port, port, port)))
s2, _ := RunServerWithConfig(conf2)
defer s2.Shutdown()

checkLeafNodeConnectedCount(b, s1, 4)
checkLeafNodeConnectedCount(b, s2, 4)

l, err := s2.Leafz(nil)
require_NoError(b, err)
for _, r := range l.Leafs {
switch {
case r.Account == "A" && r.Compression == CompressionS2Better:
case r.Account == "B" && r.Compression == CompressionS2Best:
case r.Account == "C" && r.Compression == CompressionS2Fast:
case r.Account == "D" && r.Compression == CompressionOff:
default:
b.Fatalf("Account %q had incorrect compression mode %q on leaf connection", r.Account, r.Compression)
}
}

msg := make([]byte, 1024)
for _, p := range []struct {
algo string
user string
}{
{"Better", "a"},
{"Best", "b"},
{"Fast", "c"},
{"Off", "d"},
} {
nc1 := natsConnect(b, s1.ClientURL(), nats.UserInfo(p.user, "pwd"))
nc2 := natsConnect(b, s2.ClientURL(), nats.UserInfo(p.user, "pwd"))

sub, err := nc1.SubscribeSync("foo")
require_NoError(b, err)

time.Sleep(time.Second)

b.Run(p.algo, func(b *testing.B) {
start := time.Now()

for i := 0; i < b.N; i++ {
err = nc2.Publish("foo", msg)
require_NoError(b, err)

_, err = sub.NextMsg(time.Second)
require_NoError(b, err)
}

b.ReportMetric(float64(len(msg)*b.N)/1024/1024, "MB")
b.ReportMetric(float64(len(msg)*b.N)/1024/1024/float64(time.Since(start).Seconds()), "MB/sec")
})

nc1.Close()
nc2.Close()
}
}

func TestLeafNodeCompressionMatrixModes(t *testing.T) {
for _, test := range []struct {
name string
Expand Down
11 changes: 8 additions & 3 deletions server/server.go
Expand Up @@ -572,13 +572,18 @@ func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration
// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so
// this is more versatile.
func s2WriterOptions(cm string) []s2.WriterOption {
_opts := [2]s2.WriterOption{}
opts := append(
_opts[:0],
s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
)
switch cm {
case CompressionS2Uncompressed:
return []s2.WriterOption{s2.WriterUncompressed()}
return append(opts, s2.WriterUncompressed())
case CompressionS2Best:
return []s2.WriterOption{s2.WriterBestCompression()}
return append(opts, s2.WriterBestCompression())
case CompressionS2Better:
return []s2.WriterOption{s2.WriterBetterCompression()}
return append(opts, s2.WriterBetterCompression())
default:
return nil
}
Expand Down

0 comments on commit d4e8a44

Please sign in to comment.