Skip to content

Commit

Permalink
Fix test, make more simple
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed Mar 25, 2022
1 parent 944c1f1 commit 0aa7f9d
Showing 1 changed file with 31 additions and 154 deletions.
185 changes: 31 additions & 154 deletions test/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"net/http"
"reflect"
"sort"
"sync/atomic"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -127,194 +127,71 @@ func TestRequestMsgRaceAsyncInfo(t *testing.T) {
s := natsserver.RunServer(&s1Opts)
defer s.Shutdown()

eventsCh := make(chan int, 20)
discoverCB := func(nc *nats.Conn) {
eventsCh <- len(nc.DiscoveredServers())
}

reconnectCh := make(chan struct{})
reconnectedEvent := make(chan struct{})
reconnectCB := func(nc *nats.Conn) {
reconnectCh <- struct{}{}
}

copts := []nats.Option{
nats.DiscoveredServersHandler(discoverCB),
nats.DontRandomize(),
nats.ReconnectHandler(reconnectCB),
}
nc, err := nats.Connect(s.ClientURL(), copts...)
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer nc.Close()

// Extra client with old request.
nc2, err := nats.Connect(s.ClientURL(), nats.DontRandomize(), nats.UseOldRequestStyle())
nc2, err := nats.Connect(s.ClientURL(), nats.UseOldRequestStyle())
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer nc2.Close()

subject := "headers.test"
sub, err := nc.Subscribe(subject, func(m *nats.Msg) {
if _, err := nc.Subscribe(subject, func(m *nats.Msg) {
r := nats.NewMsg(m.Reply)
r.Header["Hdr-Test"] = []string{"bar"}
r.Data = []byte("+OK")
m.RespondMsg(r)
})
if err != nil {
}); err != nil {
t.Fatalf("subscribe failed: %v", err)
}
defer sub.Unsubscribe()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Leave some goroutines publishing in parallel while
// async protocols are being received.
var received, receivedWithContext int64
var receivedOldStyle, receivedOldStyleWithContext int64
var producers int = 50
for i := 0; i < producers; i++ {
go func() {
for range time.NewTicker(1 * time.Millisecond).C {
select {
case <-ctx.Done():
return
case <-reconnectedEvent:
return
default:
}
msg := nats.NewMsg(subject)
msg.Header["Hdr-Test"] = []string{"foo"}

ttl := 250 * time.Millisecond
resp, _ := nc.RequestMsg(msg, ttl)
if resp != nil {
atomic.AddInt64(&received, 1)
}
nc.Flush()

ctx2, cancel2 := context.WithTimeout(context.Background(), ttl)
resp, _ = nc.RequestMsgWithContext(ctx2, msg)
if resp != nil {
atomic.AddInt64(&receivedWithContext, 1)
}
cancel2()

// Check with old style requests as well.
resp, _ = nc2.RequestMsg(msg, ttl)
if resp != nil {
atomic.AddInt64(&receivedOldStyle, 1)
}
ctx2, cancel2 = context.WithTimeout(context.Background(), ttl)
resp, _ = nc2.RequestMsgWithContext(ctx2, msg)
if resp != nil {
atomic.AddInt64(&receivedOldStyleWithContext, 1)
}
cancel2()
}
}()
}
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan struct{})
go func() {
defer wg.Done()

// Add servers a few times to get async info protocols.
expectedServers := 5
runningServers := make([]*server.Server, expectedServers)
for i := 0; i < expectedServers; i++ {
s2Opts := natsserver.DefaultTestOptions
s2Opts.Host = "127.0.0.1"
s2Opts.Port = -1
s2Opts.Cluster.Name = "CLUSTER"
s2Opts.Cluster.Host = "127.0.0.1"
s2Opts.Cluster.Port = -1
s2Opts.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", s.ClusterAddr().Port))

// New servers will not have Header support so APIs ought to fail on reconnect.
s2Opts.NoHeaderSupport = true

s2 := natsserver.RunServer(&s2Opts)
runningServers[i] = s2
time.Sleep(10 * time.Millisecond)
}

defer func() {
for _, rs := range runningServers {
rs.Shutdown()
}
}()

Loop:
for {
select {
case i := <-eventsCh:
if i == expectedServers {
break Loop
for {
s := natsserver.RunServer(&s2Opts)
s.Shutdown()
select {
case <-ch:
return
default:
}
case <-ctx.Done():
t.Fatal("Timed out waiting for enough servers to join")
}
}
if !nc.HeadersSupported() {
t.Fatalf("Expected Headers support")
}

// Trigger a disconnect to reconnect to a server without Headers support.
s.Shutdown()

select {
case <-reconnectCh:
// Stop producers in goroutines.
close(reconnectedEvent)
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for reconnect")
}
}()

// Try to send message to server without header support.
msg := nats.NewMsg(subject)
msg.Header["Hdr-Test"] = []string{"quux"}
if _, err := nc.RequestMsg(msg, time.Second); err != nats.ErrHeadersNotSupported {
t.Fatalf("Expected an error, got %v", err)
}
if err := nc.PublishMsg(msg); err != nats.ErrHeadersNotSupported {
t.Fatalf("Expected an error, got %v", err)
}
for i := 0; i < 100; i++ {
nc.RequestMsg(msg, time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
nc.RequestMsgWithContext(ctx, msg)
cancel()

// Check context based variations as well.
ctx2, cancel2 := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel2()
if _, err := nc.RequestMsgWithContext(ctx2, msg); err != nats.ErrHeadersNotSupported {
t.Fatalf("Expected an error, got %v", err)
}
if _, err := nc2.RequestMsgWithContext(ctx2, msg); err != nats.ErrHeadersNotSupported {
t.Fatalf("Expected an error, got %v", err)
// Check with old style requests as well.
nc2.RequestMsg(msg, time.Second)
ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second)
nc2.RequestMsgWithContext(ctx2, msg)
cancel2()
}

if nc.HeadersSupported() {
t.Fatalf("Unexpected Headers support")
}
if nc2.HeadersSupported() {
t.Fatalf("Unexpected Headers support")
}

count := atomic.LoadInt64(&received)
if int(count) < producers {
t.Errorf("Expected at least %d responses, got: %d", producers, count)
}

count = atomic.LoadInt64(&receivedWithContext)
if int(count) < producers {
t.Errorf("Expected at least %d responses, got: %d", producers, count)
}

count = atomic.LoadInt64(&receivedOldStyle)
if int(count) < producers {
t.Errorf("Expected at least %d responses, got: %d", producers, count)
}

count = atomic.LoadInt64(&receivedOldStyleWithContext)
if int(count) < producers {
t.Errorf("Expected at least %d responses, got: %d", producers, count)
}
close(ch)
wg.Wait()
}

func TestNoHeaderSupport(t *testing.T) {
Expand Down

0 comments on commit 0aa7f9d

Please sign in to comment.