Skip to content

Commit

Permalink
feat(common): Log requests and responses (#1112)
Browse files Browse the repository at this point in the history
  • Loading branch information
dazuma authored Dec 5, 2024
1 parent 100b6e2 commit fe9e0ec
Show file tree
Hide file tree
Showing 20 changed files with 962 additions and 184 deletions.
6 changes: 6 additions & 0 deletions gapic-common/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ Metrics/ClassLength:
Metrics/CyclomaticComplexity:
Max: 15

Metrics/MethodLength:
Max: 30

Metrics/PerceivedComplexity:
Max: 15

Naming/AccessorMethodName:
Enabled: false

Naming/FileName:
Exclude:
- "lib/gapic-common.rb"
Expand Down
1 change: 1 addition & 0 deletions gapic-common/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ source "https://rubygems.org"
gemspec

gem "concurrent-ruby", "~> 1.3"
gem "googleapis-common-protos-types", "~> 1.15"
gem "google-cloud-core", "~> 1.7"
gem "google-style", "~> 1.30.0"
gem "grpc-tools", "~> 1.65"
Expand Down
6 changes: 4 additions & 2 deletions gapic-common/gapic-common.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Gem::Specification.new do |spec|
spec.add_dependency "faraday-retry", ">= 1.0", "< 3.a"
spec.add_dependency "googleapis-common-protos", "~> 1.6"
spec.add_dependency "googleapis-common-protos-types", "~> 1.15"
spec.add_dependency "googleauth", "~> 1.11"
spec.add_dependency "googleauth", "~> 1.12"
spec.add_dependency "google-cloud-env", "~> 2.2"
spec.add_dependency "google-logging-utils", "~> 0.1"
spec.add_dependency "google-protobuf", ">= 3.25", "< 5.a"
spec.add_dependency "grpc", "~> 1.65"
spec.add_dependency "grpc", "~> 1.66"
end
18 changes: 15 additions & 3 deletions gapic-common/lib/gapic/grpc/service_stub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require "gapic/grpc/service_stub/rpc_call"
require "gapic/grpc/service_stub/channel"
require "gapic/grpc/service_stub/channel_pool"
require "gapic/logging_concerns"
require "gapic/universe_domain_concerns"

module Gapic
Expand All @@ -32,6 +33,7 @@ module Gapic
#
class ServiceStub
include UniverseDomainConcerns
include LoggingConcerns

attr_reader :grpc_stub
attr_reader :channel_pool
Expand Down Expand Up @@ -64,6 +66,9 @@ class ServiceStub
# be used for intercepting calls before they are executed Interceptors are an EXPERIMENTAL API.
# @param channel_pool_config [::Gapic::ServiceStub:ChannelPool::Configuration] The configuration for channel
# pool. This argument will raise error when `credentials` is provided as a `::GRPC::Core::Channel`.
# @param logger [Logger,:default,nil] An explicit logger to use, or one
# of the values `:default` (the default) to construct a default logger,
# or `nil` to disable logging explicitly.
#
def initialize grpc_stub_class,
credentials:,
Expand All @@ -72,13 +77,19 @@ def initialize grpc_stub_class,
universe_domain: nil,
channel_args: nil,
interceptors: nil,
channel_pool_config: nil
channel_pool_config: nil,
logger: :default
raise ArgumentError, "grpc_stub_class is required" if grpc_stub_class.nil?

setup_universe_domain universe_domain: universe_domain,
endpoint: endpoint,
endpoint_template: endpoint_template,
credentials: credentials
setup_logging logger: logger,
system_name: grpc_stub_class,
service: grpc_stub_class,
endpoint: self.endpoint,
client_id: object_id

@channel_pool = nil
@grpc_stub = nil
Expand All @@ -102,7 +113,7 @@ def create_channel_pool grpc_stub_class, endpoint:, credentials:, channel_args:
end
@channel_pool = ChannelPool.new grpc_stub_class, endpoint: endpoint, credentials: credentials,
channel_args: channel_args, interceptors: interceptors,
config: channel_pool_config
config: channel_pool_config, stub_logger: stub_logger
end

def create_grpc_stub grpc_stub_class, endpoint:, credentials:, channel_args: nil, interceptors: nil
Expand Down Expand Up @@ -201,7 +212,8 @@ def create_grpc_stub grpc_stub_class, endpoint:, credentials:, channel_args: nil
#
def call_rpc method_name, request, options: nil, &block
if @channel_pool.nil?
rpc_call = RpcCall.new @grpc_stub.method method_name
meth = @grpc_stub.method method_name
rpc_call = RpcCall.new meth, stub_logger: stub_logger, method_name: method_name
rpc_call.call request, options: options, &block
else
@channel_pool.call_rpc method_name, request, options: options, &block
Expand Down
6 changes: 4 additions & 2 deletions gapic-common/lib/gapic/grpc/service_stub/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ class Channel
# Creates a new Channel instance
#
def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, interceptors: nil,
on_channel_create: nil
on_channel_create: nil, stub_logger: nil
@grpc_stub_class = grpc_stub_class
@endpoint = endpoint
@credentials = credentials
@channel_args = Hash channel_args
@interceptors = Array interceptors
@stub_logger = stub_logger
@concurrent_streams = 0
@mutex = Mutex.new
setup_grpc_stub
Expand Down Expand Up @@ -88,7 +89,8 @@ def setup_grpc_stub
def call_rpc method_name, request, options: nil, &block
@mutex.synchronize { @concurrent_streams += 1 }
begin
rpc_call = RpcCall.new @grpc_stub.method method_name
meth = @grpc_stub.method method_name
rpc_call = RpcCall.new meth, stub_logger: @stub_logger, method_name: method_name
response = rpc_call.call request, options: options, &block
response
ensure
Expand Down
11 changes: 9 additions & 2 deletions gapic-common/lib/gapic/grpc/service_stub/channel_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ class ChannelPool
##
# Initialize an instance of ServiceStub::ChannelPool
#
def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, interceptors: nil, config: nil
def initialize grpc_stub_class,
endpoint:, credentials:,
channel_args: nil,
interceptors: nil,
config: nil,
stub_logger: nil
if credentials.is_a? ::GRPC::Core::Channel
raise ArgumentError, "Can't create a channel pool with GRPC::Core::Channel as credentials"
end
Expand All @@ -41,6 +46,7 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
@channel_args = channel_args
@interceptors = interceptors
@config = config || Configuration.new
@stub_logger = stub_logger

@channels = (1..@config.channel_count).map { create_channel }
end
Expand All @@ -49,7 +55,8 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
# Creates a new channel.
def create_channel
Channel.new @grpc_stub_class, endpoint: @endpoint, credentials: @credentials, channel_args: @channel_args,
interceptors: @interceptors, on_channel_create: @config.on_channel_create
interceptors: @interceptors, on_channel_create: @config.on_channel_create,
stub_logger: @stub_logger
end

##
Expand Down
118 changes: 108 additions & 10 deletions gapic-common/lib/gapic/grpc/service_stub/rpc_call.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

require "gapic/call_options"
require "gapic/logging_concerns"
require "grpc/errors"

module Gapic
Expand All @@ -32,8 +33,11 @@ class RpcCall
#
# @param stub_method [Proc] Used to make a bare rpc call.
#
def initialize stub_method
def initialize stub_method, stub_logger: nil, method_name: nil
@stub_method = stub_method
@stub_logger = stub_logger
@method_name = method_name
@request_id = LoggingConcerns.random_uuid4
end

##
Expand All @@ -44,7 +48,8 @@ def initialize stub_method
# customize the options object, using keys that match the arguments for {Gapic::CallOptions.new}. This object
# should only be used once.
#
# @yield [response, operation] Access the response along with the RPC operation.
# @yield [response, operation] Access the response along with the RPC operation. Additionally, throwing
# `:response, obj` within the block will change the return value to `obj`.
# @yieldparam response [Object] The response object.
# @yieldparam operation [::GRPC::ActiveCall::Operation] The RPC operation for the response.
#
Expand Down Expand Up @@ -91,7 +96,7 @@ def initialize stub_method
# )
# response = echo_call.call request, options: options
#
# @example Accessing the response and RPC operation using a block:
# @example Accessing the RPC operation using a block:
# require "google/showcase/v1beta1/echo_pb"
# require "google/showcase/v1beta1/echo_services_pb"
# require "gapic"
Expand All @@ -107,8 +112,8 @@ def initialize stub_method
# echo_call = Gapic::ServiceStub::RpcCall.new echo_stub.method :echo
#
# request = Google::Showcase::V1beta1::EchoRequest.new
# echo_call.call request do |response, operation|
# operation.trailing_metadata
# metadata = echo_call.call request do |_response, operation|
# throw :response, operation.trailing_metadata
# end
#
def call request, options: nil
Expand All @@ -117,21 +122,27 @@ def call request, options: nil
deadline = calculate_deadline options
metadata = options.metadata

try_number = 1
retried_exception = nil
begin
request = log_request request, metadata, try_number
operation = stub_method.call request, deadline: deadline, metadata: metadata, return_op: true
response = operation.execute
yield response, operation if block_given?
response
catch :response do
response = log_response response, try_number
yield response, operation if block_given?
response
end
rescue ::GRPC::DeadlineExceeded => e
log_response e, try_number
raise Gapic::GRPC::DeadlineExceededError.new e.message, root_cause: retried_exception
rescue StandardError => e
if e.is_a?(::GRPC::Unavailable) && /Signet::AuthorizationError/ =~ e.message
e = Gapic::GRPC::AuthorizationError.new e.message.gsub(%r{^\d+:}, "")
end
e = normalize_exception e
log_response e, try_number

if check_retry?(deadline) && options.retry_policy.call(e)
retried_exception = e
try_number += 1
retry
end

Expand Down Expand Up @@ -163,6 +174,93 @@ def current_time
nsecs_part = nanos % 1_000_000_000
Time.at secs_part, nsecs_part, :nanosecond
end

def normalize_exception exception
if exception.is_a?(::GRPC::Unavailable) && /Signet::AuthorizationError/ =~ exception.message
exception = Gapic::GRPC::AuthorizationError.new exception.message.gsub(%r{^\d+:}, "")
end
exception
end

def log_request request, metadata, try_number
return request unless @stub_logger
@stub_logger.info do |entry|
entry.set_system_name
entry.set_service
entry.set "rpcName", @method_name
entry.set "retryAttempt", try_number
entry.set "requestId", @request_id
entry.message =
if request.is_a? Enumerable
"Sending stream to #{entry.service}.#{@method_name} (try #{try_number})"
else
"Sending request to #{entry.service}.#{@method_name} (try #{try_number})"
end
end
loggable_metadata = metadata.to_h rescue {}
if request.is_a? Enumerable
request.lazy.map do |req|
log_single_request req, loggable_metadata
end
else
log_single_request request, loggable_metadata
end
end

def log_single_request request, metadata
request_content = request.respond_to?(:to_h) ? (request.to_h rescue {}) : request.to_s
if !request_content.empty? || !metadata.empty?
@stub_logger.debug do |entry|
entry.set "requestId", @request_id
entry.set "request", request_content
entry.set "headers", metadata
entry.message = "(request payload as #{request.class})"
end
end
request
end

def log_response response, try_number
return response unless @stub_logger
@stub_logger.info do |entry|
entry.set_system_name
entry.set_service
entry.set "rpcName", @method_name
entry.set "retryAttempt", try_number
entry.set "requestId", @request_id
case response
when StandardError
entry.set "exception", response.to_s
entry.message = "Received error for #{entry.service}.#{@method_name} (try #{try_number}): #{response}"
when Enumerable
entry.message = "Receiving stream for #{entry.service}.#{@method_name} (try #{try_number})"
else
entry.message = "Received response for #{entry.service}.#{@method_name} (try #{try_number})"
end
end
case response
when StandardError
response
when Enumerable
response.lazy.map do |resp|
log_single_response resp
end
else
log_single_response response
end
end

def log_single_response response
response_content = response.respond_to?(:to_h) ? (response.to_h rescue {}) : response.to_s
unless response_content.empty?
@stub_logger.debug do |entry|
entry.set "requestId", @request_id
entry.set "response", response_content
entry.message = "(response payload as #{response.class})"
end
end
response
end
end
end
end
Loading

0 comments on commit fe9e0ec

Please sign in to comment.