Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set S2 writer concurrency to 1 #4570

Merged
merged 1 commit into from Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 93 additions & 0 deletions server/leafnode_test.go
Expand Up @@ -5471,6 +5471,99 @@ func TestLeafNodeCompression(t *testing.T) {
}
}

func BenchmarkLeafNodeCompression(b *testing.B) {
conf1 := createConfFile(b, []byte(`
port: -1
server_name: "Hub"
accounts {
A { users: [{user: a, password: pwd}] }
B { users: [{user: b, password: pwd}] }
C { users: [{user: c, password: pwd}] }
D { users: [{user: d, password: pwd}] }
}
leafnodes {
port: -1
}
`))
s1, o1 := RunServerWithConfig(conf1)
defer s1.Shutdown()

port := o1.LeafNode.Port
conf2 := createConfFile(b, []byte(fmt.Sprintf(`
port: -1
server_name: "Spoke"
accounts {
A { users: [{user: a, password: pwd}] }
B { users: [{user: b, password: pwd}] }
C { users: [{user: c, password: pwd}] }
D { users: [{user: d, password: pwd}] }
}
leafnodes {
remotes [
{ url: "nats://a:pwd@127.0.0.1:%d", account: "A", compression: s2_better }
{ url: "nats://b:pwd@127.0.0.1:%d", account: "B", compression: s2_best }
{ url: "nats://c:pwd@127.0.0.1:%d", account: "C", compression: s2_fast }
{ url: "nats://d:pwd@127.0.0.1:%d", account: "D", compression: off }
]
}
`, port, port, port, port)))
s2, _ := RunServerWithConfig(conf2)
defer s2.Shutdown()

checkLeafNodeConnectedCount(b, s1, 4)
checkLeafNodeConnectedCount(b, s2, 4)

l, err := s2.Leafz(nil)
require_NoError(b, err)
for _, r := range l.Leafs {
switch {
case r.Account == "A" && r.Compression == CompressionS2Better:
case r.Account == "B" && r.Compression == CompressionS2Best:
case r.Account == "C" && r.Compression == CompressionS2Fast:
case r.Account == "D" && r.Compression == CompressionOff:
default:
b.Fatalf("Account %q had incorrect compression mode %q on leaf connection", r.Account, r.Compression)
}
}

msg := make([]byte, 1024)
for _, p := range []struct {
algo string
user string
}{
{"Better", "a"},
{"Best", "b"},
{"Fast", "c"},
{"Off", "d"},
} {
nc1 := natsConnect(b, s1.ClientURL(), nats.UserInfo(p.user, "pwd"))
nc2 := natsConnect(b, s2.ClientURL(), nats.UserInfo(p.user, "pwd"))

sub, err := nc1.SubscribeSync("foo")
require_NoError(b, err)

time.Sleep(time.Second)

b.Run(p.algo, func(b *testing.B) {
start := time.Now()

for i := 0; i < b.N; i++ {
err = nc2.Publish("foo", msg)
require_NoError(b, err)

_, err = sub.NextMsg(time.Second)
require_NoError(b, err)
}

b.ReportMetric(float64(len(msg)*b.N)/1024/1024, "MB")
b.ReportMetric(float64(len(msg)*b.N)/1024/1024/float64(time.Since(start).Seconds()), "MB/sec")
})

nc1.Close()
nc2.Close()
}
}

func TestLeafNodeCompressionMatrixModes(t *testing.T) {
for _, test := range []struct {
name string
Expand Down
11 changes: 8 additions & 3 deletions server/server.go
Expand Up @@ -572,13 +572,18 @@ func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration
// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so
// this is more versatile.
func s2WriterOptions(cm string) []s2.WriterOption {
_opts := [2]s2.WriterOption{}
opts := append(
_opts[:0],
s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
)
switch cm {
case CompressionS2Uncompressed:
return []s2.WriterOption{s2.WriterUncompressed()}
return append(opts, s2.WriterUncompressed())
case CompressionS2Best:
return []s2.WriterOption{s2.WriterBestCompression()}
return append(opts, s2.WriterBestCompression())
case CompressionS2Better:
return []s2.WriterOption{s2.WriterBetterCompression()}
return append(opts, s2.WriterBetterCompression())
default:
return nil
}
Expand Down