From 174c35416884b4a705effb1c6fbd4665aff6a5b1 Mon Sep 17 00:00:00 2001
From: Calvin Aditya Jonathan
Date: Tue, 5 Jan 2021 10:42:41 +0700
Subject: [PATCH] Publish to rubygems (#1)

* Set a default ruby version

* Remove deprecated has_rdoc option

This has been deprecated [since 2011][0]

[0]: https://github.com/rubygems/rubygems/blob/1aebd7ddd69c9a38aa9daa3aa89f396d59c9e2a4/History.txt#L3296-L3297

* Add rubocop configuration

* Add rubocop fixups and configuration

Use the rubocop-generated configuration to ignore any major changes that
can't be auto-corrected.

* Adjust supported Ruby versions

Ruby 2.4 is now end-of-life. As a result of this, the `google-protobuf`
gem is no longer releasing artifacts compatible with this version.
(https://github.com/protocolbuffers/protobuf/pull/7453)

In CI we encounter this error from `bundle install`:

```
google-protobuf-3.12.0-x86_64-linux requires ruby version >= 2.5, which is
incompatible with the current version, ruby 2.4.6p354
```

Therefore remove Ruby 2.4.6 as a tested version in Travis CI.
Additionally remove the constraint on patch versions in the Travis
config, so that we'll use the latest patch version available for each
release branch.

* Add Prometheus metrics

Augment the existing operations with Prometheus metrics in order to
provide observability into the operations that the plugin is performing.
Introduce a new metrics helper to prevent attempting to register the same
metric more than once in a multi-threaded or multi-instance context.

* Add compress_batches feature

As per the README updates, this can be used to compress a number of input
records into a single Pub/Sub message, therefore saving on costs.

* Include gocardless branch in Travis config

* Minor README fixups

Clarify that the published bytes value is before compression.
Correct the name of the compression ratio metric.
* publish to rubygems Co-authored-by: Ben Wheatley Co-authored-by: Calvin Aditya Jonathan --- .rubocop.yml | 46 ++++ .rubocop_todo.yml | 98 ++++++++ .ruby-version | 1 + .travis.yml | 13 +- CHANGELOG.md | 4 + Gemfile | 4 +- README.md | 76 +++++- Rakefile | 12 +- fluent-plugin-gcloud-pubsub-custom.gemspec | 30 ++- lib/fluent/plugin/gcloud_pubsub/client.rb | 133 +++++++++-- lib/fluent/plugin/gcloud_pubsub/metrics.rb | 24 ++ lib/fluent/plugin/in_gcloud_pubsub.rb | 224 +++++++++++------- lib/fluent/plugin/out_gcloud_pubsub.rb | 121 +++++++--- test/plugin/test_in_gcloud_pubsub.rb | 258 +++++++++++++-------- test/plugin/test_out_gcloud_pubsub.rb | 186 ++++++++++----- test/test_helper.rb | 26 ++- 16 files changed, 930 insertions(+), 326 deletions(-) create mode 100644 .rubocop.yml create mode 100644 .rubocop_todo.yml create mode 100644 .ruby-version create mode 100644 lib/fluent/plugin/gcloud_pubsub/metrics.rb diff --git a/.rubocop.yml b/.rubocop.yml new file mode 100644 index 0000000..2371fde --- /dev/null +++ b/.rubocop.yml @@ -0,0 +1,46 @@ +--- +inherit_from: .rubocop_todo.yml + +AllCops: + # Matches the minimum version in .travis.yml + TargetRubyVersion: 2.4 + +Style/StringLiterals: + EnforcedStyle: "double_quotes" + +# New cops: https://docs.rubocop.org/en/latest/versioning/ +Layout/EmptyLinesAroundAttributeAccessor: + Enabled: true + +Layout/SpaceAroundMethodCallOperator: + Enabled: true + +Lint/RaiseException: + Enabled: true + +Lint/StructNewOverride: + Enabled: true + +Style/ExponentialNotation: + Enabled: true + +Style/HashEachMethods: + Enabled: true + +Style/HashTransformKeys: + Enabled: true + +Style/HashTransformValues: + Enabled: true + +Style/SlicingWithRange: + Enabled: true + +Style/TrailingCommaInHashLiteral: + EnforcedStyleForMultiline: comma + +Style/TrailingCommaInArrayLiteral: + EnforcedStyleForMultiline: comma + +Style/TrailingCommaInArguments: + EnforcedStyleForMultiline: comma diff --git a/.rubocop_todo.yml b/.rubocop_todo.yml new file mode 100644 index 0000000..ce3bcf0 --- /dev/null +++ b/.rubocop_todo.yml @@ -0,0 +1,98 @@ +--- +# This configuration was generated by +# `rubocop --auto-gen-config` +# on 2020-05-12 17:34:19 +0100 using RuboCop version 0.83.0. +# The point is for the user to remove these configuration records +# one by one as the offenses are removed from the code base. +# Note that changes in the inspected code, or installation of new +# versions of RuboCop, may require this file to be generated again. + +# Offense count: 3 +# Configuration parameters: IgnoredMethods. +Metrics/AbcSize: + Max: 50 + +Metrics/BlockLength: + Enabled: false + +Metrics/ClassLength: + Enabled: false + +Metrics/MethodLength: + Max: 30 + +# Offense count: 1 +# Configuration parameters: IgnoredPatterns. +# SupportedStyles: snake_case, camelCase +Naming/MethodName: + EnforcedStyle: snake_case + +# Offense count: 3 +# Cop supports --auto-correct. +# Configuration parameters: AutoCorrect, EnforcedStyle. +# SupportedStyles: nested, compact +Style/ClassAndModuleChildren: + Exclude: + - 'lib/fluent/plugin/in_gcloud_pubsub.rb' + - 'lib/fluent/plugin/out_gcloud_pubsub.rb' + - 'test/test_helper.rb' + +# Offense count: 6 +Style/Documentation: + Exclude: + - 'spec/**/*' + - 'test/**/*' + - 'lib/fluent/plugin/gcloud_pubsub/client.rb' + - 'lib/fluent/plugin/in_gcloud_pubsub.rb' + - 'lib/fluent/plugin/out_gcloud_pubsub.rb' + +# Offense count: 1 +# Configuration parameters: AllowedVariables. 
+Style/GlobalVars: + Exclude: + - 'test/test_helper.rb' + +# Offense count: 1 +# Configuration parameters: MinBodyLength. +Style/GuardClause: + Exclude: + - 'lib/fluent/plugin/gcloud_pubsub/client.rb' + +# Offense count: 2 +# Cop supports --auto-correct. +Style/IfUnlessModifier: + Exclude: + - 'lib/fluent/plugin/gcloud_pubsub/client.rb' + +# Offense count: 1 +Style/MethodMissingSuper: + Exclude: + - 'test/test_helper.rb' + +# Offense count: 1 +Style/MissingRespondToMissing: + Exclude: + - 'test/test_helper.rb' + +# Offense count: 260 +# Cop supports --auto-correct. +# Configuration parameters: EnforcedStyle, ConsistentQuotesInMultiline. +# SupportedStyles: single_quotes, double_quotes +Style/StringLiterals: + Exclude: + - 'Gemfile' + - 'Rakefile' + - 'fluent-plugin-gcloud-pubsub-custom.gemspec' + - 'lib/fluent/plugin/gcloud_pubsub/client.rb' + - 'lib/fluent/plugin/in_gcloud_pubsub.rb' + - 'lib/fluent/plugin/out_gcloud_pubsub.rb' + - 'test/plugin/test_in_gcloud_pubsub.rb' + - 'test/plugin/test_out_gcloud_pubsub.rb' + - 'test/test_helper.rb' + +# Offense count: 36 +# Cop supports --auto-correct. +# Configuration parameters: AutoCorrect, AllowHeredoc, AllowURI, URISchemes, IgnoreCopDirectives, IgnoredPatterns. +# URISchemes: http, https +Layout/LineLength: + Max: 120 diff --git a/.ruby-version b/.ruby-version new file mode 100644 index 0000000..57cf282 --- /dev/null +++ b/.ruby-version @@ -0,0 +1 @@ +2.6.5 diff --git a/.travis.yml b/.travis.yml index ae9377e..ffc87b8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,20 +1,23 @@ language: ruby rvm: - - 2.4.6 - - 2.5.5 - - 2.6.3 + - 2.5 + - 2.6 + - 2.7 - ruby-head gemfile: - - Gemfile + - Gemfile branches: only: - master + - gocardless before_install: gem update bundler -script: bundle exec rake test +script: + - bundle exec rake test + - bundle exec rubocop sudo: false diff --git a/CHANGELOG.md b/CHANGELOG.md index 87cb518..b0e91f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## ChangeLog +### Release 1.3.3 - 2021/01/05 + +- Add support for payload compression with `compress_batches` + ### Release 1.3.2 - 2019/08/16 - Input plugin diff --git a/Gemfile b/Gemfile index fa75df1..be173b2 100644 --- a/Gemfile +++ b/Gemfile @@ -1,3 +1,5 @@ -source 'https://rubygems.org' +# frozen_string_literal: true + +source "https://rubygems.org" gemspec diff --git a/README.md b/README.md index 1263a70..9620d62 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ Use `gcloud_pubsub` output plugin. max_messages 1000 max_total_size 9800000 max_message_size 4000000 + compress_batches false @type memory flush_interval 1s @@ -92,7 +93,11 @@ Use `gcloud_pubsub` output plugin. - `max_message_size` (optional, default: `4000000` = `4MB`) - Messages exceeding `max_message_size` are not published because Pub/Sub clients cannot receive it. - `attribute_keys` (optional, default: `[]`) - - Publishing the set fields as attributes. + - Extract these fields from the record and send them as attributes on the Pub/Sub message. Cannot be set if compress_batches is enabled. +- `metric_prefix` (optional, default: `fluentd_output_gcloud_pubsub`) + - The prefix for Prometheus metric names +- `compress_batches` (optional, default: `false`) + - If set to `true`, messages will be batched and compressed before publication. See [message compression](#message-compression) for details. ### Pull messages @@ -147,11 +152,13 @@ Use `gcloud_pubsub` input plugin. - `pull_threads` (optional, default: `1`) - Set number of threads to pull messages. 
 - `attribute_keys` (optional, default: `[]`)
-  - Specify the key of the attribute to be emitted as the field of record.
+  - Acquire these fields from attributes on the Pub/Sub message and merge them into the record.
 - `parse_error_action` (optional, default: `exception`)
   - Set error type when parsing messages fails.
     - `exception`: Raise exception. Messages are not acknowledged.
     - `warning`: Only logging as warning.
+- `metric_prefix` (optional, default: `fluentd_input_gcloud_pubsub`)
+  - The prefix for Prometheus metric names
 - `enable_rpc` (optional, default: `false`)
   - If `true` is specified, HTTP RPC to stop or start pulling message is enabled.
 - `rpc_bind` (optional, default: `0.0.0.0`)
@@ -159,6 +166,71 @@ Use `gcloud_pubsub` input plugin.
 - `rpc_port` (optional, default: `24680`)
   - Port for HTTP RPC.
+## Message compression
+
+The `compress_batches` option can be used to enable the compression of messages
+_before_ publication to Pub/Sub.
+
+This works by collecting the buffered messages, taking up to `max_messages`
+input records (or `max_total_size` bytes of them), then compressing them with
+Zlib (i.e. gzip/Deflate) before publishing them as a single message to the
+Pub/Sub topic.
+
+When transporting large volumes of records via Pub/Sub, e.g. multiple terabytes
+per month, this can lead to significant cost savings, as typically the CPU time
+required to compress the messages will be minimal in comparison to the Pub/Sub
+costs.
+
+The compression ratio achievable will vary widely depending on the homogeneity
+of the input records, but typically will be 50% at the very minimum and often
+around 80-90%.
+
+In order to achieve good compression, consider the following:
+- Ensure that the buffer is being filled with a reasonable batch of messages: do
+  not use `flush_mode immediate`, and keep the `flush_interval` value
+  sufficiently high. Use the Prometheus metrics to determine how many records
+  are being published per message.
+- Keep the `max_messages` and `max_total_size` values high (the defaults are
+  optimal).
+- If there are many different sources of messages being mixed and routed to a
+  single `gcloud_pubsub` output, use multiple outputs (which will each have
+  their own buffer) through tagging or [labelling][fluentd-labels].
+
+[fluentd-labels]: https://docs.fluentd.org/quickstart/life-of-a-fluentd-event#labels
+
+The receiving end must be able to decode these compressed batches of messages,
+which it can determine via an attribute set on the Pub/Sub message. The
+`gcloud_pubsub` input plugin will do this transparently, decompressing any
+messages which contain a batch of records and processing any messages which
+represent just a single record as normal.
+Therefore, as long as all of the receivers are updated with support for
+compressed batches first, it's then possible to gradually roll out this feature.
+
+## Prometheus metrics
+
+The input and output plugins expose several metrics in order to monitor
+performance:
+
+- `fluentd_output_gcloud_pubsub_compression_enabled`
+  - Gauge: Whether compression/batching is enabled
+- `fluentd_output_gcloud_pubsub_messages_published_per_batch`
+  - Histogram: Number of records published to Pub/Sub per buffer flush
+- `fluentd_output_gcloud_pubsub_messages_published_bytes`
+  - Histogram: Total size in bytes of the records published to Pub/Sub,
+    **before** compression.
+- `fluentd_output_gcloud_pubsub_messages_compression_duration_seconds`
+  - Histogram: Time taken to compress a batch of messages
+- `fluentd_output_gcloud_pubsub_messages_compressed_size_per_original_size_ratio`
+  - Histogram: Compression ratio achieved on a batch of messages, expressed in
+    terms of space saved.
+
+- `fluentd_input_gcloud_pubsub_pull_errors_total`
+  - Counter: Errors encountered while pulling or processing messages (split by a
+    `retryable` label)
+- `fluentd_input_gcloud_pubsub_messages_pulled`
+  - Histogram: Number of Pub/Sub messages pulled by the subscriber on each invocation
+- `fluentd_input_gcloud_pubsub_messages_pulled_bytes`
+  - Histogram: Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation
+
 ## Contributing

 1. Fork it
diff --git a/Rakefile b/Rakefile
index 0e90de2..548614f 100644
--- a/Rakefile
+++ b/Rakefile
@@ -1,12 +1,14 @@
-require 'bundler'
+# frozen_string_literal: true
+
+require "bundler"
 Bundler::GemHelper.install_tasks

-require 'rake/testtask'
+require "rake/testtask"
 Rake::TestTask.new(:test) do |test|
-  test.libs << 'lib' << 'test'
-  test.test_files = FileList['test/plugin/test_*.rb']
+  test.libs << "lib" << "test"
+  test.test_files = FileList["test/plugin/test_*.rb"]
   test.verbose = true
 end

-task :default => [:build]
+task default: [:build]
diff --git a/fluent-plugin-gcloud-pubsub-custom.gemspec b/fluent-plugin-gcloud-pubsub-custom.gemspec
index 37b5045..9222ecb 100644
--- a/fluent-plugin-gcloud-pubsub-custom.gemspec
+++ b/fluent-plugin-gcloud-pubsub-custom.gemspec
@@ -1,26 +1,32 @@
-# encoding: utf-8
-$:.push File.expand_path('../lib', __FILE__)
+# frozen_string_literal: true
+
+$LOAD_PATH.push File.expand_path("lib", __dir__)

 Gem::Specification.new do |gem|
-  gem.name = "fluent-plugin-gcloud-pubsub-custom"
-  gem.description = "Google Cloud Pub/Sub input/output plugin for Fluentd event collector"
+  gem.name = "fluent-plugin-gcloud-pubsub-custom-compress-batches"
+  gem.description = "Google Cloud Pub/Sub input/output plugin for Fluentd event collector - with payload compression.
Forked from https://github.com/gocardless/fluent-plugin-gcloud-pubsub-custom" gem.license = "MIT" - gem.homepage = "https://github.com/mia-0032/fluent-plugin-gcloud-pubsub-custom" - gem.summary = gem.description - gem.version = "1.3.2" - gem.authors = ["Yoshihiro MIYAI"] - gem.email = "msparrow17@gmail.com" - gem.has_rdoc = false + gem.homepage = "https://github.com/calvinaditya95/fluent-plugin-gcloud-pubsub-custom" + gem.summary = "Google Cloud Pub/Sub input/output plugin for Fluentd event collector - with payload compression" + gem.version = "1.3.3" + gem.authors = ["Calvin Aditya"] + gem.email = "calvin.aditya95@gmail.com" gem.files = `git ls-files`.split("\n") gem.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") - gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } - gem.require_paths = ['lib'] + gem.executables = `git ls-files -- bin/*`.split("\n").map { |f| File.basename(f) } + gem.require_paths = ["lib"] gem.add_runtime_dependency "fluentd", [">= 0.14.15", "< 2"] gem.add_runtime_dependency "google-cloud-pubsub", "~> 0.30.0" + # Use the same version constraint as fluent-plugin-prometheus currently specifies + gem.add_runtime_dependency "prometheus-client", "< 0.10" + gem.add_development_dependency "bundler" + gem.add_development_dependency "pry" + gem.add_development_dependency "pry-byebug" gem.add_development_dependency "rake" + gem.add_development_dependency "rubocop", "~>0.83" gem.add_development_dependency "test-unit" gem.add_development_dependency "test-unit-rr" end diff --git a/lib/fluent/plugin/gcloud_pubsub/client.rb b/lib/fluent/plugin/gcloud_pubsub/client.rb index cae3469..9138e55 100644 --- a/lib/fluent/plugin/gcloud_pubsub/client.rb +++ b/lib/fluent/plugin/gcloud_pubsub/client.rb @@ -1,4 +1,7 @@ -require 'google/cloud/pubsub' +# frozen_string_literal: true + +require "google/cloud/pubsub" +require "zlib" module Fluent module GcloudPubSub @@ -7,14 +10,19 @@ class Error < StandardError class RetryableError < Error end + COMPRESSION_ALGORITHM_ZLIB = "zlib" + # 30 is the ASCII record separator character + BATCHED_RECORD_SEPARATOR = 30.chr + class Message attr_reader :message, :attributes - def initialize(message, attributes={}) + + def initialize(message, attributes = {}) @message = message @attributes = attributes end - def bytesize() + def bytesize attr_size = 0 @attributes.each do |key, val| attr_size += key.bytesize + val.bytesize @@ -24,35 +32,95 @@ def bytesize() end class Publisher - def initialize(project, key, autocreate_topic) + def initialize(project, key, autocreate_topic, metric_prefix) @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key @autocreate_topic = autocreate_topic @topics = {} + + # rubocop:disable Layout/LineLength + @compression_ratio = + Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{metric_prefix}_messages_compressed_size_per_original_size_ratio") do + ::Prometheus::Client.registry.histogram( + :"#{metric_prefix}_messages_compressed_size_per_original_size_ratio", + "Compression ratio achieved on a batch of messages", + {}, + # We expect compression for even a single message to be typically + # above 2x (0.5/50%), so bias the buckets towards the higher end + # of the range. 
+ [0, 0.25, 0.5, 0.75, 0.85, 0.9, 0.95, 0.975, 1], + ) + end + + @compression_duration = + Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{metric_prefix}_messages_compression_duration_seconds") do + ::Prometheus::Client.registry.histogram( + :"#{metric_prefix}_messages_compression_duration_seconds", + "Time taken to compress a batch of messages", + {}, + [0, 0.0001, 0.0005, 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1], + ) + end + # rubocop:enable Layout/LineLength end def topic(topic_name) - return @topics[topic_name] if @topics.has_key? topic_name + return @topics[topic_name] if @topics.key? topic_name client = @pubsub.topic topic_name - if client.nil? && @autocreate_topic - client = @pubsub.create_topic topic_name - end - if client.nil? - raise Error.new "topic:#{topic_name} does not exist." - end + client = @pubsub.create_topic topic_name if client.nil? && @autocreate_topic + raise Error, "topic:#{topic_name} does not exist." if client.nil? @topics[topic_name] = client client end - def publish(topic_name, messages) - topic(topic_name).publish do |batch| - messages.each do |m| - batch.publish m.message, m.attributes + def publish(topic_name, messages, compress_batches = false) + if compress_batches + topic(topic_name).publish(*compress_messages_with_zlib(messages, topic_name)) + else + topic(topic_name).publish do |batch| + messages.each do |m| + batch.publish m.message, m.attributes + end end end - rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex - raise RetryableError.new "Google api returns error:#{ex.class.to_s} message:#{ex.to_s}" + rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => e + raise RetryableError, "Google api returns error:#{e.class} message:#{e}" + end + + private + + def compress_messages_with_zlib(messages, topic_name) + original_size = messages.sum(&:bytesize) + # This should never happen, only a programming error or major + # misconfiguration should lead to this situation. But checking against + # it here avoids a potential division by zero later on. + raise ArgumentError, "not compressing empty inputs" if original_size.zero? + + # Here we're implicitly dropping the 'attributes' field of the messages + # that we're iterating over. + # This is fine, because the :attribute_keys config param is not + # supported when in compressed mode, so this field will always be + # empty. + packed_messages = messages.map(&:message).join(BATCHED_RECORD_SEPARATOR) + + duration, compressed_messages = Fluent::GcloudPubSub::Metrics.measure_duration do + Zlib::Deflate.deflate(packed_messages) + end + + @compression_duration.observe( + { topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB }, + duration, + ) + + compressed_size = compressed_messages.bytesize + @compression_ratio.observe( + { topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB }, + # If original = 1MiB and compressed = 256KiB; then metric value = 0.75 = 75% when plotted + 1 - compressed_size.to_f / original_size, + ) + + [compressed_messages, { "compression_algorithm": COMPRESSION_ALGORITHM_ZLIB }] end end @@ -65,19 +133,40 @@ def initialize(project, key, topic_name, subscription_name) topic = pubsub.topic topic_name @client = topic.subscription subscription_name end - raise Error.new "subscription:#{subscription_name} does not exist." if @client.nil? + raise Error, "subscription:#{subscription_name} does not exist." if @client.nil? 
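# ---------------------------------------------------------------------------
# Editor's note: an illustrative sketch, NOT part of the patch. It shows the
# batch wire format implemented by `compress_messages_with_zlib` above and
# decoded by `MessageUnpacker` further below: records are joined with the
# ASCII record separator (30.chr), deflated with Zlib, and the resulting
# Pub/Sub message is tagged with a "compression_algorithm" attribute so that
# consumers know how to decode it. The `records` values are hypothetical.
#
#   require "zlib"
#
#   rs      = 30.chr
#   records = ['{"a":1}', '{"b":2}']
#   payload = Zlib::Deflate.deflate(records.join(rs))
#   attrs   = { "compression_algorithm" => "zlib" }
#
#   Zlib::Inflate.inflate(payload).split(rs) == records # => true
# ---------------------------------------------------------------------------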
end def pull(immediate, max) @client.pull immediate: immediate, max: max - rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex - raise RetryableError.new "Google pull api returns error:#{ex.class.to_s} message:#{ex.to_s}" + rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => e + raise RetryableError, "Google pull api returns error:#{e.class} message:#{e}" end def acknowledge(messages) @client.acknowledge messages - rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex - raise RetryableError.new "Google acknowledge api returns error:#{ex.class.to_s} message:#{ex.to_s}" + rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => e + raise RetryableError, "Google acknowledge api returns error:#{e.class} message:#{e}" + end + end + + class MessageUnpacker + def self.unpack(message) + attributes = message.attributes + algorithm = attributes["compression_algorithm"] + + case algorithm + when nil + # For an uncompressed message return the single line and attributes + [[message.message.data.chomp, message.attributes]] + when COMPRESSION_ALGORITHM_ZLIB + # Return all of the lines in the message, with empty attributes + Zlib::Inflate + .inflate(message.message.data) + .split(BATCHED_RECORD_SEPARATOR) + .map { |line| [line, {}] } + else + raise ArgumentError, "unknown compression algorithm: '#{algorithm}'" + end end end end diff --git a/lib/fluent/plugin/gcloud_pubsub/metrics.rb b/lib/fluent/plugin/gcloud_pubsub/metrics.rb new file mode 100644 index 0000000..7fe6a00 --- /dev/null +++ b/lib/fluent/plugin/gcloud_pubsub/metrics.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module Fluent + module GcloudPubSub + # Utilities for interacting with Prometheus metrics + module Metrics + def self.register_or_existing(metric_name) + return ::Prometheus::Client.registry.get(metric_name) if ::Prometheus::Client.registry.exist?(metric_name) + + yield + end + + # Time the elapsed execution of the provided block, return the duration + # as the first element followed by the result of the block. + def self.measure_duration + start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + result = yield + finish = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + [finish - start, *result] + end + end + end +end diff --git a/lib/fluent/plugin/in_gcloud_pubsub.rb b/lib/fluent/plugin/in_gcloud_pubsub.rb index 8b0f47e..f079b5a 100644 --- a/lib/fluent/plugin/in_gcloud_pubsub.rb +++ b/lib/fluent/plugin/in_gcloud_pubsub.rb @@ -1,53 +1,58 @@ -require 'json' -require 'webrick' +# frozen_string_literal: true -require 'fluent/plugin/input' -require 'fluent/plugin/parser' +require "json" +require "webrick" -require 'fluent/plugin/gcloud_pubsub/client' +require "fluent/plugin/input" +require "fluent/plugin/parser" + +require "fluent/plugin/gcloud_pubsub/client" +require "fluent/plugin/gcloud_pubsub/metrics" module Fluent::Plugin class GcloudPubSubInput < Input - Fluent::Plugin.register_input('gcloud_pubsub', self) + Fluent::Plugin.register_input("gcloud_pubsub", self) helpers :compat_parameters, :parser, :thread - DEFAULT_PARSER_TYPE = 'json' + DEFAULT_PARSER_TYPE = "json" class FailedParseError < StandardError end - desc 'Set tag of messages.' - config_param :tag, :string - desc 'Set key to be used as tag.' - config_param :tag_key, :string, default: nil - desc 'Set your GCP project.' 
- config_param :project, :string, default: nil - desc 'Set your credential file path.' - config_param :key, :string, default: nil - desc 'Set topic name to pull.' - config_param :topic, :string, default: nil - desc 'Set subscription name to pull.' - config_param :subscription, :string - desc 'Pulling messages by intervals of specified seconds.' - config_param :pull_interval, :float, default: 5.0 - desc 'Max messages pulling at once.' - config_param :max_messages, :integer, default: 100 - desc 'Setting `true`, keepalive connection to wait for new messages.' - config_param :return_immediately, :bool, default: true - desc 'Set number of threads to pull messages.' - config_param :pull_threads, :integer, default: 1 - desc 'Specify the key of the attribute to be acquired as a record' - config_param :attribute_keys, :array, default: [] - desc 'Set error type when parsing messages fails.' - config_param :parse_error_action, :enum, default: :exception, list: [:exception, :warning] + desc "Set tag of messages." + config_param :tag, :string + desc "Set key to be used as tag." + config_param :tag_key, :string, default: nil + desc "Set your GCP project." + config_param :project, :string, default: nil + desc "Set your credential file path." + config_param :key, :string, default: nil + desc "Set topic name to pull." + config_param :topic, :string, default: nil + desc "Set subscription name to pull." + config_param :subscription, :string + desc "Pulling messages by intervals of specified seconds." + config_param :pull_interval, :float, default: 5.0 + desc "Max messages pulling at once." + config_param :max_messages, :integer, default: 100 + desc "Setting `true`, keepalive connection to wait for new messages." + config_param :return_immediately, :bool, default: true + desc "Set number of threads to pull messages." + config_param :pull_threads, :integer, default: 1 + desc "Acquire these fields from attributes on the Pub/Sub message and merge them into the record" + config_param :attribute_keys, :array, default: [] + desc "Set error type when parsing messages fails." + config_param :parse_error_action, :enum, default: :exception, list: %i[exception warning] + desc "The prefix for Prometheus metric names" + config_param :metric_prefix, :string, default: "fluentd_input_gcloud_pubsub" # for HTTP RPC - desc 'If `true` is specified, HTTP RPC to stop or start pulling message is enabled.' - config_param :enable_rpc, :bool, default: false - desc 'Bind IP address for HTTP RPC.' - config_param :rpc_bind, :string, default: '0.0.0.0' - desc 'Port for HTTP RPC.' - config_param :rpc_port, :integer, default: 24680 + desc "If `true` is specified, HTTP RPC to stop or start pulling message is enabled." + config_param :enable_rpc, :bool, default: false + desc "Bind IP address for HTTP RPC." + config_param :rpc_bind, :string, default: "0.0.0.0" + desc "Port for HTTP RPC." 
+ config_param :rpc_port, :integer, default: 24_680 config_section :parse do config_set_default :@type, DEFAULT_PARSER_TYPE @@ -61,45 +66,48 @@ def initialize(server, plugin) @plugin = plugin end + # rubocop:disable Naming/MethodName def do_GET(req, res) begin code, header, body = process(req, res) - rescue + rescue StandardError code, header, body = render_json(500, { - 'ok' => false, - 'message' => 'Internal Server Error', - 'error' => "#{$!}", - 'backtrace'=> $!.backtrace - }) + "ok" => false, + "message" => "Internal Server Error", + "error" => $ERROR_INFO.to_s, + "backtrace" => $ERROR_INFO.backtrace, + }) end res.status = code - header.each_pair {|k,v| + header.each_pair do |k, v| res[k] = v - } + end res.body = body end + # rubocop:enable Naming/MethodName def render_json(code, obj) - [code, {'Content-Type' => 'application/json'}, obj.to_json] + [code, { "Content-Type" => "application/json" }, obj.to_json] end - def process(req, res) - ret = {'ok' => true} + def process(req, _res) + ret = { "ok" => true } case req.path_info - when '/stop' + when "/stop" @plugin.stop_pull - when '/start' + when "/start" @plugin.start_pull - when '/status' - ret['status'] = @plugin.status_of_pull + when "/status" + ret["status"] = @plugin.status_of_pull else - raise Error.new "Invalid path_info: #{req.path_info}" + raise Error, "Invalid path_info: #{req.path_info}" end render_json(200, ret) end end + # rubocop:disable Metrics/MethodLength def configure(conf) compat_parameters_convert(conf, :parser) super @@ -114,7 +122,37 @@ def configure(conf) end @parser = parser_create + + @messages_pulled = + Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_pulled") do + ::Prometheus::Client.registry.histogram( + :"#{@metric_prefix}_messages_pulled", + "Number of Pub/Sub messages pulled by the subscriber on each invocation", + {}, + [0, 1, 10, 50, 100, 250, 500, 1000], + ) + end + + @messages_pulled_bytes = + Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_pulled_bytes") do + ::Prometheus::Client.registry.histogram( + :"#{@metric_prefix}_messages_pulled_bytes", + "Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation", + {}, + [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000], + ) + end + + @pull_errors = + Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_pull_errors_total") do + ::Prometheus::Client.registry.counter( + :"#{@metric_prefix}_pull_errors_total", + "Errors encountered while pulling or processing messages", + {}, + ) + end end + # rubocop:enable Metrics/MethodLength def start super @@ -136,9 +174,7 @@ def shutdown @rpc_srv.shutdown @rpc_srv = nil end - if @rpc_thread - @rpc_thread = nil - end + @rpc_thread = nil if @rpc_thread @stop_subscribing = true @subscribe_threads.each(&:join) super @@ -155,12 +191,12 @@ def start_pull end def status_of_pull - @stop_pull ? 'stopped' : 'started' + @stop_pull ? 
"stopped" : "started" end private - def static_tag(record) + def static_tag(_record) @tag end @@ -175,44 +211,50 @@ def start_rpc BindAddress: @rpc_bind, Port: @rpc_port, Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL), - AccessLog: [] - } + AccessLog: [], + }, ) - @rpc_srv.mount('/api/in_gcloud_pubsub/pull/', RPCServlet, self) - @rpc_thread = thread_create(:in_gcloud_pubsub_rpc_thread){ + @rpc_srv.mount("/api/in_gcloud_pubsub/pull/", RPCServlet, self) + @rpc_thread = thread_create(:in_gcloud_pubsub_rpc_thread) do @rpc_srv.start - } + end end def subscribe until @stop_subscribing _subscribe unless @stop_pull - if @return_immediately || @stop_pull - sleep @pull_interval - end + sleep @pull_interval if @return_immediately || @stop_pull end - rescue => ex - log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s - log.error_backtrace ex.backtrace + rescue StandardError => e + log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s + log.error_backtrace e.backtrace end def _subscribe messages = @subscriber.pull @return_immediately, @max_messages - if messages.length == 0 + @messages_pulled.observe(common_labels, messages.size) + if messages.empty? log.debug "no messages are pulled" return end + messages_size = messages.sum do |message| + message.data.bytesize + message.attributes.sum { |k, v| k.bytesize + v.bytesize } + end + @messages_pulled_bytes.observe(common_labels, messages_size) + process messages @subscriber.acknowledge messages log.debug "#{messages.length} message(s) processed" - rescue Fluent::GcloudPubSub::RetryableError => ex - log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s - rescue => ex - log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s - log.error_backtrace ex.backtrace + rescue Fluent::GcloudPubSub::RetryableError => e + @pull_errors.increment(common_labels.merge({ retryable: true })) + log.warn "Retryable error occurs. 
Fluentd will retry.", error_message: e.to_s, error_class: e.class.to_s + rescue StandardError => e + @pull_errors.increment(common_labels.merge({ retryable: false })) + log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s + log.error_backtrace e.backtrace end def process(messages) @@ -221,21 +263,23 @@ def process(messages) end messages.each do |m| - line = m.message.data.chomp - attributes = m.attributes - @parser.parse(line) do |time, record| - if time && record - @attribute_keys.each do |key| - record[key] = attributes[key] - end + lines_attributes = Fluent::GcloudPubSub::MessageUnpacker.unpack(m) + + lines_attributes.each do |line, attributes| + @parser.parse(line) do |time, record| + if time && record + @attribute_keys.each do |key| + record[key] = attributes[key] + end - event_streams[@extract_tag.call(record)].add(time, record) - else - case @parse_error_action - when :exception - raise FailedParseError.new "pattern not match: #{line}" + event_streams[@extract_tag.call(record)].add(time, record) else - log.warn 'pattern not match', record: line + case @parse_error_action + when :exception + raise FailedParseError, "pattern not match: #{line}" + else + log.warn "pattern not match", record: line + end end end end @@ -249,5 +293,9 @@ def process(messages) end end end + + def common_labels + { subscription: @subscription } + end end end diff --git a/lib/fluent/plugin/out_gcloud_pubsub.rb b/lib/fluent/plugin/out_gcloud_pubsub.rb index 4047929..7fd62d3 100644 --- a/lib/fluent/plugin/out_gcloud_pubsub.rb +++ b/lib/fluent/plugin/out_gcloud_pubsub.rb @@ -1,34 +1,43 @@ -require 'fluent/plugin/output' -require 'fluent/plugin/gcloud_pubsub/client' -require 'fluent/plugin_helper/inject' +# frozen_string_literal: true + +require "fluent/plugin/output" +require "fluent/plugin/gcloud_pubsub/client" +require "fluent/plugin/gcloud_pubsub/metrics" +require "fluent/plugin_helper/inject" +require "prometheus/client" module Fluent::Plugin class GcloudPubSubOutput < Output include Fluent::PluginHelper::Inject - Fluent::Plugin.register_output('gcloud_pubsub', self) + Fluent::Plugin.register_output("gcloud_pubsub", self) helpers :compat_parameters, :formatter DEFAULT_BUFFER_TYPE = "memory" DEFAULT_FORMATTER_TYPE = "json" - desc 'Set your GCP project.' - config_param :project, :string, :default => nil - desc 'Set your credential file path.' - config_param :key, :string, :default => nil - desc 'Set topic name to publish.' - config_param :topic, :string + desc "Set your GCP project." + config_param :project, :string, default: nil + desc "Set your credential file path." + config_param :key, :string, default: nil + desc "Set topic name to publish." + config_param :topic, :string desc "If set to `true`, specified topic will be created when it doesn't exist." - config_param :autocreate_topic, :bool, :default => false - desc 'Publishing messages count per request to Cloud Pub/Sub.' - config_param :max_messages, :integer, :default => 1000 - desc 'Publishing messages bytesize per request to Cloud Pub/Sub.' - config_param :max_total_size, :integer, :default => 9800000 # 9.8MB - desc 'Limit bytesize per message.' - config_param :max_message_size, :integer, :default => 4000000 # 4MB - desc 'Publishing the set field as an attribute' - config_param :attribute_keys, :array, :default => [] + config_param :autocreate_topic, :bool, default: false + desc "Publishing messages count per request to Cloud Pub/Sub." 
+ config_param :max_messages, :integer, default: 1000 + desc "Publishing messages bytesize per request to Cloud Pub/Sub." + config_param :max_total_size, :integer, default: 9_800_000 # 9.8MB + desc "Limit bytesize per message." + config_param :max_message_size, :integer, default: 4_000_000 # 4MB + desc "Extract these fields from the record and send them as attributes on the Pub/Sub message. " \ + "Cannot be set if compress_batches is enabled." + config_param :attribute_keys, :array, default: [] + desc "The prefix for Prometheus metric names" + config_param :metric_prefix, :string, default: "fluentd_output_gcloud_pubsub" + desc "If set to `true`, messages will be batched and compressed before publication" + config_param :compress_batches, :bool, default: false config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE @@ -38,16 +47,57 @@ class GcloudPubSubOutput < Output config_set_default :@type, DEFAULT_FORMATTER_TYPE end + # rubocop:disable Metrics/MethodLength def configure(conf) compat_parameters_convert(conf, :buffer, :formatter) super placeholder_validate!(:topic, @topic) @formatter = formatter_create + + if @compress_batches && !@attribute_keys.empty? + # The attribute_keys option is implemented by extracting keys from the + # record and setting them on the Pub/Sub message. + # This is not possible in compressed mode, because we're sending just a + # single Pub/Sub message that comprises many records, therefore the + # attribute keys would clash. + raise Fluent::ConfigError, ":attribute_keys cannot be used when compression is enabled" + end + + @messages_published = + Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_per_batch") do + ::Prometheus::Client.registry.histogram( + :"#{@metric_prefix}_messages_published_per_batch", + "Number of records published to Pub/Sub per buffer flush", + {}, + [1, 10, 50, 100, 250, 500, 1000], + ) + end + + @bytes_published = + Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_bytes") do + ::Prometheus::Client.registry.histogram( + :"#{@metric_prefix}_messages_published_bytes", + "Total size in bytes of the records published to Pub/Sub", + {}, + [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000], + ) + end + + @compression_enabled = + Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_compression_enabled") do + ::Prometheus::Client.registry.gauge( + :"#{@metric_prefix}_compression_enabled", + "Whether compression/batching is enabled", + {}, + ) + end + @compression_enabled.set(common_labels, @compress_batches ? 1 : 0) end + # rubocop:enable Metrics/MethodLength def start super - @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic + @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @metric_prefix end def format(tag, time, record) @@ -76,7 +126,7 @@ def write(chunk) chunk.msgpack_each do |msg, attr| msg = Fluent::GcloudPubSub::Message.new(msg, attr) if msg.bytesize > @max_message_size - log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize + log.warn "Drop a message because its size exceeds `max_message_size`", size: msg.bytesize next end if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size @@ -88,23 +138,30 @@ def write(chunk) size += msg.bytesize end - if messages.length > 0 - publish(topic, messages) - end - rescue Fluent::GcloudPubSub::RetryableError => ex - log.warn "Retryable error occurs. 
Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s - raise ex - rescue => ex - log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s + publish(topic, messages) unless messages.empty? + rescue Fluent::GcloudPubSub::RetryableError => e + log.warn "Retryable error occurs. Fluentd will retry.", error_message: e.to_s, error_class: e.class.to_s + raise e + rescue StandardError => e + log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s log.error_backtrace - raise ex + raise e end private def publish(topic, messages) - log.debug "send message topic:#{topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}" - @publisher.publish(topic, messages) + size = messages.map(&:bytesize).inject(:+) + log.debug "send message topic:#{topic} length:#{messages.length} size:#{size}" + + @messages_published.observe(common_labels, messages.length) + @bytes_published.observe(common_labels, size) + + @publisher.publish(topic, messages, @compress_batches) + end + + def common_labels + { topic: @topic } end end end diff --git a/test/plugin/test_in_gcloud_pubsub.rb b/test/plugin/test_in_gcloud_pubsub.rb index ad2588c..c4a5737 100644 --- a/test/plugin/test_in_gcloud_pubsub.rb +++ b/test/plugin/test_in_gcloud_pubsub.rb @@ -1,42 +1,49 @@ -require 'net/http' -require 'json' +# frozen_string_literal: true + +require "net/http" +require "json" require_relative "../test_helper" require "fluent/test/driver/input" class GcloudPubSubInputTest < Test::Unit::TestCase - CONFIG = %[ + CONFIG = %( tag test project project-test topic topic-test subscription subscription-test key key-test - ] + ) - DEFAULT_HOST = '127.0.0.1' - DEFAULT_PORT = 24680 + DEFAULT_HOST = "127.0.0.1" + DEFAULT_PORT = 24_680 class DummyInvalidMsgData def data - 'foo:bar' + "foo:bar" end end class DummyInvalidMessage def message DummyInvalidMsgData.new end + + def data + message.data + end + def attributes - {"attr_1" => "a", "attr_2" => "b"} + { "attr_1" => "a", "attr_2" => "b" } end end - def create_driver(conf=CONFIG) + def create_driver(conf = CONFIG) Fluent::Test::Driver::Input.new(Fluent::Plugin::GcloudPubSubInput).configure(conf) end def http_get(path) http = Net::HTTP.new(DEFAULT_HOST, DEFAULT_PORT) - req = Net::HTTP::Get.new(path, {'Content-Type' => 'application/x-www-form-urlencoded'}) + req = Net::HTTP::Get.new(path, { "Content-Type" => "application/x-www-form-urlencoded" }) http.request(req) end @@ -44,9 +51,9 @@ def http_get(path) Fluent::Test.setup end - sub_test_case 'configure' do - test 'all params are configured' do - d = create_driver(%[ + sub_test_case "configure" do + test "all params are configured" do + d = create_driver(%( tag test project project-test topic topic-test @@ -60,24 +67,24 @@ def http_get(path) enable_rpc true rpc_bind 127.0.0.1 rpc_port 24681 - ]) + )) - assert_equal('test', d.instance.tag) - assert_equal('project-test', d.instance.project) - assert_equal('topic-test', d.instance.topic) - assert_equal('subscription-test', d.instance.subscription) - assert_equal('key-test', d.instance.key) + assert_equal("test", d.instance.tag) + assert_equal("project-test", d.instance.project) + assert_equal("topic-test", d.instance.topic) + assert_equal("subscription-test", d.instance.subscription) + assert_equal("key-test", d.instance.key) assert_equal(2.0, d.instance.pull_interval) assert_equal(1000, d.instance.max_messages) assert_equal(true, d.instance.return_immediately) assert_equal(3, d.instance.pull_threads) - assert_equal(['attr-test'], 
d.instance.attribute_keys) + assert_equal(["attr-test"], d.instance.attribute_keys) assert_equal(true, d.instance.enable_rpc) - assert_equal('127.0.0.1', d.instance.rpc_bind) - assert_equal(24681, d.instance.rpc_port) + assert_equal("127.0.0.1", d.instance.rpc_bind) + assert_equal(24_681, d.instance.rpc_port) end - test 'default values are configured' do + test "default values are configured" do d = create_driver assert_equal(5.0, d.instance.pull_interval) assert_equal(100, d.instance.max_messages) @@ -85,21 +92,21 @@ def http_get(path) assert_equal(1, d.instance.pull_threads) assert_equal([], d.instance.attribute_keys) assert_equal(false, d.instance.enable_rpc) - assert_equal('0.0.0.0', d.instance.rpc_bind) - assert_equal(24680, d.instance.rpc_port) + assert_equal("0.0.0.0", d.instance.rpc_bind) + assert_equal(24_680, d.instance.rpc_port) end end - sub_test_case 'start' do + sub_test_case "start" do setup do @topic_mock = mock! - @pubsub_mock = mock!.topic('topic-test').at_least(1) { @topic_mock } + @pubsub_mock = mock!.topic("topic-test").at_least(1) { @topic_mock } stub(Google::Cloud::Pubsub).new { @pubsub_mock } end - test '40x error occurred on connecting to Pub/Sub' do - @topic_mock.subscription('subscription-test').once do - raise Google::Cloud::NotFoundError.new('TEST') + test "40x error occurred on connecting to Pub/Sub" do + @topic_mock.subscription("subscription-test").once do + raise Google::Cloud::NotFoundError, "TEST" end d = create_driver @@ -108,9 +115,9 @@ def http_get(path) end end - test '50x error occurred on connecting to Pub/Sub' do - @topic_mock.subscription('subscription-test').once do - raise Google::Cloud::UnavailableError.new('TEST') + test "50x error occurred on connecting to Pub/Sub" do + @topic_mock.subscription("subscription-test").once do + raise Google::Cloud::UnavailableError, "TEST" end d = create_driver @@ -119,8 +126,8 @@ def http_get(path) end end - test 'subscription is nil' do - @topic_mock.subscription('subscription-test').once { nil } + test "subscription is nil" do + @topic_mock.subscription("subscription-test").once { nil } d = create_driver assert_raise Fluent::GcloudPubSub::Error do @@ -129,18 +136,48 @@ def http_get(path) end end - sub_test_case 'emit' do + sub_test_case "emit" do class DummyMsgData def data '{"foo": "bar"}' end end + class DummyMessage def message DummyMsgData.new end + + def data + message.data + end + + def attributes + { "attr_1" => "a", "attr_2" => "b" } + end + end + + class DummyCompressedMessageData + attr_reader :data + + def initialize(messages) + @data = Zlib::Deflate.deflate(messages.join(30.chr)) + end + end + + class DummyCompressedMessage + attr_reader :message + + def initialize(messages) + @message = DummyCompressedMessageData.new(messages) + end + + def data + message.data + end + def attributes - {"attr_1" => "a", "attr_2" => "b"} + { "compression_algorithm" => "zlib" } end end @@ -148,6 +185,7 @@ class DummyMsgDataWithTagKey def initialize(tag) @tag = tag end + def data '{"foo": "bar", "test_tag_key": "' + @tag + '"}' end @@ -156,22 +194,28 @@ class DummyMessageWithTagKey def initialize(tag) @tag = tag end + def message DummyMsgDataWithTagKey.new @tag end + + def data + message.data + end + def attributes - {"attr_1" => "a", "attr_2" => "b"} + { "attr_1" => "a", "attr_2" => "b" } end end setup do @subscriber = mock! 
- @topic_mock = mock!.subscription('subscription-test') { @subscriber } - @pubsub_mock = mock!.topic('topic-test') { @topic_mock } + @topic_mock = mock!.subscription("subscription-test") { @subscriber } + @pubsub_mock = mock!.topic("topic-test") { @topic_mock } stub(Google::Cloud::Pubsub).new { @pubsub_mock } end - test 'empty' do + test "empty" do @subscriber.pull(immediate: true, max: 100).at_least(1) { [] } @subscriber.acknowledge.times(0) @@ -181,7 +225,7 @@ def attributes assert_true d.events.empty? end - test 'simple' do + test "simple" do messages = Array.new(1, DummyMessage.new) @subscriber.pull(immediate: true, max: 100).at_least(1) { messages } @subscriber.acknowledge(messages).at_least(1) @@ -190,14 +234,14 @@ def attributes d.run(expect_emits: 1, timeout: 3) emits = d.events - assert(1 <= emits.length) - emits.each do |tag, time, record| + assert(emits.length >= 1) + emits.each do |tag, _time, record| assert_equal("test", tag) - assert_equal({"foo" => "bar"}, record) + assert_equal({ "foo" => "bar" }, record) end end - test 'multithread' do + test "multithread" do messages = Array.new(1, DummyMessage.new) @subscriber.pull(immediate: true, max: 100).at_least(2) { messages } @subscriber.acknowledge(messages).at_least(2) @@ -206,18 +250,18 @@ def attributes d.run(expect_emits: 2, timeout: 1) emits = d.events - assert(2 <= emits.length) - emits.each do |tag, time, record| + assert(emits.length >= 2) + emits.each do |tag, _time, record| assert_equal("test", tag) - assert_equal({"foo" => "bar"}, record) + assert_equal({ "foo" => "bar" }, record) end end - test 'with tag_key' do + test "with tag_key" do messages = [ - DummyMessageWithTagKey.new('tag1'), - DummyMessageWithTagKey.new('tag2'), - DummyMessage.new + DummyMessageWithTagKey.new("tag1"), + DummyMessageWithTagKey.new("tag2"), + DummyMessage.new, ] @subscriber.pull(immediate: true, max: 100).at_least(1) { messages } @subscriber.acknowledge(messages).at_least(1) @@ -226,18 +270,18 @@ def attributes d.run(expect_emits: 1, timeout: 3) emits = d.events - assert(3 <= emits.length) + assert(emits.length >= 3) # test tag assert_equal("tag1", emits[0][0]) assert_equal("tag2", emits[1][0]) assert_equal("test", emits[2][0]) # test record - emits.each do |tag, time, record| - assert_equal({"foo" => "bar"}, record) + emits.each do |_tag, _time, record| + assert_equal({ "foo" => "bar" }, record) end end - test 'invalid messages with parse_error_action exception ' do + test "invalid messages with parse_error_action exception " do messages = Array.new(1, DummyInvalidMessage.new) @subscriber.pull(immediate: true, max: 100).at_least(1) { messages } @subscriber.acknowledge.times(0) @@ -247,7 +291,7 @@ def attributes assert_true d.events.empty? 
end - test 'with attributes' do + test "with attributes" do messages = Array.new(1, DummyMessage.new) @subscriber.pull(immediate: true, max: 100).at_least(1) { messages } @subscriber.acknowledge(messages).at_least(1) @@ -256,14 +300,38 @@ def attributes d.run(expect_emits: 1, timeout: 3) emits = d.events - assert(1 <= emits.length) - emits.each do |tag, time, record| + assert(emits.length >= 1) + emits.each do |tag, _time, record| assert_equal("test", tag) - assert_equal({"foo" => "bar", "attr_1" => "a"}, record) + assert_equal({ "foo" => "bar", "attr_1" => "a" }, record) end end - test 'invalid messages with parse_error_action warning' do + test "compressed batch of messages" do + original_messages = [ + { foo: "bar" }, + { baz: "qux" }, + ] + messages = Array.new(1, DummyCompressedMessage.new(original_messages.map(&:to_json))) + + @subscriber.pull(immediate: true, max: 100).once { messages } + @subscriber.acknowledge(messages).at_least(1) + + d = create_driver + d.run(expect_emits: 1, timeout: 3) + emits = d.events + + output_records = emits.map do |e| + # Pick out only the record element, i.e. ignore the time and tag + record = e[2] + # Convert the keys from strings to symbols, to allow for strict comparison + record.map { |k, v| [k.to_sym, v] }.to_h + end + + assert_equal(original_messages, output_records) + end + + test "invalid messages with parse_error_action warning" do messages = Array.new(1, DummyInvalidMessage.new) @subscriber.pull(immediate: true, max: 100).at_least(1) { messages } @subscriber.acknowledge(messages).at_least(1) @@ -273,21 +341,21 @@ def attributes assert_true d.events.empty? end - test 'retry if raised error' do + test "retry if raised error" do class UnknownError < StandardError end - @subscriber.pull(immediate: true, max: 100).at_least(2) { raise UnknownError.new('test') } + @subscriber.pull(immediate: true, max: 100).at_least(2) { raise UnknownError, "test" } @subscriber.acknowledge.times(0) - d = create_driver(CONFIG + 'pull_interval 0.5') + d = create_driver(CONFIG + "pull_interval 0.5") d.run(expect_emits: 1, timeout: 0.8) assert_equal(0.5, d.instance.pull_interval) assert_true d.events.empty? end - test 'retry if raised RetryableError on pull' do - @subscriber.pull(immediate: true, max: 100).at_least(2) { raise Google::Cloud::UnavailableError.new('TEST') } + test "retry if raised RetryableError on pull" do + @subscriber.pull(immediate: true, max: 100).at_least(2) { raise Google::Cloud::UnavailableError, "TEST" } @subscriber.acknowledge.times(0) d = create_driver("#{CONFIG}\npull_interval 0.5") @@ -297,24 +365,24 @@ class UnknownError < StandardError assert_true d.events.empty? end - test 'retry if raised RetryableError on acknowledge' do + test "retry if raised RetryableError on acknowledge" do messages = Array.new(1, DummyMessage.new) @subscriber.pull(immediate: true, max: 100).at_least(2) { messages } - @subscriber.acknowledge(messages).at_least(2) { raise Google::Cloud::UnavailableError.new('TEST') } + @subscriber.acknowledge(messages).at_least(2) { raise Google::Cloud::UnavailableError, "TEST" } d = create_driver("#{CONFIG}\npull_interval 0.5") d.run(expect_emits: 2, timeout: 3) emits = d.events # not acknowledged, but already emitted to engine. 
- assert(2 <= emits.length) - emits.each do |tag, time, record| + assert(emits.length >= 2) + emits.each do |tag, _time, record| assert_equal("test", tag) - assert_equal({"foo" => "bar"}, record) + assert_equal({ "foo" => "bar" }, record) end end - test 'stop by http rpc' do + test "stop by http rpc" do messages = Array.new(1, DummyMessage.new) @subscriber.pull(immediate: true, max: 100).once { messages } @subscriber.acknowledge(messages).once @@ -322,23 +390,23 @@ class UnknownError < StandardError d = create_driver("#{CONFIG}\npull_interval 1.0\nenable_rpc true") assert_equal(false, d.instance.instance_variable_get(:@stop_pull)) - d.run { - http_get('/api/in_gcloud_pubsub/pull/stop') + d.run do + http_get("/api/in_gcloud_pubsub/pull/stop") sleep 0.75 # d.run sleeps 0.5 sec - } + end emits = d.events assert_equal(1, emits.length) assert_true d.instance.instance_variable_get(:@stop_pull) - emits.each do |tag, time, record| + emits.each do |tag, _time, record| assert_equal("test", tag) - assert_equal({"foo" => "bar"}, record) + assert_equal({ "foo" => "bar" }, record) end end - test 'start by http rpc' do + test "start by http rpc" do messages = Array.new(1, DummyMessage.new) @subscriber.pull(immediate: true, max: 100).at_least(1) { messages } @subscriber.acknowledge(messages).at_least(1) @@ -347,41 +415,41 @@ class UnknownError < StandardError d.instance.stop_pull assert_equal(true, d.instance.instance_variable_get(:@stop_pull)) - d.run(expect_emits: 1, timeout: 3) { - http_get('/api/in_gcloud_pubsub/pull/start') + d.run(expect_emits: 1, timeout: 3) do + http_get("/api/in_gcloud_pubsub/pull/start") sleep 0.75 # d.run sleeps 0.5 sec - } + end emits = d.events - assert_equal(true, emits.length > 0) + assert_equal(true, !emits.empty?) assert_false d.instance.instance_variable_get(:@stop_pull) - emits.each do |tag, time, record| + emits.each do |tag, _time, record| assert_equal("test", tag) - assert_equal({"foo" => "bar"}, record) + assert_equal({ "foo" => "bar" }, record) end end - test 'get status by http rpc when started' do + test "get status by http rpc when started" do d = create_driver("#{CONFIG}\npull_interval 1.0\nenable_rpc true") assert_false d.instance.instance_variable_get(:@stop_pull) - d.run { - res = http_get('/api/in_gcloud_pubsub/pull/status') - assert_equal({"ok" => true, "status" => "started"}, JSON.parse(res.body)) - } + d.run do + res = http_get("/api/in_gcloud_pubsub/pull/status") + assert_equal({ "ok" => true, "status" => "started" }, JSON.parse(res.body)) + end end - test 'get status by http rpc when stopped' do + test "get status by http rpc when stopped" do d = create_driver("#{CONFIG}\npull_interval 1.0\nenable_rpc true") d.instance.stop_pull assert_true d.instance.instance_variable_get(:@stop_pull) - d.run { - res = http_get('/api/in_gcloud_pubsub/pull/status') - assert_equal({"ok" => true, "status" => "stopped"}, JSON.parse(res.body)) - } + d.run do + res = http_get("/api/in_gcloud_pubsub/pull/status") + assert_equal({ "ok" => true, "status" => "stopped" }, JSON.parse(res.body)) + end end end end diff --git a/test/plugin/test_out_gcloud_pubsub.rb b/test/plugin/test_out_gcloud_pubsub.rb index e105c86..cfd1b31 100644 --- a/test/plugin/test_out_gcloud_pubsub.rb +++ b/test/plugin/test_out_gcloud_pubsub.rb @@ -1,4 +1,7 @@ -# coding: utf-8 +# frozen_string_literal: true + +require "zlib" + require_relative "../test_helper" require "fluent/test/driver/output" require "fluent/test/helpers" @@ -6,11 +9,11 @@ class GcloudPubSubOutputTest < Test::Unit::TestCase include 
Fluent::Test::Helpers - CONFIG = %[ + CONFIG = %( project project-test topic topic-test key key-test - ] + ) ReRaisedError = Class.new(RuntimeError) @@ -23,48 +26,59 @@ def create_driver(conf = CONFIG) end setup do - @time = event_time('2016-07-09 11:12:13 UTC') + @time = event_time("2016-07-09 11:12:13 UTC") end - sub_test_case 'configure' do - test 'default values are configured' do - d = create_driver(%[ + sub_test_case "configure" do + test "default values are configured" do + d = create_driver(%( project project-test topic topic-test key key-test - ]) + )) - assert_equal('project-test', d.instance.project) - assert_equal('topic-test', d.instance.topic) - assert_equal('key-test', d.instance.key) + assert_equal("project-test", d.instance.project) + assert_equal("topic-test", d.instance.topic) + assert_equal("key-test", d.instance.key) assert_equal(false, d.instance.autocreate_topic) assert_equal(1000, d.instance.max_messages) - assert_equal(9800000, d.instance.max_total_size) - assert_equal(4000000, d.instance.max_message_size) + assert_equal(9_800_000, d.instance.max_total_size) + assert_equal(4_000_000, d.instance.max_message_size) end test '"topic" must be specified' do assert_raises Fluent::ConfigError do - create_driver(%[ + create_driver(%( project project-test key key-test - ]) + )) end end test '"autocreate_topic" can be specified' do - d = create_driver(%[ + d = create_driver(%( project project-test topic topic-test key key-test autocreate_topic true - ]) + )) assert_equal(true, d.instance.autocreate_topic) end + + test "'attribute_keys' cannot be used with 'compress_batches'" do + assert_raise(Fluent::ConfigError.new(":attribute_keys cannot be used when compression is enabled")) do + create_driver(%( + project project-test + topic topic-test + attribute_keys attr-test + compress_batches true + )) + end + end end - sub_test_case 'topic' do + sub_test_case "topic" do setup do @publisher = mock! @pubsub_mock = mock! 
@@ -72,77 +86,79 @@ def create_driver(conf = CONFIG) end test '"autocreate_topic" is enabled' do - d = create_driver(%[ + d = create_driver(%( project project-test topic topic-test key key-test autocreate_topic true - ]) + )) @publisher.publish.once @pubsub_mock.topic("topic-test").once { nil } @pubsub_mock.create_topic("topic-test").once { @publisher } d.run(default_tag: "test") do - d.feed(@time, {"a" => "b"}) + d.feed(@time, { "a" => "b" }) end end - test '40x error occurred on connecting to Pub/Sub' do + test "40x error occurred on connecting to Pub/Sub" do d = create_driver - @pubsub_mock.topic('topic-test').once do - raise Google::Cloud::NotFoundError.new('TEST') + @pubsub_mock.topic("topic-test").once do + raise Google::Cloud::NotFoundError, "TEST" end assert_raise Google::Cloud::NotFoundError do d.run(default_tag: "test") do - d.feed(@time, {"a" => "b"}) + d.feed(@time, { "a" => "b" }) end end end - test '50x error occurred on connecting to Pub/Sub' do + test "50x error occurred on connecting to Pub/Sub" do d = create_driver - @pubsub_mock.topic('topic-test').once do - raise Google::Cloud::UnavailableError.new('TEST') + @pubsub_mock.topic("topic-test").once do + raise Google::Cloud::UnavailableError, "TEST" end assert_raise Fluent::GcloudPubSub::RetryableError do d.run(default_tag: "test") do - d.feed(@time, {"a" => "b"}) + d.feed(@time, { "a" => "b" }) end end end - test 'topic is nil' do + test "topic is nil" do d = create_driver - @pubsub_mock.topic('topic-test').once { nil } + @pubsub_mock.topic("topic-test").once { nil } assert_raise Fluent::GcloudPubSub::Error do d.run(default_tag: "test") do - d.feed(@time, {"a" => "b"}) + d.feed(@time, { "a" => "b" }) end end end test 'messages exceeding "max_message_size" are not published' do - d = create_driver(%[ + d = create_driver(%( project project-test topic topic-test key key-test max_message_size 1000 - ]) + )) @publisher.publish.times(0) d.run(default_tag: "test") do - d.feed(@time, {"a" => "a" * 1000}) + d.feed(@time, { "a" => "a" * 1000 }) end end end - sub_test_case 'publish' do + # Rubocop will erroneously correct the MessagePack.unpack call, for which there's no `unpack1` equivalent method. + # rubocop:disable Style/UnpackFirst + sub_test_case "publish" do setup do @publisher = mock! 
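+      # The mocked client hands back @publisher for any topic name, so the
+      # tests below can focus on how records are batched and published.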
@pubsub_mock = mock!.topic(anything) { @publisher } @@ -155,25 +171,25 @@ def create_driver(conf = CONFIG) d.run(default_tag: "test") do # max_messages is default 1000 1001.times do |i| - d.feed(@time, {"a" => i}) + d.feed(@time, { "a" => i }) end end end test 'messages are divided into "max_total_size"' do - d = create_driver(%[ + d = create_driver(%( project project-test topic topic-test key key-test max_messages 100000 max_total_size 1000 - ]) + )) @publisher.publish.times(2) d.run(default_tag: "test") do # 400 * 4 / max_total_size = twice 4.times do - d.feed(@time, {"a" => "a" * 400}) + d.feed(@time, { "a" => "a" * 400 }) end end end @@ -183,44 +199,110 @@ def create_driver(conf = CONFIG) d = create_driver @publisher.publish.once d.run(default_tag: "test") do - d.feed(@time, {"a" => "あああ".force_encoding("ASCII-8BIT")}) + d.feed(@time, { "a" => "あああ".dup.force_encoding("ASCII-8BIT") }) end end - test 'reraise unexpected errors' do + test "reraise unexpected errors" do d = create_driver @publisher.publish.once { raise ReRaisedError } assert_raises ReRaisedError do d.run(default_tag: "test") do - d.feed([{'a' => 1, 'b' => 2}]) + d.feed([{ "a" => 1, "b" => 2 }]) end end end - test 'reraise RetryableError' do + test "reraise RetryableError" do d = create_driver - @publisher.publish.once { raise Google::Cloud::UnavailableError.new('TEST') } + @publisher.publish.once { raise Google::Cloud::UnavailableError, "TEST" } assert_raises Fluent::GcloudPubSub::RetryableError do d.run(default_tag: "test") do - d.feed([{'a' => 1, 'b' => 2}]) + d.feed([{ "a" => 1, "b" => 2 }]) end end end - test 'inject section' do - d = create_driver(%[ + test "inject section" do + d = create_driver(%( project project-test topic topic-test key key-test tag_key tag - ]) + )) @publisher.publish.once - d.run(default_tag: 'test') do - d.feed({"foo" => "bar"}) + d.run(default_tag: "test") do + d.feed({ "foo" => "bar" }) + end + assert_equal({ "tag" => "test", "foo" => "bar" }, JSON.parse(MessagePack.unpack(d.formatted.first)[0])) + end + + test "compressed batch" do + d = create_driver(%( + project project-test + topic topic-test + key key-test + compress_batches true + max_messages 2 + )) + + # This is a little hacky: we're doing assertions via matchers. + # The RR library doesn't seem to provide an easy alternative to this. The + # downside of this approach is that you will not receive a nice diff if + # the expectation fails. 
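+      #
+      # Each expected batch below is the list of newline-terminated JSON
+      # records that should have been joined with the 0x1E record separator
+      # and zlib-compressed into a single Pub/Sub message. The
+      # ZlibCompressedBatch matcher at the bottom of this file inflates the
+      # published payload and compares the records it contains.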
+ first_batch = [ + '{"foo":"bar"}' + "\n", + '{"foo":123}' + "\n", + ] + @publisher.publish(ZlibCompressedBatch.new(first_batch), { compression_algorithm: "zlib" }).once + + second_batch = [ + '{"msg":"last"}' + "\n", + ] + @publisher.publish(ZlibCompressedBatch.new(second_batch), { compression_algorithm: "zlib" }).once + + d.run(default_tag: "test") do + d.feed({ "foo" => "bar" }) + d.feed({ "foo" => 123 }) + d.feed({ "msg" => "last" }) end - assert_equal({"tag" => 'test', "foo" => "bar"}, JSON.parse(MessagePack.unpack(d.formatted.first)[0])) end end + # rubocop:enable Style/UnpackFirst +end + +private + +# A matcher for a compressed batch of messages +# https://www.rubydoc.info/gems/rr/1.2.1/RR/WildcardMatchers +class ZlibCompressedBatch + attr_reader :expected_messages + + def initialize(expected_messages) + @expected_messages = expected_messages + end + + def wildcard_match?(other) + return true if self == other + + return false unless other.is_a?(String) + + decompressed = Zlib::Inflate.inflate(other) + other_messages = decompressed.split(30.chr) + + other_messages == expected_messages + end + + def ==(other) + other.is_a?(self.class) && + other.expected_messages == expected_messages + end + + alias eql? == + + def inspect + "contains compressed messages: #{expected_messages}" + end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 190f6d0..1920c13 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,31 +1,33 @@ -require 'rubygems' -require 'bundler' +# frozen_string_literal: true + +require "rubygems" +require "bundler" begin Bundler.setup(:default, :development) rescue Bundler::BundlerError => e - $stderr.puts e.message - $stderr.puts "Run `bundle install` to install missing gems" + warn e.message + warn "Run `bundle install` to install missing gems" exit e.status_code end -require 'test/unit' +require "test/unit" require "test/unit/rr" -$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) +$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), "..", "lib")) $LOAD_PATH.unshift(File.dirname(__FILE__)) -require 'fluent/test' -unless ENV.has_key?('VERBOSE') +require "fluent/test" +unless ENV.key?("VERBOSE") nulllogger = Object.new - nulllogger.instance_eval {|obj| + nulllogger.instance_eval do |_obj| def method_missing(method, *args) # pass end - } + end $log = nulllogger end -require 'fluent/plugin/in_gcloud_pubsub' -require 'fluent/plugin/out_gcloud_pubsub' +require "fluent/plugin/in_gcloud_pubsub" +require "fluent/plugin/out_gcloud_pubsub" class Test::Unit::TestCase end
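
For reference, the ZlibCompressedBatch matcher mirrors the wire format that
compress_batches is expected to produce: newline-terminated JSON records
joined with the 0x1E record separator, zlib-compressed, and published with a
`compression_algorithm` attribute of `"zlib"`. A minimal sketch of the
consuming side, assuming a received message that exposes `data` and
`attributes` (as `Google::Cloud::Pubsub::ReceivedMessage` does); the
`decode_batch` helper is illustrative, not the plugin's implementation:

```ruby
require "zlib"

BATCH_RECORD_SEPARATOR = 30.chr # 0x1E, as asserted by ZlibCompressedBatch

# Illustrative sketch: recover the individual records that were packed
# into a single compressed Pub/Sub message.
def decode_batch(message)
  return [message.data] unless message.attributes["compression_algorithm"] == "zlib"

  # Each record keeps its trailing newline, matching the expected_messages
  # lists in the "compressed batch" test above.
  Zlib::Inflate.inflate(message.data).split(BATCH_RECORD_SEPARATOR)
end
```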