Skip to content

Commit

Permalink
Added UnaryServerInterceptor and UnaryClientInterceptor for GRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
pinglamb committed May 8, 2020
1 parent 79b047a commit 818b132
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/grpc/.ruby-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2.6.5
7 changes: 7 additions & 0 deletions examples/grpc/Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

source "https://rubygems.org"

gem "opentelemetry-api", path: "../../api"
gem "opentelemetry-sdk", path: "../../sdk"
gem "grpc", "~> 1.28"
4 changes: 4 additions & 0 deletions examples/grpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```
cd ./example/grpc
grpc_tools_ruby_protoc -I api --ruby_out=../lib --grpc_out=../lib api/hello_service.proto
```
34 changes: 34 additions & 0 deletions examples/grpc/api/hello_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";
package api;

service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);

// rpc SayHelloServerStream (HelloRequest) returns (stream HelloResponse);

// rpc SayHelloClientStream (stream HelloRequest) returns (HelloResponse);

// rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
string greeting = 1;
}

message HelloResponse {
string reply = 1;
}
20 changes: 20 additions & 0 deletions examples/grpc/api/hello_service_pb.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions examples/grpc/api/hello_service_services_pb.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions examples/grpc/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env ruby

api_dir = File.expand_path('./api', __dir__)
$LOAD_PATH.unshift(api_dir) unless $LOAD_PATH.include?(api_dir)
lib_dir = File.expand_path('./lib', __dir__)
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)


require 'rubygems'
require 'bundler/setup'
require 'grpc'
require 'opentelemetry/sdk'

require 'hello_service_services_pb'
require 'grpctrace'

# configure SDK with defaults
OpenTelemetry::SDK.configure

def call_say_hello(conn)
metadata = {
'timestamp' => Time.now.to_i.to_s,
'client-id' => 'web-api-client-us-east-1',
'user-id' => 'some-test-user-id'
}
response = conn.say_hello(
Api::HelloRequest.new(greeting: 'world'),
metadata: metadata
)
puts "Response from server: #{response.reply}"
end

def main
tracer = OpenTelemetry.tracer_provider.tracer('grpc', 'semver:1.0')

conn = Api::HelloService::Stub.new(
'localhost:7777',
:this_channel_is_insecure,
interceptors: [GRPCTrace::UnaryClientInterceptor.new(tracer)]
)

call_say_hello(conn)
end

main
123 changes: 123 additions & 0 deletions examples/grpc/lib/grpctrace.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# frozen_string_literal: true

require 'grpc'

module GRPCTrace
RPC_SERVICE_KEY = 'rpc.service'
NET_PEER_IP_KEY = 'net.peer.ip'
NET_PEER_PORT_KEY = 'net.peer.port'

MESSAGE_TYPE_KEY = 'message.type'
MESSAGE_ID_KEY = 'message.id'
MESSAGE_UNCOMPRESSED_SIZE_KEY = 'message.uncompressed_size'

module Util
FULL_METHOD_REGEXP = /^\/?(?:\S+\.)?(\S+)\/\S+$/
HOST_PORT_REGEXP = /^(?:ipv[46]:)?(?:\[)?([0-9a-f.:]+?)(?:\])?(?::)([0-9]+)$/

def service_from_full_method(method)
match = method.match(FULL_METHOD_REGEXP)
return '' unless match
match[1]
end

def full_method_from_method(method)
# HACK
# The GRPC::ActiveCall doesn't contain the original full method name anymore
# Either this should be fixed on GRPC side or this hack is needed
"/#{method.owner.service_name}/#{Util.camelcase(method.name.to_s)}"
end

def peer_info_from_call(call)
# HACK
# The GRPC::ActiveCall::InterceptableView we got here doesn't allow accessing peer info
# So instead we grab the info from the underlying ActiveCall
peer = call.instance_variable_get(:@wrapped).peer
host, port = Util.split_host_port(peer)

{
NET_PEER_IP_KEY => host,
NET_PEER_PORT_KEY => port
}
end

def self.camelcase(s)
s.sub!(/^[a-z\d]*/) { |match| match.capitalize }
s.gsub!(/(?:_|(\/))([a-z\d]*)/i) { "#{$1}#{$2.capitalize}" }
s
end

def self.split_host_port(peer)
_, host, port = peer.match(HOST_PORT_REGEXP).to_a
[host, port]
end
end

class UnaryClientInterceptor < GRPC::ClientInterceptor
include Util

def initialize(tracer, options = {})
super(options)
@tracer = tracer
end

def request_response(request: nil, call: nil, method: nil, metadata: nil)
puts "Intercepting request response method #{method}" \
" for request #{request} with call #{call} and metadata: #{metadata}"

@tracer.in_span(
method,
kind: :client,
attributes: {
RPC_SERVICE_KEY => service_from_full_method(method)
}.merge(peer_info_from_call(call))
) do |span|
OpenTelemetry.propagation.http.inject(metadata)

begin
span.status = OpenTelemetry::Trace::Status::OK
yield(request: request, call: call, method: method, metadata: metadata)
rescue GRPC::BadStatus => e
span.status = e.code
raise e
end
end
end
end

class UnaryServerInterceptor < GRPC::ServerInterceptor
include Util

def initialize(tracer, options = {})
super(options)
@tracer = tracer
end

def request_response(request: nil, call: nil, method: nil)
puts "Intercepting request response method #{method}" \
" for request #{request} from #{call.peer}" \
" with call #{call} and metadata: #{call.metadata}"

context = OpenTelemetry.propagation.text.extract(call.metadata)

full_method = full_method_from_method(method)

@tracer.in_span(
full_method,
kind: :server,
attributes: {
RPC_SERVICE_KEY => service_from_full_method(full_method)
}.merge(peer_info_from_call(call)),
with_parent_context: context
) do |span|
begin
span.status = OpenTelemetry::Trace::Status::OK
yield
rescue GRPC::BadStatus => e
span.status = e.code
raise e
end
end
end
end
end
40 changes: 40 additions & 0 deletions examples/grpc/server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env ruby

api_dir = File.expand_path('./api', __dir__)
$LOAD_PATH.unshift(api_dir) unless $LOAD_PATH.include?(api_dir)
lib_dir = File.expand_path('./lib', __dir__)
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)

require 'rubygems'
require 'bundler/setup'
require 'grpc'
require 'opentelemetry/sdk'

require 'hello_service_services_pb'
require 'grpctrace'

# configure SDK with defaults
OpenTelemetry::SDK.configure

class HelloServiceServer < Api::HelloService::Service
def say_hello(hello_req, _unused_call)
Api::HelloResponse.new(reply: "Hello #{hello_req.greeting}")
end
end

def main
tracer = OpenTelemetry.tracer_provider.tracer('grpc', 'semver:1.0')

s = GRPC::RpcServer.new(
interceptors: [GRPCTrace::UnaryServerInterceptor.new(tracer)]
)

s.add_http2_port(':7777', :this_port_is_insecure)
s.handle(HelloServiceServer)
# Runs the server with SIGHUP, SIGINT and SIGQUIT signal handlers to
# gracefully shutdown.
# User could also choose to run server via call to run_till_terminated
s.run_till_terminated_or_interrupted([1, 'int', 'SIGQUIT'])
end

main

0 comments on commit 818b132

Please sign in to comment.