Skip to content

Commit

Permalink
Update gRPC-Java to 1.30.0
Browse files Browse the repository at this point in the history
Motivation:

gRPC-Java [1.30.0](https://github.com/grpc/grpc-java/releases/tag/v1.30.0) has been released
We need a workaround for `ProtoReflectionService`. Because it does not implement `InternalNotifyOnServerBuild` anymore.
See grpc/grpc-java#6967 line#2806
It is a temporary workaround until upstream handles grpc/grpc-java#7138.

Motivations:

- Upgrade gRPC to 1.30.0 from 1.29.0
- Add ProtoReflectionServiceInterceptor that injects dummy server to
  gRPC context.

Result:

You can run gRPC 1.30.0 with Armeria server.
Fixes line#2806
  • Loading branch information
ikhoon committed Jun 18, 2020
1 parent ead4d10 commit 05be143
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 89 deletions.
1 change: 1 addition & 0 deletions benchmarks/build.gradle
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation project(':thrift')

implementation 'com.squareup.retrofit2:converter-jackson'
implementation 'com.google.protobuf:protobuf-java-util'
implementation 'io.grpc:grpc-okhttp'
implementation 'io.grpc:grpc-netty-shaded'
implementation 'org.awaitility:awaitility'
Expand Down
2 changes: 1 addition & 1 deletion dependencies.yml
Expand Up @@ -5,7 +5,7 @@
boms:
- com.fasterxml.jackson:jackson-bom:2.11.0
- io.dropwizard.metrics:metrics-bom:4.1.9
- io.grpc:grpc-bom:1.29.0
- io.grpc:grpc-bom:1.30.0
- io.micrometer:micrometer-bom:1.5.1
# NOTE: When changing this, re-evaluate netty-tcnative-boringssl-static below
- io.netty:netty-bom:4.1.50.Final
Expand Down
Expand Up @@ -18,7 +18,7 @@
/**
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler (version 1.29.0)",
value = "by gRPC proto compiler (version 1.30.0)",
comments = "Source: hello.proto")
public final class HelloServiceGrpc {

Expand Down
Expand Up @@ -26,20 +26,15 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

import javax.annotation.Nullable;

import org.curioswitch.common.protobuf.json.MessageMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
Expand Down Expand Up @@ -68,12 +63,10 @@
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.protobuf.services.ProtoReflectionService;

/**
* The framed {@link GrpcService} implementation.
Expand All @@ -94,8 +87,6 @@ final class FramedGrpcService extends AbstractHttpService implements GrpcService
private final boolean unsafeWrapRequestBuffers;
private final boolean useClientTimeoutHeader;
private final String advertisedEncodingsHeader;
@Nullable
private final ProtoReflectionService protoReflectionService;

private final Map<SerializationFormat, ResponseHeaders> defaultHeaders;

Expand All @@ -111,15 +102,13 @@ final class FramedGrpcService extends AbstractHttpService implements GrpcService
boolean useBlockingTaskExecutor,
boolean unsafeWrapRequestBuffers,
boolean useClientTimeoutHeader,
@Nullable ProtoReflectionService protoReflectionService,
int maxInboundMessageSizeBytes) {
this.registry = requireNonNull(registry, "registry");
this.routes = requireNonNull(routes, "routes");
this.decompressorRegistry = requireNonNull(decompressorRegistry, "decompressorRegistry");
this.compressorRegistry = requireNonNull(compressorRegistry, "compressorRegistry");
this.supportedSerializationFormats = supportedSerializationFormats;
this.useClientTimeoutHeader = useClientTimeoutHeader;
this.protoReflectionService = protoReflectionService;
jsonMarshaller = jsonMarshaller(registry, supportedSerializationFormats, jsonMarshallerCustomizer);
this.maxOutboundMessageSizeBytes = maxOutboundMessageSizeBytes;
this.useBlockingTaskExecutor = useBlockingTaskExecutor;
Expand Down Expand Up @@ -254,76 +243,6 @@ public void serviceAdded(ServiceConfig cfg) {
if (maxInboundMessageSizeBytes == ArmeriaMessageDeframer.NO_MAX_INBOUND_MESSAGE_SIZE) {
maxInboundMessageSizeBytes = (int) Math.min(cfg.maxRequestLength(), Integer.MAX_VALUE);
}

if (protoReflectionService != null) {
final Map<String, ServerServiceDefinition> grpcServices =
cfg.server().config().virtualHosts().stream()
.flatMap(host -> host.serviceConfigs().stream())
.map(serviceConfig -> serviceConfig.service().as(FramedGrpcService.class))
.filter(Objects::nonNull)
.flatMap(service -> service.services().stream())
// Armeria allows the same service to be registered multiple times at different
// paths, but proto reflection service only supports a single instance of each
// service so we dedupe here.
.collect(toImmutableMap(def -> def.getServiceDescriptor().getName(),
Function.identity(),
(a, b) -> a));
protoReflectionService.notifyOnBuild(new Server() {
@Override
public Server start() {
throw new UnsupportedOperationException();
}

@Override
public List<ServerServiceDefinition> getServices() {
return ImmutableList.copyOf(grpcServices.values());
}

@Override
public List<ServerServiceDefinition> getImmutableServices() {
// NB: This will probably go away in favor of just getServices above, so we
// implement both the same.
// https://github.com/grpc/grpc-java/issues/4600
return getServices();
}

@Override
public List<ServerServiceDefinition> getMutableServices() {
// Armeria does not have the concept of mutable services.
return ImmutableList.of();
}

@Override
public Server shutdown() {
throw new UnsupportedOperationException();
}

@Override
public Server shutdownNow() {
throw new UnsupportedOperationException();
}

@Override
public boolean isShutdown() {
throw new UnsupportedOperationException();
}

@Override
public boolean isTerminated() {
throw new UnsupportedOperationException();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Override
public void awaitTermination() {
throw new UnsupportedOperationException();
}
});
}
}

@Override
Expand Down
Expand Up @@ -52,6 +52,7 @@
import io.grpc.BindableService;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.protobuf.services.ProtoReflectionService;

Expand Down Expand Up @@ -87,8 +88,7 @@ public final class GrpcServiceBuilder {

private boolean useClientTimeoutHeader = true;

@Nullable
private ProtoReflectionService protoReflectionService;
private boolean isProtoReflectionServiceSet;

GrpcServiceBuilder() {}

Expand All @@ -107,11 +107,13 @@ public GrpcServiceBuilder addService(ServerServiceDefinition service) {
*/
public GrpcServiceBuilder addService(BindableService bindableService) {
if (bindableService instanceof ProtoReflectionService) {
checkState(protoReflectionService == null,
checkState(!isProtoReflectionServiceSet,
"Attempting to add a ProtoReflectionService but one is already present. " +
"ProtoReflectionService must only be added once.");
protoReflectionService = (ProtoReflectionService) bindableService;
}
isProtoReflectionServiceSet = true;
return addService(ServerInterceptors.intercept(bindableService,
ProtoReflectionServiceInterceptor.INSTANCE));
}

return addService(bindableService.bindService());
}
Expand Down Expand Up @@ -312,7 +314,6 @@ public GrpcService build() {
useBlockingTaskExecutor,
unsafeWrapRequestBuffers,
useClientTimeoutHeader,
protoReflectionService,
maxInboundMessageSizeBytes);
return enableUnframedRequests ? new UnframedGrpcService(grpcService) : grpcService;
}
Expand Down
@@ -0,0 +1,136 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server.grpc;

