Skip to content

Commit

Permalink
Support unixtime_micros and unixtime_nanos to inject time
Browse files Browse the repository at this point in the history
If we collect docker container logs using the fluentd logging
driver, we need to sort the logs by the time to merge logs from
various containers. Although we can now specify unixtime_millis,
logs with line feed sometimes have the same time in milliseconds.
As a result, the logs are sorted in the wrong order.
This commit will resolve the problem by introducing new time types
of unixtime with higher precision.

Signed-off-by: abicky <takeshi.arabiki@gmail.com>
  • Loading branch information
abicky committed Dec 31, 2020
1 parent f7caa67 commit 829e90f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
4 changes: 3 additions & 1 deletion lib/fluent/plugin_helper/inject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ module InjectParams
config_param :time_key, :string, default: nil

# To avoid defining :time_type twice
config_param :time_type, :enum, list: [:float, :unixtime, :unixtime_millis, :string], default: :float
config_param :time_type, :enum, list: [:float, :unixtime, :unixtime_millis, :unixtime_micros, :unixtime_nanos, :string], default: :float

Fluent::TimeMixin::TIME_PARAMETERS.each do |name, type, opts|
config_param(name, type, **opts)
Expand Down Expand Up @@ -133,6 +133,8 @@ def configure(conf)
@_inject_time_formatter = case @inject_config.time_type
when :float then ->(time){ time.to_r.truncate(+6).to_f } # microsecond floating point value
when :unixtime_millis then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000 + time.nsec / 1_000_000 : time * 1_000 }
when :unixtime_micros then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000_000 + time.nsec / 1_000 : time * 1_000_000 }
when :unixtime_nanos then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000_000_000 + time.nsec : time * 1_000_000_000 }
when :unixtime then ->(time){ time.to_i }
else
localtime = @inject_config.localtime && !@inject_config.utc
Expand Down
28 changes: 28 additions & 0 deletions test/plugin_helper/test_inject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,34 @@ def config_inject_section(hash = {})
assert_equal record.merge({"timedata" => time_in_unix * 1_000}), @d.inject_values_to_record('tag', time_in_unix, record)
end

test 'injects time as unix time micros into specified key' do
time_in_unix = Time.parse("2016-06-21 08:10:11 +0900").to_i
time_subsecond = 320_101_224
time = Fluent::EventTime.new(time_in_unix, time_subsecond)
unixtime_micros = 1466464211320101

@d.configure(config_inject_section("time_key" => "timedata", "time_type" => "unixtime_micros"))
@d.start

record = {"key1" => "value1", "key2" => 2}
assert_equal record.merge({"timedata" => unixtime_micros}), @d.inject_values_to_record('tag', time, record)
assert_equal record.merge({"timedata" => time_in_unix * 1_000_000}), @d.inject_values_to_record('tag', time_in_unix, record)
end

test 'injects time as unix time nanos into specified key' do
time_in_unix = Time.parse("2016-06-21 08:10:11 +0900").to_i
time_subsecond = 320_101_224
time = Fluent::EventTime.new(time_in_unix, time_subsecond)
unixtime_nanos = 1466464211320101224

@d.configure(config_inject_section("time_key" => "timedata", "time_type" => "unixtime_nanos"))
@d.start

record = {"key1" => "value1", "key2" => 2}
assert_equal record.merge({"timedata" => unixtime_nanos}), @d.inject_values_to_record('tag', time, record)
assert_equal record.merge({"timedata" => time_in_unix * 1_000_000_000}), @d.inject_values_to_record('tag', time_in_unix, record)
end

test 'injects time as unix time into specified key' do
time_in_unix = Time.parse("2016-06-21 08:10:11 +0900").to_i
time_subsecond = 320_101_224
Expand Down

0 comments on commit 829e90f

Please sign in to comment.