/
topology.go
138 lines (109 loc) · 3.13 KB
/
topology.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package vshard_router
import (
"context"
"fmt"
"sync/atomic"
"github.com/google/uuid"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
)
var ErrReplicasetNotExists = fmt.Errorf("replicaset not exists")
type TopologyController interface {
AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error
RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error
RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error
AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error
AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
}
// TopologyController is an entity that allows you to interact with the topology
type controller struct {
r *Router
}
func (r *Router) Topology() TopologyController {
return &controller{r: r}
}
func (c *controller) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error {
err := info.Validate()
if err != nil {
return err
}
instance := pool.Instance{
Name: info.UUID.String(),
Dialer: tarantool.NetDialer{
Address: info.Addr,
User: c.r.cfg.User,
Password: c.r.cfg.Password,
},
}
rs := c.r.idToReplicaset[rsID]
if rs == nil {
return ErrReplicasetNotExists
}
return rs.conn.Add(ctx, instance)
}
func (c *controller) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error {
rs := c.r.idToReplicaset[rsID]
if rs == nil {
return ErrReplicasetNotExists
}
return rs.conn.Remove(instanceID.String())
}
func (c *controller) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error {
router := c.r
cfg := router.cfg
replicaset := &Replicaset{
info: ReplicasetInfo{
Name: rsInfo.Name,
UUID: rsInfo.UUID,
},
bucketCount: atomic.Int32{},
}
replicaset.bucketCount.Store(0)
rsInstances := make([]pool.Instance, len(instances))
for i, instance := range instances {
dialer := tarantool.NetDialer{
Address: instance.Addr,
User: cfg.User,
Password: cfg.Password,
}
inst := pool.Instance{
Name: instance.UUID.String(),
Dialer: dialer,
Opts: router.cfg.PoolOpts,
}
rsInstances[i] = inst
}
conn, err := pool.Connect(ctx, rsInstances)
if err != nil {
return err
}
isConnected, err := conn.ConnectedNow(pool.RW)
if err != nil {
return fmt.Errorf("cant check rs pool conntected rw now with error: %s", err)
}
if !isConnected {
return fmt.Errorf("got connected now as false, storage must be configured first")
}
replicaset.conn = conn
router.idToReplicaset[rsInfo.UUID] = replicaset // add when conn is ready
return nil
}
func (c *controller) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error {
for rsInfo, rsInstances := range replicasets {
err := c.AddReplicaset(ctx, rsInfo, rsInstances)
if err != nil {
return err
}
}
return nil
}
func (c *controller) RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error {
r := c.r
rs := r.idToReplicaset[rsID]
if rs == nil {
return []error{ErrReplicasetNotExists}
}
errors := rs.conn.CloseGraceful()
delete(r.idToReplicaset, rsID)
return errors
}