import static com.google.common.collect.ImmutableMap.toImmutableMap;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import javax.annotation.Nullable;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.server.ServiceRequestContext;

import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.InternalServer;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;

final class ProtoReflectionServiceInterceptor implements ServerInterceptor {

static final ProtoReflectionServiceInterceptor INSTANCE = new ProtoReflectionServiceInterceptor();

@Nullable
private static Server dummyServer;

private ProtoReflectionServiceInterceptor() {}

@Override
public <I, O> Listener<I> interceptCall(ServerCall<I, O> call, Metadata headers,
ServerCallHandler<I, O> next) {
if (dummyServer == null) {
synchronized (INSTANCE) {
dummyServer = newDummyServer();
}
}

final Context context = Context.current().withValue(InternalServer.SERVER_CONTEXT_KEY, dummyServer);
return Contexts.interceptCall(context, call, headers, next);
}

private static Server newDummyServer() {
final Map<String, ServerServiceDefinition> grpcServices =
ServiceRequestContext.current().config().server().config().virtualHosts().stream()
.flatMap(host -> host.serviceConfigs().stream())
.map(serviceConfig -> serviceConfig.service().as(FramedGrpcService.class))
.filter(Objects::nonNull)
.flatMap(service -> service.services().stream())
// Armeria allows the same service to be registered multiple times at
// different paths, but proto reflection service only supports a single
// instance of each service so we dedupe here.
.collect(toImmutableMap(def -> def.getServiceDescriptor().getName(),
Function.identity(),
(a, b) -> a));

return new Server() {
@Override
public Server start() {
throw new UnsupportedOperationException();
}

@Override
public List<ServerServiceDefinition> getServices() {
return ImmutableList.copyOf(grpcServices.values());
}

@Override
public List<ServerServiceDefinition> getImmutableServices() {
// NB: This will probably go away in favor of just getServices above, so we
// implement both the same.
// https://github.com/grpc/grpc-java/issues/4600
return getServices();
}

@Override
public List<ServerServiceDefinition> getMutableServices() {
// Armeria does not have the concept of mutable services.
return ImmutableList.of();
}

@Override
public Server shutdown() {
throw new UnsupportedOperationException();
}

@Override
public Server shutdownNow() {
throw new UnsupportedOperationException();
}

@Override
public boolean isShutdown() {
throw new UnsupportedOperationException();
}

@Override
public boolean isTerminated() {
throw new UnsupportedOperationException();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Override
public void awaitTermination() {
throw new UnsupportedOperationException();
}
};
}
}

0 comments on commit 05be143

Please sign in to comment.