
Update grpc-core, grpc-interop-testing, ... to 1.29.0 #933

Merged
merged 10 commits into from May 11, 2020
2 changes: 1 addition & 1 deletion benchmark-java/build.sbt
@@ -9,7 +9,7 @@ javaOptions in run ++= List("-Xms1g", "-Xmx1g", "-XX:+PrintGCDetails", "-XX:+Pr
// generate both client and server (default) in Java
akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Java)

val grpcVersion = "1.28.1" // checked synced by GrpcVersionSyncCheckPlugin
val grpcVersion = "1.29.0" // checked synced by GrpcVersionSyncCheckPlugin

val root = project.in(file("."))
.dependsOn(
19 changes: 10 additions & 9 deletions docs/src/main/paradox/client/details.md
@@ -6,17 +6,18 @@ Instances of the generated client may be long-lived and can be used concurrently
You can keep the client running until your system terminates, or close it earlier. To
avoid leaking in the latter case, you should call `.close()` on the client.

When the connection breaks, the client will start failing requests and try reconnecting
to the server automatically. If a connection can not be established after the configured number of attempts then
the client will try to use the `ServiceDiscovery` implementation to connect to a different instance. This mechanism separates the physical connection from the logical one and gives an extra layer of flexibility to support both client-side and server-side balancing. The default number of attempts to reconnect to the same host and port is infinite and configurable via `GrpcClientSettings`'s `connectionAttempts`. The number of times a client will reuse the `ServiceDiscovery` instance to locate a new remote instance is infinite.
When the connection breaks, the client will try reconnecting to the server automatically. On each reconnection
attempt, the `ServiceDiscovery` will be used and a new host may be found.

The client offers a method `closed()` that returns a @scala[`Future`]@java[`CompletionStage`]
that will complete once the client is explicitly closed after invoking `close()`.
When using client-side [load balancing](details#load-balancing) the reconnection loop will run indefinitely.

When using a direct client (not load balanced), you can set a maximum number of reconnection
attempts for when the connection breaks. If that limit is reached, the client will shut down. The default number of attempts to
reconnect is infinite and configurable via `GrpcClientSettings`'s `connectionAttempts`.

If you're using a static name for your server (or a Service Discovery with hard-coded values) then the server will
be re-resolved between connection attempts, and infinite is a sensible default value for @apidoc[GrpcClientSettings.connectionAttempts](GrpcClientSettings). However,
if you set up another service discovery mechanism (e.g. a service discovery based on DNS-SRV in Kubernetes) then the reconnection attempts should be set to
a small value (e.g. 5) so the client can recreate the connection to a different server instance when the connection is dropped and can't be re-established.
The client offers a method `closed()` that returns a @scala[`Future`]@java[`CompletionStage`]
that will complete once the client is explicitly closed after invoking `close()`. The returned @scala[`Future`]@java[`CompletionStage`]
will complete with a failure when the maximum number of `connectionAttempts` is reached (which causes a shutdown).
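
As a hedged sketch of these settings (using the `GreeterServiceClient` generated in this project's examples; the address and attempt count are illustrative assumptions):

```scala
import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import example.myapp.helloworld.grpc.helloworld.GreeterServiceClient

implicit val system = ActorSystem("client-sketch")
import system.dispatcher

// Direct (non load balanced) client: give up after 5 failed
// connection attempts instead of the infinite default.
val client = GreeterServiceClient(
  GrpcClientSettings
    .connectToServiceAt("127.0.0.1", 8080) // illustrative address
    .withTls(false)
    .withConnectionAttempts(5))

// `closed` fails once the attempt limit is exhausted.
client.closed.failed.foreach(cause => system.log.error(cause, "client gave up reconnecting"))
```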

## Load balancing

@@ -17,7 +17,7 @@ class AkkaGrpcPlugin implements Plugin<Project>, DependencyResolutionListener {
final String pluginVersion = AkkaGrpcPlugin.class.package.implementationVersion

final String protocVersion = "3.4.0"
final String grpcVersion = "1.28.1" // checked synced by GrpcVersionSyncCheckPlugin
final String grpcVersion = "1.29.0" // checked synced by GrpcVersionSyncCheckPlugin

Project project

@@ -6,49 +6,26 @@ package akka.grpc.scaladsl

import java.net.InetSocketAddress

import io.grpc.Status.Code
import io.grpc.StatusRuntimeException

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.discovery.{ Lookup, ServiceDiscovery }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.grpc.GrpcClientSettings
import akka.grpc.internal.ClientConnectionException
import akka.grpc.scaladsl.tools.MutableServiceDiscovery
import akka.http.scaladsl.Http

import example.myapp.helloworld.grpc.helloworld._

import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.matchers.should.Matchers
import io.grpc.Status.Code
import io.grpc.StatusRuntimeException
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.exceptions.TestFailedException
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpec

final class MutableServiceDiscovery(targets: List[InetSocketAddress]) extends ServiceDiscovery {
var services: Future[Resolved] = _

setServices(targets)

def setServices(targets: List[InetSocketAddress]): Unit =
services = Future.successful(
Resolved(
"greeter",
targets.map(target => ResolvedTarget(target.getHostString, Some(target.getPort), Some(target.getAddress)))))

override def lookup(query: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
require(query.serviceName == "greeter")
services
}
}
object MutableServiceDiscovery {
def apply(targets: List[Http.ServerBinding]) = new MutableServiceDiscovery(targets.map(_.localAddress))
}
import scala.concurrent.Await
import scala.concurrent.duration._

class LoadBalancingIntegrationSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with ScalaFutures {
implicit val system = ActorSystem("LoadBalancingIntegrationSpec")
implicit val system = ActorSystem("NonBalancingIntegrationSpec")
implicit val mat = akka.stream.ActorMaterializer.create(system)
implicit val ec = system.dispatcher

@@ -139,7 +116,7 @@ class LoadBalancingIntegrationSpec extends AnyWordSpec with Matchers with Before
service.greetings.get should be(100)
}

"eventually fail when no valid endpoints are provided" in {
"fail when no valid endpoints are provided (don't retry) when max attempts is set to '1'" in {
val discovery =
new MutableServiceDiscovery(
List(new InetSocketAddress("example.invalid", 80), new InetSocketAddress("example.invalid", 80)))
@@ -149,13 +126,41 @@ class LoadBalancingIntegrationSpec extends AnyWordSpec with Matchers with Before
.withTls(false)
.withGrpcLoadBalancingType("round_robin")
// Low value to speed up the test
.withConnectionAttempts(2))
.withConnectionAttempts(1))

val failure =
client.sayHello(HelloRequest(s"Hello friend")).failed.futureValue.asInstanceOf[StatusRuntimeException]
failure.getStatus.getCode should be(Code.UNAVAILABLE)

client.closed.failed.futureValue shouldBe a[ClientConnectionException]

Removing this line fixes the test.
grpc-java 1.29.0 changed the way subchannel events are notified (in load-balanced situations). With that change, the ChannelUtils listener for channel changes no longer works when using load balanced channels because it can no longer count reconnection attempts. As a consequence, a scenario like the one in the test will no longer fail but retry indefinitely.

Once I realized the reconnection would work indefinitely I decided the test could be removed.

And, if we remove the test, and then start pulling the thread, we should probably deprecate withConnectionAttempts(2) (or maybe re-implement the underlying logic, if we consider a connectionAttempts counter still makes sense now that we are using the internal NameResolver engine provided by grpc-java).

Before pushing forward and deleting all usages of connectionAttempts I preferred raising this comment for discussion.


I guess it makes sense to follow the lead of the reference implementation and just keep retrying when loadbalancing, then.

if we remove the test, and then start pulling the thread, we should probably deprecate withConnectionAttempts(2)

Isn't it also used in the non-loadbalanced scenario?


Isn't it also used in the non-loadbalanced scenario?

Yeah, I had the same thought over breakfast earlier today. I think we don't have a test for that and, even if we have such a test, we should improve documentation or API around withConnectionAttempts.

My main concern is whether we should improve the load-balanced use-cases. For example:

  • in situations where none of the records returned by the service discovery are valid, I'd like to see the aggregated channel fail eventually (like it does in non-balanced usages). Or, at least, I'd like to see failure traces in the logs (I think we can have a look at logging settings for that).

One of the reasons this took me a while to diagnose was that the test code (aka user code) had zero visibility over the underlying failures. Also, I (the developer) didn't see any apparent failure or retries happening even with DEBUG logs.


in situations where none of the records returned by the service discovery are valid, I'd like to see the aggregated channel fail eventually

I agree. If we don't have any insight in the progress of reconnecting / number of reconnects, I guess the best we could do is wait for a timeout: if a channel remains in TRANSIENT_FAILURE for X amount of time, assume it won't ever successfully connect and fail.

Does the reference implementation have any such timeout?
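
Such a timeout could be sketched against the grpc-java channel API like this (a hypothetical helper, not something this PR implements; `getState` and `notifyWhenStateChanged` are the actual grpc-java `ManagedChannel` methods):

```scala
import java.util.concurrent.{ Executors, TimeUnit }
import scala.concurrent.{ Future, Promise }
import io.grpc.{ ConnectivityState, ManagedChannel }

// Hypothetical sketch: complete the returned Future with a failure if
// `channel` is still in TRANSIENT_FAILURE after `timeoutSeconds`.
def failIfStuck(channel: ManagedChannel, timeoutSeconds: Long): Future[Unit] = {
  val done      = Promise[Unit]()
  val scheduler = Executors.newSingleThreadScheduledExecutor()

  def watch(current: ConnectivityState): Unit = {
    if (current == ConnectivityState.TRANSIENT_FAILURE)
      // Assume the channel will never connect if it is still failing
      // once the deadline expires.
      scheduler.schedule(new Runnable {
        override def run(): Unit =
          if (channel.getState(false) == ConnectivityState.TRANSIENT_FAILURE)
            done.tryFailure(new IllegalStateException("channel stuck in TRANSIENT_FAILURE"))
      }, timeoutSeconds, TimeUnit.SECONDS)
    // grpc-java fires each callback only once, so re-register on every change.
    channel.notifyWhenStateChanged(current, new Runnable {
      override def run(): Unit = watch(channel.getState(false))
    })
  }

  watch(channel.getState(false))
  done.future
}
```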


Isn't it also used in the non-loadbalanced scenario?

Yeah, I had the same thought over breakfast earlier today. I think we don't have a test for that and, even if we have such a test, we should improve documentation or API around withConnectionAttempts.


I had the answer above drafted and then I realized that I don't know how to control whether the Channel provided is a single Channel or a load-balanced one with multiple sub-channels.

Maybe we're no longer using a non-load-balanced client. The client factory methods all create a ServiceDiscovery instance (maybe akka-dns, maybe config, maybe other) and, since we always inject a NameResolver based on that ServiceDiscovery, it's possible we're always running a Channel with sub-channel(s).

I need to dig deeper into that.


@ignasi35 ignasi35 Apr 24, 2020


(I had multiple GH windows and posted 2 variants of the same comment)

Ok, so with grpc-java 1.29.0 we should keep withConnectionAttempts around but it'll only be used when withGrpcLoadBalancingType is not invoked. I've just tested that and it works.

I think we have to improve documentation with multiple things:

  1. when withGrpcLoadBalancingType is used, withConnectionAttempts is ignored, the underlying (~~akka-grpc~~ grpc-java) implementation for backoff reconnecting will be used indefinitely, and client.closed only completes when client.close() is directly invoked. Note: TIL the load balancing in the akka-grpc implementation is per call.
  2. when withGrpcLoadBalancingType is not used, but a ServiceDiscovery is provided, all calls are sent to a single server instance, but users can map over client.closed to reconnect again, which will trigger a new ServiceDiscovery lookup.
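
The two behaviours above could be sketched like this (hedged: this reuses the `GreeterServiceClient` and settings style from the tests in this PR; the client config name is an assumption):

```scala
import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import example.myapp.helloworld.grpc.helloworld.GreeterServiceClient

implicit val system = ActorSystem("reconnect-sketch")
implicit val mat = akka.stream.ActorMaterializer.create(system)
implicit val ec = system.dispatcher

// 1. Load balanced: connectionAttempts is ignored and grpc-java keeps
//    reconnecting with backoff indefinitely; `closed` only completes
//    after an explicit `close()`.
val balanced = GreeterServiceClient(
  GrpcClientSettings
    .fromConfig("helloworld.GreeterService") // assumed client config name
    .withGrpcLoadBalancingType("round_robin"))

// 2. Not load balanced: all calls go to one resolved instance; mapping
//    over `closed` allows building a replacement client, which triggers
//    a fresh ServiceDiscovery lookup.
val single = GreeterServiceClient(
  GrpcClientSettings.fromConfig("helloworld.GreeterService"))
single.closed.failed.foreach { _ =>
  // hypothetical: construct a new GreeterServiceClient here
}
```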


Does the reference implementation have any such timeout?

I'm reading docs here and there trying to see what other choices we have.

The upstream default is pick_first (no loadbalancing...

Again! 🤦🏼‍♂️ I should refresh the page before answering...


the underlying (akka-grpc) implementation for backoff reconnecting will be used

Do you mean grpc-java?

TIL the load balancing in the akka-grpc implementation is per call

What do you mean?

all calls are sent to a single server instance but users can map over client.closed to reconnect again which will trigger a new ServiceDiscovery lookup.

I would expect that when that single server instance fails, grpc-java will automatically fail over to another instance (triggering a new ServiceDiscovery lookup) as needed. Do you think we need to do something extra there?

In any case it would be good to add tests for those scenarios.


(🤦🏼‍♂️ Today is not my day)

In both statements above I meant grpc-java (not akka.grpc).

TIL the load balancing in the ~~akka-grpc~~ grpc-java implementation is per call

🤦🏼‍♂️ It is round-robin! What did I expect?


I would expect that when that single server instance fails, grpc-java will automatically fail over to another instance (triggering a new ServiceDiscovery lookup) as needed. Do you think we need to do something extra there?

I think our code listening on the number of re-connection attempts is the issue here, and we may want to remove it once and for all. I mean, having moved to an implementation based on NameResolver, even when using pick-first you are probably correct and grpc-java is doing a new name resolution. So, it doesn't make sense to close after a certain number of reconnection attempts. We could keep the code around for logging purposes (even though that code is useless in round-robin), but we could probably drop the withConnectionAttempts setting and the closed future.

In any case it would be good to add tests for those scenarios.

+1

}

"not fail when no valid endpoints are provided (retry indefinitely) when max attempts is set to another positive value" in {
val discovery =
new MutableServiceDiscovery(
List(new InetSocketAddress("example.invalid", 80), new InetSocketAddress("example.invalid", 80)))
val client = GreeterServiceClient(
GrpcClientSettings
.usingServiceDiscovery("greeter", discovery)
.withTls(false)
.withGrpcLoadBalancingType("round_robin")
// Low value to speed up the test
.withConnectionAttempts(2))

val failure =
client.sayHello(HelloRequest(s"Hello friend")).failed.futureValue.asInstanceOf[StatusRuntimeException]
failure.getStatus.getCode should be(Code.UNAVAILABLE)

try {
client.closed.failed.futureValue
// Yes, we actually expect the future to timeout!
fail("The `client.closed` future should not have completed. A timeout was expected instead.")
} catch {
case _: TestFailedException => // that's what we're hoping for.
}

}

}

override def afterAll(): Unit = {
@@ -0,0 +1,144 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.grpc.scaladsl

import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import akka.grpc.internal.ClientConnectionException
import akka.grpc.scaladsl.tools.MutableServiceDiscovery
import akka.http.scaladsl.Http
import example.myapp.helloworld.grpc.helloworld._
import io.grpc.Status.Code
import io.grpc.StatusRuntimeException
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.exceptions.TestFailedException
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.Await
import scala.concurrent.duration._

class NonBalancingIntegrationSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with ScalaFutures {
implicit val system = ActorSystem("NonBalancingIntegrationSpec")
implicit val mat = akka.stream.ActorMaterializer.create(system)
implicit val ec = system.dispatcher

override implicit val patienceConfig = PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis))

"Using pick-first (non load balanced clients)" should {
"send requests to a single endpoint" in {
val service1 = new CountingGreeterServiceImpl()
val service2 = new CountingGreeterServiceImpl()

val server1 = Http().bindAndHandleAsync(GreeterServiceHandler(service1), "127.0.0.1", 0).futureValue
val server2 = Http().bindAndHandleAsync(GreeterServiceHandler(service2), "127.0.0.1", 0).futureValue

val discovery = MutableServiceDiscovery(List(server1, server2))
val client = GreeterServiceClient(GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false))
for (i <- 1 to 100) {
client.sayHello(HelloRequest(s"Hello $i")).futureValue
}

service1.greetings.get + service2.greetings.get should be(100)
service1.greetings.get should be(100)
service2.greetings.get should be(0)
}

"re-discover endpoints on failure" in {
val service1materializer = akka.stream.ActorMaterializer.create(system)
val service1 = new CountingGreeterServiceImpl()
val service2 = new CountingGreeterServiceImpl()

val server1 =
Http().bindAndHandleAsync(GreeterServiceHandler(service1), "127.0.0.1", 0)(service1materializer).futureValue
val server2 = Http().bindAndHandleAsync(GreeterServiceHandler(service2), "127.0.0.1", 0).futureValue

val discovery = MutableServiceDiscovery(List(server1))
val client = GreeterServiceClient(GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false))

val requestsPerServer = 2

for (i <- 1 to requestsPerServer) {
client.sayHello(HelloRequest(s"Hello $i")).futureValue
}

discovery.setServices(List(server2.localAddress))

// This is rather heavy-handed, but surprisingly it seems just terminating
// the binding isn't sufficient to actually abort the existing connection.
server1.unbind().futureValue
server1.terminate(hardDeadline = 100.milliseconds).futureValue
service1materializer.shutdown()
Thread.sleep(100)

for (i <- 1 to requestsPerServer) {
client.sayHello(HelloRequest(s"Hello $i")).futureValue
}

service1.greetings.get + service2.greetings.get should be(2 * requestsPerServer)
service1.greetings.get should be(requestsPerServer)
service2.greetings.get should be(requestsPerServer)
}

"select the right endpoint among invalid ones" in {
val service = new CountingGreeterServiceImpl()
val server = Http().bindAndHandleAsync(GreeterServiceHandler(service), "127.0.0.1", 0).futureValue
val discovery =
new MutableServiceDiscovery(
List(
new InetSocketAddress("example.invalid", 80),
server.localAddress,
new InetSocketAddress("example.invalid", 80)))
val client = GreeterServiceClient(GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false))

for (i <- 1 to 100) {
client.sayHello(HelloRequest(s"Hello $i")).futureValue
}

service.greetings.get should be(100)
}

"eventually fail when no valid endpoints are provided" in {
val discovery =
new MutableServiceDiscovery(
List(new InetSocketAddress("example.invalid", 80), new InetSocketAddress("example.invalid", 80)))
val client = GreeterServiceClient(
GrpcClientSettings
.usingServiceDiscovery("greeter", discovery)
.withTls(false)
// Low value to speed up the test
.withConnectionAttempts(2))

val failure =
client.sayHello(HelloRequest(s"Hello friend")).failed.futureValue.asInstanceOf[StatusRuntimeException]
failure.getStatus.getCode should be(Code.UNAVAILABLE)
client.closed.failed.futureValue shouldBe a[ClientConnectionException]
}

"not fail when no valid endpoints are provided but no limit on attempts is set" in {
val discovery =
new MutableServiceDiscovery(
List(new InetSocketAddress("example.invalid", 80), new InetSocketAddress("example.invalid", 80)))
val client = GreeterServiceClient(GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false))

try {
client.closed.failed.futureValue
// Yes, we actually expect the future to timeout!
fail("The `client.closed` future should not have completed. A timeout was expected instead.")
} catch {
case _: TestFailedException => // that's what we're hoping for.
}
}

}

override def afterAll(): Unit = {
Await.result(system.terminate(), 10.seconds)
}
}
@@ -0,0 +1,40 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.grpc.scaladsl.tools

import java.net.InetSocketAddress

import akka.discovery.Lookup
import akka.discovery.ServiceDiscovery
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.http.scaladsl.Http

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

/**
* An In-Memory ServiceDiscovery that only can lookup "greeter"
*/
final class MutableServiceDiscovery(targets: List[InetSocketAddress]) extends ServiceDiscovery {
var services: Future[Resolved] = _

setServices(targets)

def setServices(targets: List[InetSocketAddress]): Unit =
services = Future.successful(
Resolved(
"greeter",
targets.map(target => ResolvedTarget(target.getHostString, Some(target.getPort), Some(target.getAddress)))))

override def lookup(query: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
require(query.serviceName == "greeter")
services
}
}

object MutableServiceDiscovery {
def apply(targets: List[Http.ServerBinding]) = new MutableServiceDiscovery(targets.map(_.localAddress))
}
2 changes: 1 addition & 1 deletion plugin-tester-java/pom.xml
@@ -14,7 +14,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<akka.http.cors.version>0.4.2</akka.http.cors.version>
<grpc.version>1.28.1</grpc.version> <!-- checked synced by GrpcVersionSyncCheckPlugin -->
<grpc.version>1.29.0</grpc.version> <!-- checked synced by GrpcVersionSyncCheckPlugin -->
<project.encoding>UTF-8</project.encoding>
</properties>

2 changes: 1 addition & 1 deletion plugin-tester-scala/pom.xml
@@ -15,7 +15,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<akka.version>2.5.13</akka.version>
<akka.http.cors.version>0.4.2</akka.http.cors.version>
<grpc.version>1.28.1</grpc.version> <!-- checked synced by GrpcVersionSyncCheckPlugin -->
<grpc.version>1.29.0</grpc.version> <!-- checked synced by GrpcVersionSyncCheckPlugin -->
<project.encoding>UTF-8</project.encoding>
</properties>

2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -14,7 +14,7 @@ object Dependencies {
val akkaHttp = "10.1.11"
val akkaHttpBinary = "10.1"

val grpc = "1.28.1" // checked synced by GrpcVersionSyncCheckPlugin
val grpc = "1.29.0" // checked synced by GrpcVersionSyncCheckPlugin

val scalaTest = "3.1.1"

@@ -0,0 +1,2 @@
# internal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.ChannelUtils.monitorChannel")
8 changes: 7 additions & 1 deletion runtime/src/main/resources/reference.conf
@@ -33,7 +33,13 @@ akka.grpc.client."*" {

# TODO: Enforce HTTP/2 TLS restrictions: https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-9.2

connection-attempts = -1
# The number of times to try connecting before giving up.
# '-1' means retry indefinitely, '0' is invalid, and '1' means fail
# after the first failed attempt.
# When load balancing, individual connection failures are not
# counted, so in that case any value larger than '1' is also
# interpreted as retrying indefinitely.
connection-attempts = 20
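
For example, a specific client could override this default in `application.conf` (the client name here is illustrative, not from this PR):

```hocon
akka.grpc.client."helloworld.GreeterService" {
  # Give up after 5 failed connection attempts for this client only.
  connection-attempts = 5
}
```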

# Service discovery mechanism to use. The default is to use a static host
# and port that will be resolved via DNS.