-
Notifications
You must be signed in to change notification settings - Fork 882
/
load_balancer_prose_test.go
130 lines (110 loc) · 4.69 KB
/
load_balancer_prose_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package integration
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
"go.mongodb.org/mongo-driver/mongo/options"
)
// TestLoadBalancerSupport contains Go-specific prose tests for load balancer
// support. It only runs against load-balanced topologies and covers:
//
//   - connection pinning for cursors created via RunCommandCursor, and
//   - the extra diagnostic information included in wait queue timeout errors
//     when the single pooled connection is pinned by a cursor or a transaction,
//   - unpinning of transaction connections once the session is ended.
//
// Setup steps use require (fatal) assertions so that a failed precondition
// stops the subtest instead of dereferencing a nil cursor, session or error.
func TestLoadBalancerSupport(t *testing.T) {
	mt := mtest.New(t, mtest.NewOptions().Topologies(mtest.LoadBalanced).CreateClient(false))
	defer mt.Close()

	mt.Run("RunCommandCursor pins to a connection", func(mt *mtest.T) {
		// The LB spec tests cover the behavior for cursors created by CRUD operations, but RunCommandCursor is
		// Go-specific so there is no spec test coverage for it.

		initCollection(mt, mt.Coll)
		findCmd := bson.D{
			{"find", mt.Coll.Name()},
			{"filter", bson.D{}},
			{"batchSize", 2},
		}
		cursor, err := mt.DB.RunCommandCursor(context.Background(), findCmd)
		// Fatal on error: cursor is nil in that case and is dereferenced below.
		require.NoError(mt, err, "RunCommandCursor error")
		defer func() {
			// Best-effort cleanup; a Close error does not affect what this test asserts.
			_ = cursor.Close(context.Background())
		}()

		assert.True(mt, cursor.ID() > 0, "expected cursor ID to be non-zero")

		// Read the counter once so the compared value and the reported value agree.
		checkedOut := mt.NumberConnectionsCheckedOut()
		assert.Equal(mt, 1, checkedOut,
			"expected one connection to be checked out, got %d", checkedOut)
	})

	mt.RunOpts("wait queue timeout errors include extra information", noClientOpts, func(mt *mtest.T) {
		// There are spec tests to assert this behavior, but they rely on the waitQueueTimeoutMS Client option, which is
		// not supported in Go, so we have to skip them. These prose tests make the same assertions, but use context
		// deadlines to force wait queue timeout errors.

		// assertErrorHasInfo verifies that err is non-nil and that its message
		// reports maxPoolSize 1 together with the given per-category counts of
		// connections in use.
		assertErrorHasInfo := func(mt *mtest.T, err error, numCursorConns, numTxnConns, numOtherConns int) {
			mt.Helper()

			// Fatal on nil: err.Error() below would otherwise panic.
			require.Error(mt, err, "expected wait queue timeout error, got nil")

			expectedMsg := fmt.Sprintf("maxPoolSize: 1, "+
				"connections in use by cursors: %d, "+
				"connections in use by transactions: %d, "+
				"connections in use by other operations: %d",
				numCursorConns, numTxnConns, numOtherConns,
			)
			assert.True(mt, strings.Contains(err.Error(), expectedMsg),
				"expected error %q to contain substring %q", err, expectedMsg)
		}

		maxPoolSizeMtOpts := mtest.NewOptions().
			ClientOptions(options.Client().SetMaxPoolSize(1))

		mt.RunOpts("cursors", maxPoolSizeMtOpts, func(mt *mtest.T) {
			initCollection(mt, mt.Coll)
			findOpts := options.Find().SetBatchSize(2)
			cursor, err := mt.Coll.Find(context.Background(), bson.M{}, findOpts)
			// Fatal on error: cursor is nil in that case and is dereferenced in the deferred Close.
			require.NoError(mt, err, "Find error")
			defer func() {
				// Best-effort cleanup; a Close error does not affect what this test asserts.
				_ = cursor.Close(context.Background())
			}()

			// The open cursor holds the only pooled connection, so this insert
			// is expected to time out while waiting for a connection.
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
			defer cancel()
			_, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
			assertErrorHasInfo(mt, err, 1, 0, 0)
		})

		mt.RunOpts("transactions", maxPoolSizeMtOpts, func(mt *mtest.T) {
			sess, err := mt.Client.StartSession()
			// Fatal on error: sess is nil in that case and is used immediately below.
			require.NoError(mt, err, "StartSession error")
			defer sess.EndSession(context.Background())
			sessCtx := mongo.NewSessionContext(context.Background(), sess)

			// Start a transaction and perform one transactional operation to pin a connection.
			err = sess.StartTransaction()
			require.NoError(mt, err, "StartTransaction error")
			_, err = mt.Coll.InsertOne(sessCtx, bson.M{"x": 1})
			require.NoError(mt, err, "InsertOne error")

			// The transaction holds the only pooled connection, so a
			// non-transactional insert is expected to time out.
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
			defer cancel()
			_, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
			assertErrorHasInfo(mt, err, 0, 1, 0)
		})

		// GODRIVER-2867: Test that connections are unpinned from transactions
		// when the transaction session is ended. Create a Client with
		// maxPoolSize=1 and expect that it can start and commit 5 transactions
		// with that 1 connection.
		mt.RunOpts("transaction connections are unpinned", maxPoolSizeMtOpts, func(mt *mtest.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
			defer cancel()

			for i := 0; i < 5; i++ {
				sess, err := mt.Client.StartSession()
				require.NoError(mt, err, "StartSession error")

				err = sess.StartTransaction()
				require.NoError(mt, err, "StartTransaction error")

				// Distinct name so the timeout context above is not shadowed.
				sessCtx := mongo.NewSessionContext(ctx, sess)

				_, err = mt.Coll.InsertOne(sessCtx, bson.M{"x": 1})
				assert.NoError(mt, err, "InsertOne error")

				err = sess.CommitTransaction(sessCtx)
				assert.NoError(mt, err, "CommitTransaction error")

				// End the session inside the loop (not deferred) so the
				// connection is released before the next iteration.
				sess.EndSession(sessCtx)
			}
		})
	})
}