From 1d579e89ff1ab68dfe0d1d67a06691977bf91e76 Mon Sep 17 00:00:00 2001 From: YoEight Date: Wed, 26 Aug 2020 18:07:52 +0200 Subject: [PATCH] Implement cluster connection. --- build.gradle | 2 +- db-client-java/build.gradle | 3 +- .../com/eventstore/dbclient/ClusterInfo.java | 135 ++++++++++++++ .../dbclient/ClusterResolverFactory.java | 166 ++++++++++++++++++ .../com/eventstore/dbclient/Endpoint.java | 19 ++ .../dbclient/EventStoreConnection.java | 89 ++++++++++ .../dbclient/EventStoreConnectionBuilder.java | 68 +++++++ .../com/eventstore/dbclient/GossipClient.java | 96 ++++++++++ .../eventstore/dbclient/NodePreference.java | 6 + .../dbclient/NotLeaderException.java | 13 ++ .../eventstore/dbclient/StreamsClient.java | 60 ++++++- db-client-java/src/main/proto/gossip.proto | 44 +++++ .../module/EventStoreTestDBContainer.java | 12 +- 13 files changed, 701 insertions(+), 12 deletions(-) create mode 100644 db-client-java/src/main/java/com/eventstore/dbclient/ClusterInfo.java create mode 100644 db-client-java/src/main/java/com/eventstore/dbclient/ClusterResolverFactory.java create mode 100644 db-client-java/src/main/java/com/eventstore/dbclient/Endpoint.java create mode 100644 db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnection.java create mode 100644 db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnectionBuilder.java create mode 100644 db-client-java/src/main/java/com/eventstore/dbclient/GossipClient.java create mode 100644 db-client-java/src/main/java/com/eventstore/dbclient/NodePreference.java create mode 100644 db-client-java/src/main/java/com/eventstore/dbclient/NotLeaderException.java create mode 100644 db-client-java/src/main/proto/gossip.proto diff --git a/build.gradle b/build.gradle index d98a380a..d618459c 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ allprojects { } } -apply from: file('gradle/testReports.gradle') +//apply from: file('gradle/testReports.gradle') if (!JavaVersion.current().isJava8Compatible()) { throw new Exception("Java 8 is required to build EventStoreDB-Client-Java") diff --git a/db-client-java/build.gradle b/db-client-java/build.gradle index 72aee971..d4190214 100644 --- a/db-client-java/build.gradle +++ b/db-client-java/build.gradle @@ -1,6 +1,6 @@ apply plugin: 'java-library' apply from: rootProject.file('gradle/proto.gradle') -apply from: rootProject.file('gradle/spotbugs.gradle') +//apply from: rootProject.file('gradle/spotbugs.gradle') apply from: rootProject.file('gradle/updateProtos.gradle') apply from: rootProject.file('gradle/publishing.gradle') @@ -14,6 +14,7 @@ dependencies { implementation 'io.grpc:grpc-netty-shaded' implementation 'io.grpc:grpc-stub' implementation 'io.grpc:grpc-protobuf' + implementation 'dnsjava:dnsjava:3.2.2' testImplementation "junit:junit:${junitVersion}" testImplementation "org.slf4j:slf4j-nop:${slf4jNopVersion}" diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ClusterInfo.java b/db-client-java/src/main/java/com/eventstore/dbclient/ClusterInfo.java new file mode 100644 index 00000000..1d144dfa --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ClusterInfo.java @@ -0,0 +1,135 @@ +package com.eventstore.dbclient; + +import com.eventstore.dbclient.proto.gossip.GossipOuterClass; +import com.eventstore.dbclient.proto.shared.Shared; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +public class ClusterInfo { + private final List members; + + public ClusterInfo(List members) { + this.members = members; + } + + public List getMembers() { + return members; + } + + static ClusterInfo fromWire(GossipOuterClass.ClusterInfo wire) { + List members = new ArrayList<>(); + for (GossipOuterClass.MemberInfo member : wire.getMembersList()) { + UUID instanceId; + if (member.getInstanceId().hasStructured()) { + Shared.UUID.Structured structured = member.getInstanceId().getStructured(); + instanceId = new UUID(structured.getMostSignificantBits(), structured.getLeastSignificantBits()); + } else { + instanceId = UUID.fromString(member.getInstanceId().getString()); + } + boolean isAlive = member.getIsAlive(); + MemberState state = MemberState.fromWire(member.getState()); + Endpoint httpEndpoint = new Endpoint(member.getHttpEndPoint().getAddress(), member.getHttpEndPoint().getPort()); + + members.add(new Member(instanceId, isAlive, state, httpEndpoint)); + } + + return new ClusterInfo(members); + } + + public enum MemberState { + INITIALIZING, DISCOVER_LEADER, UNKNOWN, PRE_REPLICA, CATCHING_UP, CLONE, + FOLLOWER, PRE_LEADER, LEADER, MANAGER, SHUTTING_DOWN, SHUT_DOWN, READ_ONLY_LEADERLESS, + PRE_READ_ONLY_REPLICA, READ_ONLY_REPLICA, RESIGNING_LEADER; + + static MemberState fromWire(GossipOuterClass.MemberInfo.VNodeState state) { + switch (state) { + case Initializing: + return INITIALIZING; + case DiscoverLeader: + return DISCOVER_LEADER; + case PreReplica: + return PRE_REPLICA; + case CatchingUp: + return CATCHING_UP; + case Clone: + return CLONE; + case Follower: + return FOLLOWER; + case PreLeader: + return PRE_LEADER; + case Leader: + return LEADER; + case Manager: + return MANAGER; + case ShuttingDown: + return SHUTTING_DOWN; + case Shutdown: + return SHUT_DOWN; + case ReadOnlyLeaderless: + return READ_ONLY_LEADERLESS; + case PreReadOnlyReplica: + return PRE_READ_ONLY_REPLICA; + case ReadOnlyReplica: + return READ_ONLY_REPLICA; + case ResigningLeader: + return RESIGNING_LEADER; + } + return UNKNOWN; + } + } + + public static class Endpoint { + private final String address; + private final int port; + + Endpoint(String address, int port) { + this.address = address; + this.port = port; + } + + InetSocketAddress toInetSocketAddress() { + return new InetSocketAddress(this.address, this.port); + } + + public String getAddress() { + return address; + } + + public int getPort() { + return port; + } + } + + public static class Member { + private final UUID instanceId; + private final boolean isAlive; + private final MemberState state; + private final Endpoint httpEndpoint; + + Member(UUID instanceId, boolean isAlive, MemberState state, Endpoint httpEndpoint) { + this.instanceId = instanceId; + this.isAlive = isAlive; + this.state = state; + this.httpEndpoint = httpEndpoint; + } + + public UUID getInstanceId() { + return instanceId; + } + + public boolean isAlive() { + return isAlive; + } + + public MemberState getState() { + return state; + } + + public Endpoint getHttpEndpoint() { + return httpEndpoint; + } + } +} \ No newline at end of file diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ClusterResolverFactory.java b/db-client-java/src/main/java/com/eventstore/dbclient/ClusterResolverFactory.java new file mode 100644 index 00000000..7826a44a --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ClusterResolverFactory.java @@ -0,0 +1,166 @@ +package com.eventstore.dbclient; + +import io.grpc.*; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import org.xbill.DNS.*; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class ClusterResolverFactory extends NameResolver.Factory { + private static final Set invalidStates; + private static final Random random = new Random(); + + static { + invalidStates = new HashSet() {{ + add(ClusterInfo.MemberState.MANAGER); + add(ClusterInfo.MemberState.SHUTTING_DOWN); + add(ClusterInfo.MemberState.SHUT_DOWN); + add(ClusterInfo.MemberState.UNKNOWN); + add(ClusterInfo.MemberState.INITIALIZING); + add(ClusterInfo.MemberState.RESIGNING_LEADER); + add(ClusterInfo.MemberState.PRE_LEADER); + add(ClusterInfo.MemberState.PRE_REPLICA); + add(ClusterInfo.MemberState.PRE_READ_ONLY_REPLICA); + add(ClusterInfo.MemberState.CLONE); + add(ClusterInfo.MemberState.DISCOVER_LEADER); + }}; + } + + private List seedNodes; + private final NodePreference nodePreference; + private final SslContext sslContext; + private final Timeouts timeouts; + + public ClusterResolverFactory(List seedNodes, NodePreference nodePreference, Timeouts timeouts, SslContext sslContext) { + this.seedNodes = seedNodes; + this.nodePreference = nodePreference; + this.sslContext = sslContext; + this.timeouts = timeouts; + } + + @Override + public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + return new NameResolver() { + @Override + public String getServiceAuthority() { + return "eventStoreDBGossip"; + } + + @Override + public void start(Listener2 listener) { + List candidates; + + if (seedNodes != null) { + candidates = new ArrayList<>(seedNodes); + Collections.shuffle(candidates); + } else { + candidates = new ArrayList<>(); + try { + org.xbill.DNS.Record[] records = new Lookup(targetUri.toString(), Type.SRV).run(); + for (int i = 0; i < records.length; ++i) { + SRVRecord record = (SRVRecord) records[i]; + + candidates.add(new InetSocketAddress(record.getName().toString(true), record.getPort())); + } + } catch (TextParseException e) { + listener.onError(Status.INTERNAL); + } + } + + for (InetSocketAddress seed : candidates) { + try { + ClusterInfo.Endpoint endpoint = attemptDiscovery(seed).get(); + if (endpoint == null) { + continue; + } + + InetSocketAddress addr = endpoint.toInetSocketAddress(); + List addrs = new ArrayList<>(); + addrs.add(addr); + EquivalentAddressGroup addrGroup = new EquivalentAddressGroup(addrs); + List addrGroups = new ArrayList<>(); + addrGroups.add(addrGroup); + + listener.onResult(ResolutionResult.newBuilder() + .setAddresses(addrGroups) + .setAttributes(Attributes.EMPTY) + .build()); + return; + } catch (InterruptedException | ExecutionException e) { + listener.onError(Status.INTERNAL); + return; + } + } + } + + @Override + public void shutdown() { + } + }; + } + + @Override + public String getDefaultScheme() { + return "eventstore"; + } + + private CompletableFuture attemptDiscovery(InetSocketAddress seed) { + ManagedChannel channel = NettyChannelBuilder.forAddress(seed) + .userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT") + .sslContext(this.sslContext) + .build(); + GossipClient client = new GossipClient(channel, timeouts); + return client.read() + .thenApply(this::determineBestFitNode) + .thenApply(m -> m.map(ClusterInfo.Member::getHttpEndpoint).orElse(null)); + } + + private Optional determineBestFitNode(ClusterInfo clusterInfo) { + return clusterInfo.getMembers() + .stream() + .filter(ClusterInfo.Member::isAlive) + .filter(m -> !invalidStates.contains(m.getState())) + .sorted((o1, o2) -> { + switch (nodePreference) { + case LEADER: + if (o1.getState().equals(ClusterInfo.MemberState.LEADER)) { + return 1; + } + if (o2.getState().equals(ClusterInfo.MemberState.LEADER)) { + return -1; + } + return 0; + case FOLLOWER: + if (o1.getState().equals(ClusterInfo.MemberState.FOLLOWER)) { + return 1; + } + if (o2.getState().equals(ClusterInfo.MemberState.FOLLOWER)) { + return -1; + } + return 0; + case READ_ONLY_REPLICA: + if (o1.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) { + return 1; + } + if (o2.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) { + return -1; + } + return 0; + case RANDOM: + if (random.nextBoolean()) { + return 1; + } + + return 1; + } + return 0; + }) + .findAny(); + } +} \ No newline at end of file diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/Endpoint.java b/db-client-java/src/main/java/com/eventstore/dbclient/Endpoint.java new file mode 100644 index 00000000..a1a27957 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/Endpoint.java @@ -0,0 +1,19 @@ +package com.eventstore.dbclient; + +public class Endpoint { + private String hostname; + private int port; + + public Endpoint(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnection.java b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnection.java new file mode 100644 index 00000000..21900cc5 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnection.java @@ -0,0 +1,89 @@ +package com.eventstore.dbclient; + +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import javax.net.ssl.SSLException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +public class EventStoreConnection { + private Endpoint endpoint = null; + private Endpoint[] gossipSeeds = null; + private String domain = null; + private SslContext sslContext = null; + private Timeouts timeouts = null; + private UserCredentials userCredentials = null; + private NodePreference nodePreference; + private boolean requiresLeader; + + public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String domain, SslContext sslContext, UserCredentials userCredentials, NodePreference nodePreference, boolean requiresLeader, Timeouts timeouts) { + this.endpoint = endpoint; + this.gossipSeeds = gossipSeeds; + this.domain = domain; + this.sslContext = sslContext; + this.userCredentials = userCredentials; + this.timeouts = timeouts; + this.nodePreference = nodePreference; + this.requiresLeader = requiresLeader; + + if (sslContext == null) { + try { + this.sslContext = GrpcSslContexts. + forClient(). + trustManager(InsecureTrustManagerFactory.INSTANCE). + build(); + } catch (SSLException e) { + throw new RuntimeException(e); + } + } + } + + public static EventStoreConnectionBuilder builder() { + return new EventStoreConnectionBuilder(); + } + + public StreamsClient newStreamsClient() { + return new StreamsClient(createManagedChannel(), userCredentials, requiresLeader, timeouts); + } + + private ManagedChannel createManagedChannel() { + ManagedChannel channel = null; + + if (domain != null) { + channel = NettyChannelBuilder + .forTarget(domain) + .nameResolverFactory(new ClusterResolverFactory(null, nodePreference, timeouts, sslContext)) + .userAgent("Event Store Client (Java)") + .sslContext(sslContext) + .build(); + } else if (gossipSeeds != null) { + List addresses = new ArrayList<>(); + + for (int i = 0; i < gossipSeeds.length; ++i) { + Endpoint seed = gossipSeeds[i]; + InetSocketAddress address = new InetSocketAddress(seed.getHostname(), seed.getPort()); + + addresses.add(address); + } + + channel = NettyChannelBuilder.forTarget("eventstore") + .nameResolverFactory(new ClusterResolverFactory(addresses, nodePreference, timeouts, sslContext)) + .userAgent("Event Store Client (Java)") + .sslContext(sslContext) + .build(); + } else { + channel = NettyChannelBuilder + .forAddress(endpoint.getHostname(), endpoint.getPort()) + .userAgent("Event Store Client (Java)") + .sslContext(sslContext) + .build(); + } + + return channel; + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnectionBuilder.java b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnectionBuilder.java new file mode 100644 index 00000000..41867130 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnectionBuilder.java @@ -0,0 +1,68 @@ +package com.eventstore.dbclient; + +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; + +public class EventStoreConnectionBuilder { + private UserCredentials _defaultUserCredentials = null; + private Timeouts _timeouts; + private SslContext _sslContext = null; + private Endpoint endpoint = null; + private boolean requiresLeader = false; + + public EventStoreConnectionBuilder() { + _timeouts = Timeouts.DEFAULT; + } + + public EventStoreConnectionBuilder defaultUserCredentials(UserCredentials userCredentials) { + _defaultUserCredentials = userCredentials; + return this; + } + + public EventStoreConnectionBuilder connectionTimeouts(Timeouts timeouts) { + _timeouts = timeouts; + return this; + } + + public EventStoreConnectionBuilder insecure() { + _sslContext = null; + return this; + } + + public EventStoreConnectionBuilder sslContext(SslContext context) { + _sslContext = context; + return this; + } + + public EventStoreConnectionBuilder requiresLeader() { + return setRequiresLeader(true); + } + + public EventStoreConnectionBuilder setRequiresLeader(boolean value) { + this.requiresLeader = value; + return this; + } + + public EventStoreConnection createSingleNodeConnection(String hostname, int port) { + return createSingleNodeConnection(new Endpoint(hostname, port)); + } + + public EventStoreConnection createSingleNodeConnection(Endpoint endpoint) { + return new EventStoreConnection(endpoint, null, null, _sslContext, _defaultUserCredentials, NodePreference.RANDOM, requiresLeader, _timeouts); + } + + public EventStoreConnection createClusterConnectionUsingSeeds(Endpoint[] endpoints) { + return createClusterConnectionUsingSeeds(endpoints, NodePreference.RANDOM); + } + + public EventStoreConnection createClusterConnectionUsingSeeds(Endpoint[] endpoints, NodePreference nodePreference) { + return new EventStoreConnection(null, endpoints, null, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, _timeouts); + } + + public EventStoreConnection createClusterConnectionUsingDns(String domain) { + return createClusterConnectionUsingDns(domain, NodePreference.RANDOM); + } + + public EventStoreConnection createClusterConnectionUsingDns(String domain, NodePreference nodePreference) { + return new EventStoreConnection(null, null, domain, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, _timeouts); + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/GossipClient.java b/db-client-java/src/main/java/com/eventstore/dbclient/GossipClient.java new file mode 100644 index 00000000..d02fcb87 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/GossipClient.java @@ -0,0 +1,96 @@ +package com.eventstore.dbclient; + +import com.eventstore.dbclient.proto.gossip.GossipGrpc; +import com.eventstore.dbclient.proto.gossip.GossipOuterClass; +import com.eventstore.dbclient.proto.shared.Shared; +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public class GossipClient { + private final ManagedChannel _channel; + private final GossipGrpc.GossipStub _stub; + private final Timeouts _timeouts; + + public GossipClient(String host, int port, Timeouts timeouts, SslContext sslContext) { + this(NettyChannelBuilder.forAddress(host, port) + .userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT") + .sslContext(sslContext) + .build(), timeouts); + } + + public GossipClient(ManagedChannel channel, Timeouts timeouts) { + _channel = channel; + _timeouts = timeouts; + _stub = GossipGrpc.newStub(_channel); + } + + public void shutdown() throws InterruptedException { + _channel.shutdown().awaitTermination(_timeouts.shutdownTimeout, _timeouts.shutdownTimeoutUnit); + } + + public CompletableFuture read() { + CompletableFuture result = new CompletableFuture<>(); + + _stub.read(Shared.Empty.getDefaultInstance(), convertSingleResponse(result, resp -> { + List members = new ArrayList<>(); + + for (GossipOuterClass.MemberInfo info: resp.getMembersList()) { + UUID instanceId = null; + + if (info.hasInstanceId()) { + if (info.getInstanceId().hasStructured()) { + instanceId = new UUID(info.getInstanceId().getStructured().getMostSignificantBits(), info.getInstanceId().getStructured().getLeastSignificantBits()); + } else { + instanceId = UUID.fromString(info.getInstanceId().getString()); + } + } + + ClusterInfo.MemberState state = ClusterInfo.MemberState.fromWire(info.getState()); + ClusterInfo.Endpoint endpoint = new ClusterInfo.Endpoint(info.getHttpEndPoint().getAddress(), info.getHttpEndPoint().getPort()); + + ClusterInfo.Member member = new ClusterInfo.Member(instanceId, info.getIsAlive(), state, endpoint); + members.add(member); + } + + return new ClusterInfo(members); + })); + return result; + } + + private ClientResponseObserver convertSingleResponse( + CompletableFuture dest, Function converter) { + return new ClientResponseObserver() { + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + } + + @Override + public void onNext(RespT value) { + try { + TargetT converted = converter.apply(value); + dest.complete(converted); + } catch (Throwable e) { + dest.completeExceptionally(e); + } + } + + @Override + public void onError(Throwable t) { + dest.completeExceptionally(t); + } + + @Override + public void onCompleted() { + } + }; + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/NodePreference.java b/db-client-java/src/main/java/com/eventstore/dbclient/NodePreference.java new file mode 100644 index 00000000..ce162f50 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/NodePreference.java @@ -0,0 +1,6 @@ + +package com.eventstore.dbclient; + +public enum NodePreference { + LEADER, FOLLOWER, READ_ONLY_REPLICA, RANDOM +} \ No newline at end of file diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/NotLeaderException.java b/db-client-java/src/main/java/com/eventstore/dbclient/NotLeaderException.java new file mode 100644 index 00000000..338a67f7 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/NotLeaderException.java @@ -0,0 +1,13 @@ +package com.eventstore.dbclient; + +public class NotLeaderException extends Exception { + private Endpoint leaderEndpoint; + + public NotLeaderException(String host, int port) { + leaderEndpoint = new Endpoint(host, port); + } + + public Endpoint getLeaderEndpoint() { + return leaderEndpoint; + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/StreamsClient.java b/db-client-java/src/main/java/com/eventstore/dbclient/StreamsClient.java index e2dad6c1..64c24fc5 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/StreamsClient.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/StreamsClient.java @@ -44,18 +44,26 @@ public StreamsClient(String host, int port, UserCredentials defaultCredentials, this(NettyChannelBuilder.forAddress(host, port) .userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT") .sslContext(sslContext) - .build(), defaultCredentials, timeouts); + .build(), defaultCredentials, false, timeouts); } public StreamsClient( @NotNull ManagedChannel channel, - @NotNull UserCredentials credentials, + UserCredentials credentials, + boolean requiresLeader, @NotNull Timeouts timeouts) { _channel = channel; _timeouts = timeouts; Metadata headers = new Metadata(); - headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), credentials.basicAuthHeader()); + + if (credentials != null) { + headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), credentials.basicAuthHeader()); + } + + if (requiresLeader) { + headers.put(Metadata.Key.of("requires-leader", Metadata.ASCII_STRING_MARSHALLER), String.valueOf(requiresLeader)); + } _stub = MetadataUtils.attachHeaders(StreamsGrpc.newStub(_channel), headers); } @@ -246,6 +254,18 @@ public void onNext(RespT value) { @Override public void onError(Throwable t) { + if (t instanceof StatusRuntimeException) { + StatusRuntimeException e = (StatusRuntimeException) t; + String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); + String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); + + if (leaderHost != null && leaderPort != null) { + NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); + dest.completeExceptionally(reason); + return; + } + } + dest.completeExceptionally(t); } @@ -290,6 +310,18 @@ public void onError(Throwable t) { return; } + if (t instanceof StatusRuntimeException) { + StatusRuntimeException e = (StatusRuntimeException) t; + String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); + String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); + + if (leaderHost != null && leaderPort != null) { + NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); + future.completeExceptionally(reason); + return; + } + } + future.completeExceptionally(t); } }); @@ -364,6 +396,16 @@ private CompletableFuture appendInternal( .build()); } requestStream.onCompleted(); + } catch (StatusRuntimeException e) { + String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); + String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); + + if (leaderHost != null && leaderPort != null) { + NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); + result.completeExceptionally(reason); + } else { + result.completeExceptionally(e); + } } catch (RuntimeException e) { result.completeExceptionally(e); } @@ -427,11 +469,19 @@ public void onNext(@NotNull StreamsOuterClass.ReadResp readResp) { @Override public void onError(Throwable throwable) { if (throwable instanceof StatusRuntimeException) { - Status s = ((StatusRuntimeException) throwable).getStatus(); - if (s.getCode() == Status.Code.CANCELLED) { + StatusRuntimeException e = (StatusRuntimeException) throwable; + if (e.getStatus().getCode() == Status.Code.CANCELLED) { listener.onCancelled(this._subscription); return; } + + String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); + String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); + + if (leaderHost != null && leaderPort != null) { + NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); + listener.onError(this._subscription, reason); + } } listener.onError(this._subscription, throwable); diff --git a/db-client-java/src/main/proto/gossip.proto b/db-client-java/src/main/proto/gossip.proto new file mode 100644 index 00000000..a6214a99 --- /dev/null +++ b/db-client-java/src/main/proto/gossip.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; +package event_store.client.gossip; +option java_package = "com.eventstore.dbclient.proto.gossip"; + +import "shared.proto"; + +service Gossip { + rpc Read (event_store.client.shared.Empty) returns (ClusterInfo); +} + +message ClusterInfo { + repeated MemberInfo members = 1; +} + +message EndPoint { + string address = 1; + uint32 port = 2; +} + +message MemberInfo { + enum VNodeState { + Initializing = 0; + DiscoverLeader = 1; + Unknown = 2; + PreReplica = 3; + CatchingUp = 4; + Clone = 5; + Follower = 6; + PreLeader = 7; + Leader = 8; + Manager = 9; + ShuttingDown = 10; + Shutdown = 11; + ReadOnlyLeaderless = 12; + PreReadOnlyReplica = 13; + ReadOnlyReplica = 14; + ResigningLeader = 15; + } + event_store.client.shared.UUID instance_id = 1; + int64 time_stamp = 2; + VNodeState state = 3; + bool is_alive = 4; + EndPoint http_end_point = 5; +} \ No newline at end of file diff --git a/db-client-java/src/test/java/testcontainers/module/EventStoreTestDBContainer.java b/db-client-java/src/test/java/testcontainers/module/EventStoreTestDBContainer.java index 830def70..1e11e010 100644 --- a/db-client-java/src/test/java/testcontainers/module/EventStoreTestDBContainer.java +++ b/db-client-java/src/test/java/testcontainers/module/EventStoreTestDBContainer.java @@ -1,7 +1,7 @@ package testcontainers.module; +import com.eventstore.dbclient.EventStoreConnection; import com.eventstore.dbclient.StreamsClient; -import com.eventstore.dbclient.Timeouts; import com.eventstore.dbclient.UserCredentials; import com.github.dockerjava.api.model.HealthCheck; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; @@ -56,11 +56,13 @@ public EventStoreTestDBContainer(String image, boolean emptyDatabase) { public StreamsClient getStreamsClient() { final String address = getContainerIpAddress(); final int port = getMappedPort(DB_HTTP_PORT); - final SslContext sslContext = getClientSslContext(); - final UserCredentials creds = new UserCredentials("admin", "changeit"); - final Timeouts timeouts = Timeouts.DEFAULT; - return new StreamsClient(address, port, creds, timeouts, sslContext); + return EventStoreConnection + .builder() + .insecure() + .defaultUserCredentials(new UserCredentials("admin", "changeit")) + .createSingleNodeConnection(address, port) + .newStreamsClient(); } private SslContext getClientSslContext() {