Skip to content

Commit

Permalink
Implement cluster connection.
Browse files Browse the repository at this point in the history
  • Loading branch information
YoEight committed Aug 28, 2020
1 parent 6bace47 commit 1d579e8
Show file tree
Hide file tree
Showing 13 changed files with 701 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -22,7 +22,7 @@ allprojects {
}
}

apply from: file('gradle/testReports.gradle')
//apply from: file('gradle/testReports.gradle')

if (!JavaVersion.current().isJava8Compatible()) {
throw new Exception("Java 8 is required to build EventStoreDB-Client-Java")
Expand Down
3 changes: 2 additions & 1 deletion db-client-java/build.gradle
@@ -1,6 +1,6 @@
apply plugin: 'java-library'
apply from: rootProject.file('gradle/proto.gradle')
apply from: rootProject.file('gradle/spotbugs.gradle')
//apply from: rootProject.file('gradle/spotbugs.gradle')
apply from: rootProject.file('gradle/updateProtos.gradle')
apply from: rootProject.file('gradle/publishing.gradle')

Expand All @@ -14,6 +14,7 @@ dependencies {
implementation 'io.grpc:grpc-netty-shaded'
implementation 'io.grpc:grpc-stub'
implementation 'io.grpc:grpc-protobuf'
implementation 'dnsjava:dnsjava:3.2.2'

testImplementation "junit:junit:${junitVersion}"
testImplementation "org.slf4j:slf4j-nop:${slf4jNopVersion}"
Expand Down
135 changes: 135 additions & 0 deletions db-client-java/src/main/java/com/eventstore/dbclient/ClusterInfo.java
@@ -0,0 +1,135 @@
package com.eventstore.dbclient;

import com.eventstore.dbclient.proto.gossip.GossipOuterClass;
import com.eventstore.dbclient.proto.shared.Shared;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ClusterInfo {
private final List<Member> members;

public ClusterInfo(List<Member> members) {
this.members = members;
}

public List<Member> getMembers() {
return members;
}

static ClusterInfo fromWire(GossipOuterClass.ClusterInfo wire) {
List<ClusterInfo.Member> members = new ArrayList<>();
for (GossipOuterClass.MemberInfo member : wire.getMembersList()) {
UUID instanceId;
if (member.getInstanceId().hasStructured()) {
Shared.UUID.Structured structured = member.getInstanceId().getStructured();
instanceId = new UUID(structured.getMostSignificantBits(), structured.getLeastSignificantBits());
} else {
instanceId = UUID.fromString(member.getInstanceId().getString());
}
boolean isAlive = member.getIsAlive();
MemberState state = MemberState.fromWire(member.getState());
Endpoint httpEndpoint = new Endpoint(member.getHttpEndPoint().getAddress(), member.getHttpEndPoint().getPort());

members.add(new Member(instanceId, isAlive, state, httpEndpoint));
}

return new ClusterInfo(members);
}

public enum MemberState {
INITIALIZING, DISCOVER_LEADER, UNKNOWN, PRE_REPLICA, CATCHING_UP, CLONE,
FOLLOWER, PRE_LEADER, LEADER, MANAGER, SHUTTING_DOWN, SHUT_DOWN, READ_ONLY_LEADERLESS,
PRE_READ_ONLY_REPLICA, READ_ONLY_REPLICA, RESIGNING_LEADER;

static MemberState fromWire(GossipOuterClass.MemberInfo.VNodeState state) {
switch (state) {
case Initializing:
return INITIALIZING;
case DiscoverLeader:
return DISCOVER_LEADER;
case PreReplica:
return PRE_REPLICA;
case CatchingUp:
return CATCHING_UP;
case Clone:
return CLONE;
case Follower:
return FOLLOWER;
case PreLeader:
return PRE_LEADER;
case Leader:
return LEADER;
case Manager:
return MANAGER;
case ShuttingDown:
return SHUTTING_DOWN;
case Shutdown:
return SHUT_DOWN;
case ReadOnlyLeaderless:
return READ_ONLY_LEADERLESS;
case PreReadOnlyReplica:
return PRE_READ_ONLY_REPLICA;
case ReadOnlyReplica:
return READ_ONLY_REPLICA;
case ResigningLeader:
return RESIGNING_LEADER;
}
return UNKNOWN;
}
}

public static class Endpoint {
private final String address;
private final int port;

Endpoint(String address, int port) {
this.address = address;
this.port = port;
}

InetSocketAddress toInetSocketAddress() {
return new InetSocketAddress(this.address, this.port);
}

public String getAddress() {
return address;
}

public int getPort() {
return port;
}
}

public static class Member {
private final UUID instanceId;
private final boolean isAlive;
private final MemberState state;
private final Endpoint httpEndpoint;

Member(UUID instanceId, boolean isAlive, MemberState state, Endpoint httpEndpoint) {
this.instanceId = instanceId;
this.isAlive = isAlive;
this.state = state;
this.httpEndpoint = httpEndpoint;
}

public UUID getInstanceId() {
return instanceId;
}

public boolean isAlive() {
return isAlive;
}

public MemberState getState() {
return state;
}

public Endpoint getHttpEndpoint() {
return httpEndpoint;
}
}
}
@@ -0,0 +1,166 @@
package com.eventstore.dbclient;

import io.grpc.*;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import org.xbill.DNS.*;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ClusterResolverFactory extends NameResolver.Factory {
private static final Set<ClusterInfo.MemberState> invalidStates;
private static final Random random = new Random();

static {
invalidStates = new HashSet<ClusterInfo.MemberState>() {{
add(ClusterInfo.MemberState.MANAGER);
add(ClusterInfo.MemberState.SHUTTING_DOWN);
add(ClusterInfo.MemberState.SHUT_DOWN);
add(ClusterInfo.MemberState.UNKNOWN);
add(ClusterInfo.MemberState.INITIALIZING);
add(ClusterInfo.MemberState.RESIGNING_LEADER);
add(ClusterInfo.MemberState.PRE_LEADER);
add(ClusterInfo.MemberState.PRE_REPLICA);
add(ClusterInfo.MemberState.PRE_READ_ONLY_REPLICA);
add(ClusterInfo.MemberState.CLONE);
add(ClusterInfo.MemberState.DISCOVER_LEADER);
}};
}

private List<InetSocketAddress> seedNodes;
private final NodePreference nodePreference;
private final SslContext sslContext;
private final Timeouts timeouts;

public ClusterResolverFactory(List<InetSocketAddress> seedNodes, NodePreference nodePreference, Timeouts timeouts, SslContext sslContext) {
this.seedNodes = seedNodes;
this.nodePreference = nodePreference;
this.sslContext = sslContext;
this.timeouts = timeouts;
}

@Override
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
return new NameResolver() {
@Override
public String getServiceAuthority() {
return "eventStoreDBGossip";
}

@Override
public void start(Listener2 listener) {
List<InetSocketAddress> candidates;

if (seedNodes != null) {
candidates = new ArrayList<>(seedNodes);
Collections.shuffle(candidates);
} else {
candidates = new ArrayList<>();
try {
org.xbill.DNS.Record[] records = new Lookup(targetUri.toString(), Type.SRV).run();
for (int i = 0; i < records.length; ++i) {
SRVRecord record = (SRVRecord) records[i];

candidates.add(new InetSocketAddress(record.getName().toString(true), record.getPort()));
}
} catch (TextParseException e) {
listener.onError(Status.INTERNAL);
}
}

for (InetSocketAddress seed : candidates) {
try {
ClusterInfo.Endpoint endpoint = attemptDiscovery(seed).get();
if (endpoint == null) {
continue;
}

InetSocketAddress addr = endpoint.toInetSocketAddress();
List<SocketAddress> addrs = new ArrayList<>();
addrs.add(addr);
EquivalentAddressGroup addrGroup = new EquivalentAddressGroup(addrs);
List<EquivalentAddressGroup> addrGroups = new ArrayList<>();
addrGroups.add(addrGroup);

listener.onResult(ResolutionResult.newBuilder()
.setAddresses(addrGroups)
.setAttributes(Attributes.EMPTY)
.build());
return;
} catch (InterruptedException | ExecutionException e) {
listener.onError(Status.INTERNAL);
return;
}
}
}

@Override
public void shutdown() {
}
};
}

@Override
public String getDefaultScheme() {
return "eventstore";
}

private CompletableFuture<ClusterInfo.Endpoint> attemptDiscovery(InetSocketAddress seed) {
ManagedChannel channel = NettyChannelBuilder.forAddress(seed)
.userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT")
.sslContext(this.sslContext)
.build();
GossipClient client = new GossipClient(channel, timeouts);
return client.read()
.thenApply(this::determineBestFitNode)
.thenApply(m -> m.map(ClusterInfo.Member::getHttpEndpoint).orElse(null));
}

private Optional<ClusterInfo.Member> determineBestFitNode(ClusterInfo clusterInfo) {
return clusterInfo.getMembers()
.stream()
.filter(ClusterInfo.Member::isAlive)
.filter(m -> !invalidStates.contains(m.getState()))
.sorted((o1, o2) -> {
switch (nodePreference) {
case LEADER:
if (o1.getState().equals(ClusterInfo.MemberState.LEADER)) {
return 1;
}
if (o2.getState().equals(ClusterInfo.MemberState.LEADER)) {
return -1;
}
return 0;
case FOLLOWER:
if (o1.getState().equals(ClusterInfo.MemberState.FOLLOWER)) {
return 1;
}
if (o2.getState().equals(ClusterInfo.MemberState.FOLLOWER)) {
return -1;
}
return 0;
case READ_ONLY_REPLICA:
if (o1.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) {
return 1;
}
if (o2.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) {
return -1;
}
return 0;
case RANDOM:
if (random.nextBoolean()) {
return 1;
}

return 1;
}
return 0;
})
.findAny();
}
}
19 changes: 19 additions & 0 deletions db-client-java/src/main/java/com/eventstore/dbclient/Endpoint.java
@@ -0,0 +1,19 @@
package com.eventstore.dbclient;

public class Endpoint {
private String hostname;
private int port;

public Endpoint(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}

public String getHostname() {
return hostname;
}

public int getPort() {
return port;
}
}

0 comments on commit 1d579e8

Please sign in to comment.