Skip to content

Commit

Permalink
Merge pull request #3479 from cosmo0920/use-metrics-plugin-mechanism-…
Browse files Browse the repository at this point in the history
…on-plugin-base-classes

input: filter: output: bare_output: multi_output: Use metrics plugin mechanism on plugin base classes
  • Loading branch information
cosmo0920 committed Aug 2, 2021
2 parents ad45a8d + aa79a4c commit b28f7d7
Show file tree
Hide file tree
Showing 15 changed files with 391 additions and 91 deletions.
15 changes: 9 additions & 6 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def configure(conf)
# original implementation of v0.12 BufferedOutput
def emit(tag, es, chain, key="")
# this method will not be used except for the case that plugin calls super
@emit_count += 1
@emit_count_metrics.inc
data = format_stream(tag, es)
if @buffer.emit(key, data, chain)
submit_flush
Expand All @@ -337,22 +337,23 @@ def format_stream(tag, es)
# because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it
def handle_stream_simple(tag, es, enqueue: false)
if @overrides_emit
current_emit_count = @emit_count
current_emit_count = @emit_count_metrics.get
size = es.size
key = data = nil
begin
emit(tag, es, NULL_OUTPUT_CHAIN)
key, data = self.last_emit_via_buffer
ensure
@emit_count = current_emit_count
@emit_count_metrics.set(current_emit_count)
self.last_emit_via_buffer = nil
end
# on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
write_guard do
@buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@counter_mutex.synchronize{ @emit_records += size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
return [meta]
end

Expand All @@ -363,7 +364,8 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@counter_mutex.synchronize{ @emit_records += size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
return [meta]
end

Expand All @@ -373,7 +375,8 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, enqueue: enqueue)
end
@counter_mutex.synchronize{ @emit_records += size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
[meta]
end

Expand Down
57 changes: 49 additions & 8 deletions lib/fluent/plugin/bare_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,78 @@
module Fluent
module Plugin
class BareOutput < Base
include PluginHelper::Mixin # for metrics

# DO NOT USE THIS plugin for normal output plugin. Use Output instead.
# This output plugin base class is only for meta-output plugins
# which cannot be implemented on MultiOutput.
# E.g,: forest, config-expander

helpers_internal :metrics

include PluginId
include PluginLoggerMixin
include PluginHelper::Mixin

attr_reader :num_errors, :emit_count, :emit_records

def process(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def num_errors
@num_errors_metrics.get
end

def emit_count
@emit_count_metrics.get
end

def emit_size
@emit_size_metrics.get
end

def emit_records
@emit_records_metrics.get
end

def initialize
super
@counter_mutex = Mutex.new
# TODO: well organized counters
@num_errors = 0
@emit_count = 0
@emit_records = 0
@num_errors_metrics = nil
@emit_count_metrics = nil
@emit_records_metrics = nil
@emit_size_metrics = nil
end

def configure(conf)
super

@num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "num_errors", help_text: "Number of count num errors")
@emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of count emits")
@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_size", help_text: "Total size of emit events")
@enable_size_metrics = !!system_config.enable_size_metrics
end

def statistics
stats = {
'num_errors' => @num_errors_metrics.get,
'emit_records' => @emit_records_metrics.get,
'emit_count' => @emit_count_metrics.get,
'emit_size' => @emit_size_metrics.get,
}

{ 'bare_output' => stats }
end

def emit_sync(tag, es)
@counter_mutex.synchronize{ @emit_count += 1 }
@emit_count_metrics.inc
begin
process(tag, es)
@counter_mutex.synchronize{ @emit_records += es.size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
rescue
@counter_mutex.synchronize{ @num_errors += 1 }
@num_errors_metrics.inc
raise
end
end
Expand Down
26 changes: 17 additions & 9 deletions lib/fluent/plugin/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,47 @@ class Filter < Base
include PluginLoggerMixin
include PluginHelper::Mixin

helpers_internal :event_emitter
helpers_internal :event_emitter, :metrics

attr_reader :has_filter_with_time

def initialize
super
@has_filter_with_time = has_filter_with_time?
@emit_records = 0
@emit_size = 0
@emit_records_metrics = nil
@emit_size_metrics = nil
@counter_mutex = Mutex.new
@enable_size_metrics = false
end

def emit_records
@emit_records_metrics.get
end

def emit_size
@emit_size_metrics.get
end

def configure(conf)
super

@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events")
@enable_size_metrics = !!system_config.enable_size_metrics
end

def statistics
stats = {
'emit_records' => @emit_records,
'emit_size' => @emit_size,
'emit_records' => @emit_records_metrics.get,
'emit_size' => @emit_size_metrics.get,
}

{ 'filter' => stats }
end

def measure_metrics(es)
@counter_mutex.synchronize do
@emit_records += es.size
@emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end

def filter(tag, time, record)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def start
'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size },
'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys },
'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size },
'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil },
'retry_count' => ->(){ respond_to?(:num_errors) ? num_errors : nil },
}

def all_plugins
Expand Down
26 changes: 17 additions & 9 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,44 @@ class Input < Base
include PluginLoggerMixin
include PluginHelper::Mixin

helpers_internal :event_emitter
helpers_internal :event_emitter, :metrics

def initialize
super
@emit_records = 0
@emit_size = 0
@emit_records_metrics = nil
@emit_size_metrics = nil
@counter_mutex = Mutex.new
@enable_size_metrics = false
end

def emit_records
@emit_records_metrics.get
end

def emit_size
@emit_size_metrics.get
end

def configure(conf)
super

@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "emit_records", help_text: "Number of count emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "emit_size", help_text: "Total size of emit events")
@enable_size_metrics = !!system_config.enable_size_metrics
end

def statistics
stats = {
'emit_records' => @emit_records,
'emit_size' => @emit_size,
'emit_records' => @emit_records_metrics.get,
'emit_size' => @emit_size_metrics.get,
}

{ 'input' => stats }
end

def metric_callback(es)
@counter_mutex.synchronize do
@emit_records += es.size
@emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end

def multi_workers_ready?
Expand Down
49 changes: 43 additions & 6 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MultiOutput < Base
include PluginHelper::Mixin # for event_emitter

helpers :event_emitter # to get router from agent, which will be supplied to child plugins
helpers_internal :metrics

config_section :store, param_name: :stores, multi: true, required: true do
config_argument :arg, :string, default: ''
Expand All @@ -46,11 +47,40 @@ def initialize

@counter_mutex = Mutex.new
# TODO: well organized counters
@num_errors = 0
@emit_count = 0
@emit_records = 0
@num_errors_metrics = nil
@emit_count_metrics = nil
@emit_records_metrics = nil
@emit_size_metrics = nil
# @write_count = 0
# @rollback_count = 0
@enable_size_metrics = false
end

def num_errors
@num_errors_metrics.get
end

def emit_count
@emit_count_metrics.get
end

def emit_size
@emit_size_metrics.get
end

def emit_records
@emit_records_metrics.get
end

def statistics
stats = {
'num_errors' => @num_errors_metrics.get,
'emit_records' => @emit_records_metrics.get,
'emit_count' => @emit_count_metrics.get,
'emit_size' => @emit_size_metrics.get,
}

{ 'multi_output' => stats }
end

def multi_output?
Expand All @@ -60,6 +90,12 @@ def multi_output?
def configure(conf)
super

@num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors")
@emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of count emits")
@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events")
@enable_size_metrics = !!system_config.enable_size_metrics

@stores.each do |store|
store_conf = store.corresponding_config_element
type = store_conf['@type']
Expand Down Expand Up @@ -143,12 +179,13 @@ def terminate
end

def emit_sync(tag, es)
@counter_mutex.synchronize{ @emit_count += 1 }
@emit_count_metrics.inc
begin
process(tag, es)
@counter_mutex.synchronize{ @emit_records += es.size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
rescue
@counter_mutex.synchronize{ @num_errors += 1 }
@num_errors_metrics.inc
raise
end
end
Expand Down

0 comments on commit b28f7d7

Please sign in to comment.