-
Notifications
You must be signed in to change notification settings - Fork 19
/
ClusterResolverFactory.java
166 lines (148 loc) · 6.64 KB
/
ClusterResolverFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package com.eventstore.dbclient;
import io.grpc.*;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import org.xbill.DNS.*;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ClusterResolverFactory extends NameResolver.Factory {
private static final Set<ClusterInfo.MemberState> invalidStates;
private static final Random random = new Random();
static {
invalidStates = new HashSet<ClusterInfo.MemberState>() {{
add(ClusterInfo.MemberState.MANAGER);
add(ClusterInfo.MemberState.SHUTTING_DOWN);
add(ClusterInfo.MemberState.SHUT_DOWN);
add(ClusterInfo.MemberState.UNKNOWN);
add(ClusterInfo.MemberState.INITIALIZING);
add(ClusterInfo.MemberState.RESIGNING_LEADER);
add(ClusterInfo.MemberState.PRE_LEADER);
add(ClusterInfo.MemberState.PRE_REPLICA);
add(ClusterInfo.MemberState.PRE_READ_ONLY_REPLICA);
add(ClusterInfo.MemberState.CLONE);
add(ClusterInfo.MemberState.DISCOVER_LEADER);
}};
}
private List<InetSocketAddress> seedNodes;
private final NodePreference nodePreference;
private final SslContext sslContext;
private final Timeouts timeouts;
public ClusterResolverFactory(List<InetSocketAddress> seedNodes, NodePreference nodePreference, Timeouts timeouts, SslContext sslContext) {
this.seedNodes = seedNodes;
this.nodePreference = nodePreference;
this.sslContext = sslContext;
this.timeouts = timeouts;
}
@Override
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
return new NameResolver() {
@Override
public String getServiceAuthority() {
return "eventStoreDBGossip";
}
@Override
public void start(Listener2 listener) {
List<InetSocketAddress> candidates;
if (seedNodes != null) {
candidates = new ArrayList<>(seedNodes);
Collections.shuffle(candidates);
} else {
candidates = new ArrayList<>();
try {
org.xbill.DNS.Record[] records = new Lookup(targetUri.toString(), Type.SRV).run();
for (int i = 0; i < records.length; ++i) {
SRVRecord record = (SRVRecord) records[i];
candidates.add(new InetSocketAddress(record.getName().toString(true), record.getPort()));
}
} catch (TextParseException e) {
listener.onError(Status.INTERNAL);
}
}
for (InetSocketAddress seed : candidates) {
try {
ClusterInfo.Endpoint endpoint = attemptDiscovery(seed).get();
if (endpoint == null) {
continue;
}
InetSocketAddress addr = endpoint.toInetSocketAddress();
List<SocketAddress> addrs = new ArrayList<>();
addrs.add(addr);
EquivalentAddressGroup addrGroup = new EquivalentAddressGroup(addrs);
List<EquivalentAddressGroup> addrGroups = new ArrayList<>();
addrGroups.add(addrGroup);
listener.onResult(ResolutionResult.newBuilder()
.setAddresses(addrGroups)
.setAttributes(Attributes.EMPTY)
.build());
return;
} catch (InterruptedException | ExecutionException e) {
listener.onError(Status.INTERNAL);
return;
}
}
}
@Override
public void shutdown() {
}
};
}
@Override
public String getDefaultScheme() {
return "eventstore";
}
private CompletableFuture<ClusterInfo.Endpoint> attemptDiscovery(InetSocketAddress seed) {
ManagedChannel channel = NettyChannelBuilder.forAddress(seed)
.userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT")
.sslContext(this.sslContext)
.build();
GossipClient client = new GossipClient(channel, timeouts);
return client.read()
.thenApply(this::determineBestFitNode)
.thenApply(m -> m.map(ClusterInfo.Member::getHttpEndpoint).orElse(null));
}
private Optional<ClusterInfo.Member> determineBestFitNode(ClusterInfo clusterInfo) {
return clusterInfo.getMembers()
.stream()
.filter(ClusterInfo.Member::isAlive)
.filter(m -> !invalidStates.contains(m.getState()))
.sorted((o1, o2) -> {
switch (nodePreference) {
case LEADER:
if (o1.getState().equals(ClusterInfo.MemberState.LEADER)) {
return 1;
}
if (o2.getState().equals(ClusterInfo.MemberState.LEADER)) {
return -1;
}
return 0;
case FOLLOWER:
if (o1.getState().equals(ClusterInfo.MemberState.FOLLOWER)) {
return 1;
}
if (o2.getState().equals(ClusterInfo.MemberState.FOLLOWER)) {
return -1;
}
return 0;
case READ_ONLY_REPLICA:
if (o1.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) {
return 1;
}
if (o2.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) {
return -1;
}
return 0;
case RANDOM:
if (random.nextBoolean()) {
return 1;
}
return 1;
}
return 0;
})
.findAny();
}
}