Skip to content

Commit

Permalink
Publish to rubygems (#1)
Browse files Browse the repository at this point in the history
* Set a default ruby version

* Remove deprecated has_rdoc option

This has been deprecated [since 2011][0]

[0]: https://github.com/rubygems/rubygems/blob/1aebd7ddd69c9a38aa9daa3aa89f396d59c9e2a4/History.txt#L3296-L3297

* Add rubocop configuration

* Add rubocop fixups and configuration

Use the rubocop generated configuration to ignore any major changes that
can't be auto-corrected.

* Adjust supported Ruby versions

Ruby 2.4 is now end-of-life. As a result of this, the `google-protobuf`
gem is no longer releasing artifacts compatible with this version.
(protocolbuffers/protobuf#7453)

In CI we encounter this error from `bundle install`:
```
google-protobuf-3.12.0-x86_64-linux requires ruby version >= 2.5, which is
incompatible with the current version, ruby 2.4.6p354
```

Therefore remove Ruby 2.4.6 as a tested version in Travis CI.

Additionally remove the constraint on patch versions in the Travis
config, so that we'll use the latest patch version available for each
release branch.

* Add Prometheus metrics

Augment the existing operations with Prometheus metrics in order to
provide observability around the operations that the plugin is
performing.

Introduce a new metrics helper to prevent attempting to register the
same metric more than once in a multi-threaded or multi-instance
context.

* Add compress_batches feature

As per the README updates, this can be used to compress a number of
input records into a single Pub/Sub message, therefore saving on costs.

* Include gocardless branch in Travis config

* Minor README fixups

Clarify that the published bytes value is before compression.

Correct the name of the name of the compression ratio metric.

* publish to rubygems

Co-authored-by: Ben Wheatley <contact@benwh.com>
Co-authored-by: Calvin Aditya Jonathan <calvin.aditya@bukalapak.com>
  • Loading branch information
3 people committed Jan 5, 2021
1 parent e2d6ada commit 174c354
Show file tree
Hide file tree
Showing 16 changed files with 930 additions and 326 deletions.
46 changes: 46 additions & 0 deletions .rubocop.yml
@@ -0,0 +1,46 @@
---
inherit_from: .rubocop_todo.yml

AllCops:
# Matches the minimum version in .travis.yml
TargetRubyVersion: 2.4

Style/StringLiterals:
EnforcedStyle: "double_quotes"

# New cops: https://docs.rubocop.org/en/latest/versioning/
Layout/EmptyLinesAroundAttributeAccessor:
Enabled: true

Layout/SpaceAroundMethodCallOperator:
Enabled: true

Lint/RaiseException:
Enabled: true

Lint/StructNewOverride:
Enabled: true

Style/ExponentialNotation:
Enabled: true

Style/HashEachMethods:
Enabled: true

Style/HashTransformKeys:
Enabled: true

Style/HashTransformValues:
Enabled: true

Style/SlicingWithRange:
Enabled: true

Style/TrailingCommaInHashLiteral:
EnforcedStyleForMultiline: comma

Style/TrailingCommaInArrayLiteral:
EnforcedStyleForMultiline: comma

Style/TrailingCommaInArguments:
EnforcedStyleForMultiline: comma
98 changes: 98 additions & 0 deletions .rubocop_todo.yml
@@ -0,0 +1,98 @@
---
# This configuration was generated by
# `rubocop --auto-gen-config`
# on 2020-05-12 17:34:19 +0100 using RuboCop version 0.83.0.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
# versions of RuboCop, may require this file to be generated again.

# Offense count: 3
# Configuration parameters: IgnoredMethods.
Metrics/AbcSize:
Max: 50

Metrics/BlockLength:
Enabled: false

Metrics/ClassLength:
Enabled: false

Metrics/MethodLength:
Max: 30

# Offense count: 1
# Configuration parameters: IgnoredPatterns.
# SupportedStyles: snake_case, camelCase
Naming/MethodName:
EnforcedStyle: snake_case

# Offense count: 3
# Cop supports --auto-correct.
# Configuration parameters: AutoCorrect, EnforcedStyle.
# SupportedStyles: nested, compact
Style/ClassAndModuleChildren:
Exclude:
- 'lib/fluent/plugin/in_gcloud_pubsub.rb'
- 'lib/fluent/plugin/out_gcloud_pubsub.rb'
- 'test/test_helper.rb'

# Offense count: 6
Style/Documentation:
Exclude:
- 'spec/**/*'
- 'test/**/*'
- 'lib/fluent/plugin/gcloud_pubsub/client.rb'
- 'lib/fluent/plugin/in_gcloud_pubsub.rb'
- 'lib/fluent/plugin/out_gcloud_pubsub.rb'

# Offense count: 1
# Configuration parameters: AllowedVariables.
Style/GlobalVars:
Exclude:
- 'test/test_helper.rb'

# Offense count: 1
# Configuration parameters: MinBodyLength.
Style/GuardClause:
Exclude:
- 'lib/fluent/plugin/gcloud_pubsub/client.rb'

# Offense count: 2
# Cop supports --auto-correct.
Style/IfUnlessModifier:
Exclude:
- 'lib/fluent/plugin/gcloud_pubsub/client.rb'

# Offense count: 1
Style/MethodMissingSuper:
Exclude:
- 'test/test_helper.rb'

# Offense count: 1
Style/MissingRespondToMissing:
Exclude:
- 'test/test_helper.rb'

# Offense count: 260
# Cop supports --auto-correct.
# Configuration parameters: EnforcedStyle, ConsistentQuotesInMultiline.
# SupportedStyles: single_quotes, double_quotes
Style/StringLiterals:
Exclude:
- 'Gemfile'
- 'Rakefile'
- 'fluent-plugin-gcloud-pubsub-custom.gemspec'
- 'lib/fluent/plugin/gcloud_pubsub/client.rb'
- 'lib/fluent/plugin/in_gcloud_pubsub.rb'
- 'lib/fluent/plugin/out_gcloud_pubsub.rb'
- 'test/plugin/test_in_gcloud_pubsub.rb'
- 'test/plugin/test_out_gcloud_pubsub.rb'
- 'test/test_helper.rb'

# Offense count: 36
# Cop supports --auto-correct.
# Configuration parameters: AutoCorrect, AllowHeredoc, AllowURI, URISchemes, IgnoreCopDirectives, IgnoredPatterns.
# URISchemes: http, https
Layout/LineLength:
Max: 120
1 change: 1 addition & 0 deletions .ruby-version
@@ -0,0 +1 @@
2.6.5
13 changes: 8 additions & 5 deletions .travis.yml
@@ -1,20 +1,23 @@
language: ruby

rvm:
- 2.4.6
- 2.5.5
- 2.6.3
- 2.5
- 2.6
- 2.7
- ruby-head

gemfile:
- Gemfile
- Gemfile

branches:
only:
- master
- gocardless

before_install: gem update bundler
script: bundle exec rake test
script:
- bundle exec rake test
- bundle exec rubocop

sudo: false

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
## ChangeLog

### Release 1.3.3 - 2021/01/05

- Add support for payload compression with `compress_batches`

### Release 1.3.2 - 2019/08/16

- Input plugin
Expand Down
4 changes: 3 additions & 1 deletion Gemfile
@@ -1,3 +1,5 @@
source 'https://rubygems.org'
# frozen_string_literal: true

source "https://rubygems.org"

gemspec
76 changes: 74 additions & 2 deletions README.md
Expand Up @@ -60,6 +60,7 @@ Use `gcloud_pubsub` output plugin.
max_messages 1000
max_total_size 9800000
max_message_size 4000000
compress_batches false
<buffer>
@type memory
flush_interval 1s
Expand Down Expand Up @@ -92,7 +93,11 @@ Use `gcloud_pubsub` output plugin.
- `max_message_size` (optional, default: `4000000` = `4MB`)
- Messages exceeding `max_message_size` are not published because Pub/Sub clients cannot receive it.
- `attribute_keys` (optional, default: `[]`)
- Publishing the set fields as attributes.
- Extract these fields from the record and send them as attributes on the Pub/Sub message. Cannot be set if compress_batches is enabled.
- `metric_prefix` (optional, default: `fluentd_output_gcloud_pubsub`)
- The prefix for Prometheus metric names
- `compress_batches` (optional, default: `false`)
- If set to `true`, messages will be batched and compressed before publication. See [message compression](#message-compression) for details.

### Pull messages

Expand Down Expand Up @@ -147,18 +152,85 @@ Use `gcloud_pubsub` input plugin.
- `pull_threads` (optional, default: `1`)
- Set number of threads to pull messages.
- `attribute_keys` (optional, default: `[]`)
- Specify the key of the attribute to be emitted as the field of record.
- Acquire these fields from attributes on the Pub/Sub message and merge them into the record.
- `parse_error_action` (optional, default: `exception`)
- Set error type when parsing messages fails.
- `exception`: Raise exception. Messages are not acknowledged.
- `warning`: Only logging as warning.
- `metric_prefix` (optional, default: `fluentd_input_gcloud_pubsub`)
- The prefix for Prometheus metric names
- `enable_rpc` (optional, default: `false`)
- If `true` is specified, HTTP RPC to stop or start pulling message is enabled.
- `rpc_bind` (optional, default: `0.0.0.0`)
- Bind IP address for HTTP RPC.
- `rpc_port` (optional, default: `24680`)
- Port for HTTP RPC.

## Message compression

The `compress_batches` option can be used to enable the compression of messages
_before_ publication to Pub/Sub.

This works by collecting the buffered messages, taking up to `max_total_size` or
`max_message_size` input records, then compressing them with Zlib (i.e.
gzip/Deflate) before publishing them as a single message to the Pub/Sub topic.

When transporting large volumes of records via Pub/Sub, e.g. multiple Terabytes
per month, this can lead to significant cost savings, as typically the CPU time
required to compress the messages will be minimal in comparison to the Pub/Sub
costs.

The compression ratio achievable will vary largely depending on the homogeneity
of the input records, but typically will be 50% at the very minimum and often
around 80-90%.

In order to achieve good compression, consider the following:
- Ensure that the buffer is being filled with a reasonable batch of messages: do
not use `flush_mode immediate`, and keep the `flush_interval` value
sufficiently high. Use the Prometheus metrics to determine how many records
are being published per message.
- Keep the `max_messages` and `max_message_size` values high (the defaults are
optimal).
- If there are many different sources of messages being mixed and routed to a
single `gcloud_pubsub` output, use multiple outputs (which will each have
their own buffer) through tagging or [labelling][fluentd-labels].

[fluentd-labels]: https://docs.fluentd.org/quickstart/life-of-a-fluentd-event#labels

The receiving end must be able to decode these compressed batches of messages,
which it can determine via an attribute set on the Pub/Sub message. The
`gcloud_pubsub` input plugin will do this transparently, decompressing any
messages which contain a batch of records and normally processing any messages
which represent just a single record.
Therefore, as long as all of the receivers are updated with support for
compressed batches first, it's then possible to gradually roll out this feature.

## Prometheus metrics

The input and output plugins expose several metrics in order to monitor
performance:

- `fluentd_output_gcloud_pubsub_compression_enabled`
- Gauge: Whether compression/batching is enabled
- `fluentd_output_gcloud_pubsub_messages_published_per_batch`
- Histogram: Number of records published to Pub/Sub per buffer flush
- `fluentd_output_gcloud_pubsub_messages_published_bytes`
- Histogram: Total size in bytes of the records published to Pub/Sub,
**before** compression.
- `fluentd_output_gcloud_pubsub_messages_compression_duration_seconds`
- Histogram: Time taken to compress a batch of messages
- `fluentd_output_gcloud_pubsub_messages_compressed_size_per_original_size_ratio`
- Histogram: Compression ratio achieved on a batch of messages, expressed in
terms of space saved.

- `fluentd_input_gcloud_pubsub_pull_errors_total`
- Counter: Errors encountered while pulling or processing messages (split by a
`retryable` label)
- `fluentd_input_gcloud_pubsub_messages_pulled`
- Histogram: Number of Pub/Sub messages pulled by the subscriber on each invocation
- `fluentd_input_gcloud_pubsub_messages_pulled_bytes`
- Histogram: Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation

## Contributing

1. Fork it
Expand Down
12 changes: 7 additions & 5 deletions Rakefile
@@ -1,12 +1,14 @@
require 'bundler'
# frozen_string_literal: true

require "bundler"
Bundler::GemHelper.install_tasks

require 'rake/testtask'
require "rake/testtask"

Rake::TestTask.new(:test) do |test|
test.libs << 'lib' << 'test'
test.test_files = FileList['test/plugin/test_*.rb']
test.libs << "lib" << "test"
test.test_files = FileList["test/plugin/test_*.rb"]
test.verbose = true
end

task :default => [:build]
task default: [:build]
30 changes: 18 additions & 12 deletions fluent-plugin-gcloud-pubsub-custom.gemspec
@@ -1,26 +1,32 @@
# encoding: utf-8
$:.push File.expand_path('../lib', __FILE__)
# frozen_string_literal: true

$LOAD_PATH.push File.expand_path("lib", __dir__)

Gem::Specification.new do |gem|
gem.name = "fluent-plugin-gcloud-pubsub-custom"
gem.description = "Google Cloud Pub/Sub input/output plugin for Fluentd event collector"
gem.name = "fluent-plugin-gcloud-pubsub-custom-compress-batches"
gem.description = "Google Cloud Pub/Sub input/output plugin for Fluentd event collector - with payload compression. Forked from https://github.com/gocardless/fluent-plugin-gcloud-pubsub-custom"
gem.license = "MIT"
gem.homepage = "https://github.com/mia-0032/fluent-plugin-gcloud-pubsub-custom"
gem.summary = gem.description
gem.version = "1.3.2"
gem.authors = ["Yoshihiro MIYAI"]
gem.email = "msparrow17@gmail.com"
gem.has_rdoc = false
gem.homepage = "https://github.com/calvinaditya95/fluent-plugin-gcloud-pubsub-custom"
gem.summary = "Google Cloud Pub/Sub input/output plugin for Fluentd event collector - with payload compression"
gem.version = "1.3.3"
gem.authors = ["Calvin Aditya"]
gem.email = "calvin.aditya95@gmail.com"
gem.files = `git ls-files`.split("\n")
gem.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
gem.require_paths = ['lib']
gem.executables = `git ls-files -- bin/*`.split("\n").map { |f| File.basename(f) }
gem.require_paths = ["lib"]

gem.add_runtime_dependency "fluentd", [">= 0.14.15", "< 2"]
gem.add_runtime_dependency "google-cloud-pubsub", "~> 0.30.0"

# Use the same version constraint as fluent-plugin-prometheus currently specifies
gem.add_runtime_dependency "prometheus-client", "< 0.10"

gem.add_development_dependency "bundler"
gem.add_development_dependency "pry"
gem.add_development_dependency "pry-byebug"
gem.add_development_dependency "rake"
gem.add_development_dependency "rubocop", "~>0.83"
gem.add_development_dependency "test-unit"
gem.add_development_dependency "test-unit-rr"
end

0 comments on commit 174c354

Please sign in to comment.