New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update grpc-core, grpc-interop-testing, ... to 1.29.0 #933
Merged
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
e3239a8
Update grpc-core, grpc-interop-testing, ... to 1.29.0
scala-steward 60f2c83
Add missing updates
raboof ce786d0
Since grpc-java 1.29.0 subchannel failures are not notified (ChannelU…
ignasi35 30ac6b2
Since grpc-java 1.29.0 the client will never fail when no valid endpo…
ignasi35 582f647
Adds NonBalacingIntegrationSpec
ignasi35 789282d
Review documentation for client reconnection
ignasi35 1c6545d
Adds missing header
ignasi35 860b433
Allow failing after a loadbalanced connection failure
raboof 4ea1b2e
Fix case where a failure occurs after the channel has been ready
raboof 14b6e82
Add mima filter
raboof File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
144 changes: 144 additions & 0 deletions
144
interop-tests/src/test/scala/akka/grpc/scaladsl/NonBalancingIntegrationSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com> | ||
*/ | ||
|
||
package akka.grpc.scaladsl | ||
|
||
import java.net.InetSocketAddress | ||
|
||
import akka.actor.ActorSystem | ||
import akka.grpc.GrpcClientSettings | ||
import akka.grpc.internal.ClientConnectionException | ||
import akka.grpc.scaladsl.tools.MutableServiceDiscovery | ||
import akka.http.scaladsl.Http | ||
import example.myapp.helloworld.grpc.helloworld._ | ||
import io.grpc.Status.Code | ||
import io.grpc.StatusRuntimeException | ||
import org.scalatest.BeforeAndAfterAll | ||
import org.scalatest.concurrent.ScalaFutures | ||
import org.scalatest.exceptions.TestFailedException | ||
import org.scalatest.matchers.should.Matchers | ||
import org.scalatest.time.Span | ||
import org.scalatest.wordspec.AnyWordSpec | ||
|
||
import scala.concurrent.Await | ||
import scala.concurrent.duration._ | ||
|
||
class NonBalancingIntegrationSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with ScalaFutures { | ||
implicit val system = ActorSystem("NonBalancingIntegrationSpec") | ||
implicit val mat = akka.stream.ActorMaterializer.create(system) | ||
implicit val ec = system.dispatcher | ||
|
||
override implicit val patienceConfig = PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis)) | ||
|
||
"Using pick-first (non load balanced clients)" should { | ||
"send requests to a single endpoint" in { | ||
val service1 = new CountingGreeterServiceImpl() | ||
val service2 = new CountingGreeterServiceImpl() | ||
|
||
val server1 = Http().bindAndHandleAsync(GreeterServiceHandler(service1), "127.0.0.1", 0).futureValue | ||
val server2 = Http().bindAndHandleAsync(GreeterServiceHandler(service2), "127.0.0.1", 0).futureValue | ||
|
||
val discovery = MutableServiceDiscovery(List(server1, server2)) | ||
val client = GreeterServiceClient(GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false)) | ||
for (i <- 1 to 100) { | ||
client.sayHello(HelloRequest(s"Hello $i")).futureValue | ||
} | ||
|
||
service1.greetings.get + service2.greetings.get should be(100) | ||
service1.greetings.get should be(100) | ||
service2.greetings.get should be(0) | ||
} | ||
|
||
"re-discover endpoints on failure" in { | ||
val service1materializer = akka.stream.ActorMaterializer.create(system) | ||
val service1 = new CountingGreeterServiceImpl() | ||
val service2 = new CountingGreeterServiceImpl() | ||
|
||
val server1 = | ||
Http().bindAndHandleAsync(GreeterServiceHandler(service1), "127.0.0.1", 0)(service1materializer).futureValue | ||
val server2 = Http().bindAndHandleAsync(GreeterServiceHandler(service2), "127.0.0.1", 0).futureValue | ||
|
||
val discovery = MutableServiceDiscovery(List(server1)) | ||
val client = GreeterServiceClient(GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false)) | ||
|
||
val requestsPerServer = 2 | ||
|
||
for (i <- 1 to requestsPerServer) { | ||
client.sayHello(HelloRequest(s"Hello $i")).futureValue | ||
} | ||
|
||
discovery.setServices(List(server2.localAddress)) | ||
|
||
// This is rather heavy-handed, but surprisingly it seems just terminating | ||
// the binding isn't sufficient to actually abort the existing connection. | ||
server1.unbind().futureValue | ||
server1.terminate(hardDeadline = 100.milliseconds).futureValue | ||
service1materializer.shutdown() | ||
Thread.sleep(100) | ||
|
||
for (i <- 1 to requestsPerServer) { | ||
client.sayHello(HelloRequest(s"Hello $i")).futureValue | ||
} | ||
|
||
service1.greetings.get + service2.greetings.get should be(2 * requestsPerServer) | ||
service1.greetings.get should be(requestsPerServer) | ||
service2.greetings.get should be(requestsPerServer) | ||
} | ||
|
||
"select the right endpoint among invalid ones" in { | ||
val service = new CountingGreeterServiceImpl() | ||
val server = Http().bindAndHandleAsync(GreeterServiceHandler(service), "127.0.0.1", 0).futureValue | ||
val discovery = | ||
new MutableServiceDiscovery( | ||
List( | ||
new InetSocketAddress("example.invalid", 80), | ||
server.localAddress, | ||
new InetSocketAddress("example.invalid", 80))) | ||
val client = GreeterServiceClient(GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false)) | ||
|
||
for (i <- 1 to 100) { | ||
client.sayHello(HelloRequest(s"Hello $i")).futureValue | ||
} | ||
|
||
service.greetings.get should be(100) | ||
} | ||
|
||
"eventually fail when no valid endpoints are provided" in { | ||
val discovery = | ||
new MutableServiceDiscovery( | ||
List(new InetSocketAddress("example.invalid", 80), new InetSocketAddress("example.invalid", 80))) | ||
val client = GreeterServiceClient( | ||
GrpcClientSettings | ||
.usingServiceDiscovery("greeter", discovery) | ||
.withTls(false) | ||
// Low value to speed up the test | ||
.withConnectionAttempts(2)) | ||
|
||
val failure = | ||
client.sayHello(HelloRequest(s"Hello friend")).failed.futureValue.asInstanceOf[StatusRuntimeException] | ||
failure.getStatus.getCode should be(Code.UNAVAILABLE) | ||
client.closed.failed.futureValue shouldBe a[ClientConnectionException] | ||
} | ||
|
||
"not fail when no valid endpoints are provided but no limit on attempts is set" in { | ||
val discovery = | ||
new MutableServiceDiscovery( | ||
List(new InetSocketAddress("example.invalid", 80), new InetSocketAddress("example.invalid", 80))) | ||
val client = GreeterServiceClient(GrpcClientSettings.usingServiceDiscovery("greeter", discovery).withTls(false)) | ||
|
||
try { | ||
client.closed.failed.futureValue | ||
// Yes, we actually expect the future to timeout! | ||
fail("The `client.closed`future should not have completed. A Timeout was expected instead.") | ||
} catch { | ||
case _: TestFailedException => // that's what we're hoping for. | ||
} | ||
} | ||
|
||
} | ||
|
||
override def afterAll(): Unit = { | ||
Await.result(system.terminate(), 10.seconds) | ||
} | ||
} |
40 changes: 40 additions & 0 deletions
40
interop-tests/src/test/scala/akka/grpc/scaladsl/tools/MutableServiceDiscovery.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com> | ||
*/ | ||
|
||
package akka.grpc.scaladsl.tools | ||
|
||
import java.net.InetSocketAddress | ||
|
||
import akka.discovery.Lookup | ||
import akka.discovery.ServiceDiscovery | ||
import akka.discovery.ServiceDiscovery.Resolved | ||
import akka.discovery.ServiceDiscovery.ResolvedTarget | ||
import akka.http.scaladsl.Http | ||
|
||
import scala.concurrent.Future | ||
import scala.concurrent.duration.FiniteDuration | ||
|
||
/** | ||
* An In-Memory ServiceDiscovery that only can lookup "greeter" | ||
*/ | ||
final class MutableServiceDiscovery(targets: List[InetSocketAddress]) extends ServiceDiscovery { | ||
var services: Future[Resolved] = _ | ||
|
||
setServices(targets) | ||
|
||
def setServices(targets: List[InetSocketAddress]): Unit = | ||
services = Future.successful( | ||
Resolved( | ||
"greeter", | ||
targets.map(target => ResolvedTarget(target.getHostString, Some(target.getPort), Some(target.getAddress))))) | ||
|
||
override def lookup(query: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = { | ||
require(query.serviceName == "greeter") | ||
services | ||
} | ||
} | ||
|
||
object MutableServiceDiscovery { | ||
def apply(targets: List[Http.ServerBinding]) = new MutableServiceDiscovery(targets.map(_.localAddress)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 2 additions & 0 deletions
2
.../main/mima-filters/0.8.4.backwards.excludes/update-to-grpc-core-1.29.0.backwards.excludes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# internal | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.ChannelUtils.monitorChannel") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing this line fixes the test.
grpc-java
1.29.0
changed the way subchannel events are notified (in load-balanced situations). With that change, theChannelUtils
listener for channel changes no longer works when using load balanced channels because it can no longer count reconnection attempts. As a consequence, a scenario like the one in the test will no longer fail but retry indefinitely.Once I realized the reconnection would work indefinitely I decided the test could be removed.
And, if we remove the test, and then start pulling the thread, we should probably deprecate
withConnectionAttempts(2)
(or maybe re-implement the underlying logic if we consider aconnectionAttempts
counter makes any sense now that we are using the internalNameResolver
engine provided bygroc-java
.Before pushing forward and deleting all usages of
connectionAttempts
I preferred raising this comment for discussion.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it makes sense to follow the lead of the reference implementation and just keep retrying when loadbalancing, then.
Isn't it used in the non-loadbalanced scenario either?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I had the same thought over breakfast earlier today. I think we don't have a test for that and, even if we have such a test, we should improve documentation or API around
withConnectionAttempts
.My main concern is whether we should improve the load-balanced use-cases. For example:
One of the reasons this took me a while to diagnose was that the test code (aka user code) had zero visibility over the underlying failures. Also, I (the developer) didn't see any apparent failure or retries happening even with
DEBUG
logs.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. If we don't have any insight in the progress of reconnecting / number of reconnects, I guess the best we could do is wait for a timeout: if a channel remains in TRANSIENT_FAILURE for X amount of time, assume it won't ever successfully connect and fail.
Does the reference implementation have any such timeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I had the same thought over breakfast earlier today. I think we don't have a test for that and, even if we have such a test, we should improve documentation or API around
withConnectionAttempts
.I had the answer above DRAFTed and then I realized that I don't know how to control wether the
Channel
provided is a single-Channel or a load-balanced one with multiple sub-channels.Maybe we're no longer using a non LoadBalanced client. The client factory methods all create a
ServiceDiscovery
instance (maybeakka-dns
, maybeconfig
, maybe other) and, since we always inject aNamedResolver
based on thatServiceDiscovery
it's possible we're always running aChannel
with sub-channel(s).I need to dig that deeper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(I had multiple GH windows and posted 2 variants of the same comment)
Ok, so with
grpc-java
1.29.0
we should keepwithConnectionAttempts
around but it'll only be used whenwithGrpcLoadBalancingType
is not invoked. I've just tested that and it works.I think we have to improve documentation with multiple things:
withGrpcLoadBalancingType
is used,withConnectionAttempts
is ignored, the underlying (akka-grpc
grpc-java
) implementation for backoff reconnecting will be used indefintely andclient.closed
only completes whenclient.close()
is directly invoked. Note: TIL the load balancing in theakka-grpc
implementation is per call.withGrpcLoadBalancingType
is not used, but aServiceDiscovery
is provided alls calls are sent to a single server instance but users can map overclient.closed
to reconnect again which will trigger a newServiceDiscovery
lookup.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm reading docs here and there trying to see what other choices we have.
Again! 🤦🏼♂️ I should refresh the page before answering...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean
grpc-java
?What do you mean?
I would expect that when that single server instance fails, grpc-java will automatically fail over to another instance (triggering a new ServiceDiscovery lookup) as needed. Do you think we need to do something extra there?
In any case would be good to add tests for those scenario's.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(🤦🏼♂️ Today is not my day)
In both statements above I meant
grpc-java
(notakka.grpc
).🤦🏼♂️ It is
round-robin
! What did I expect?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think our code listening on the number of re-connection attempts is the issue here and we may want to remove it once and for all. I mean, having moved to an implementation based on
NameResolver
, even when usingpick-first
you are probably correct andgrpjc-java
is doing a new name resolution. So, it doesn't make sense that close after a certain number of reconnection attempts. I mean, we could keep the code around for logging purposes (even though that code is useless inround-robin
) but we probably can drop thewithConnectionAttempts
setting andclosed
future.+1