Skip to content

Commit

Permalink
Merge pull request #2988 from fluent/use-eventtime
Browse files Browse the repository at this point in the history
Use EventTime.now instead of Engine.now
  • Loading branch information
repeatedly committed May 12, 2020
2 parents 2502cb8 + 02037cc commit 55d11dd
Show file tree
Hide file tree
Showing 10 changed files with 17 additions and 18 deletions.
4 changes: 2 additions & 2 deletions lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ def emit(num)
begin
if @size > 1
num.times do
router.emit_array(@tag, Array.new(@size) { [Fluent::Engine.now, generate] })
router.emit_array(@tag, Array.new(@size) { [Fluent::EventTime.now, generate] })
end
else
num.times { router.emit(@tag, Fluent::Engine.now, generate) }
num.times { router.emit(@tag, Fluent::EventTime.now, generate) }
end
rescue => _
# ignore all errors not to stop emits by emit errors
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def on_message(msg, chunk_size, conn)
record = e[1]
next if record.nil?
time = e[0]
time = Fluent::Engine.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
time = Fluent::EventTime.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
es.add(time, record)
}
es
Expand All @@ -347,7 +347,7 @@ def on_message(msg, chunk_size, conn)
return msg[3] # retry never succeeded so return ack and drop incoming event.
end
return if record.nil?
time = Fluent::Engine.now if time.to_i == 0
time = Fluent::EventTime.now if time.to_i == 0
if @enable_field_injection
record[@source_address_key] = conn.remote_addr if @source_address_key
record[@source_hostname_key] = conn.remote_host if @source_hostname_key
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ def on_request(path_info, params)
end
time = if param_time = params['time']
param_time = param_time.to_f
param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time)
param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
else
record_time.nil? ? Fluent::Engine.now : record_time
record_time.nil? ? Fluent::EventTime.now : record_time
end
rescue
return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"]
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def start
opts = {with_config: false, with_retry: false}
timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) {
es = Fluent::MultiEventStream.new
now = Fluent::Engine.now
now = Fluent::EventTime.now
plugins_info_all(opts).each { |record|
es.add(now, record)
}
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def on_message(msg)
record = e[1]
next if record.nil?
time = e[0]
time = (now ||= Engine.now) if time.to_i == 0
time = (now ||= EventTime.now) if time.to_i == 0
es.add(time, record)
}
router.emit_stream(tag, es)
Expand All @@ -106,7 +106,7 @@ def on_message(msg)
return if record.nil?

time = msg[1]
time = Engine.now if time.to_i == 0
time = EventTime.now if time.to_i == 0
router.emit(tag, time, record)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def configure(conf)
dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

test_chunk1 = chunk_for_test(dummy_tag, Fluent::Engine.now, dummy_record)
test_chunk1 = chunk_for_test(dummy_tag, Fluent::EventTime.now, dummy_record)
test_path = extract_placeholders(@path_template, test_chunk1)
unless ::Fluent::FileUtil.writable_p?(test_path)
raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def resolved_host
@resolved_host ||= resolve_dns!

else
now = Fluent::Engine.now
now = Fluent::EventTime.now
rh = @resolved_host
if !rh || now - @resolved_time >= @sender.expire_dns_cache
rh = @resolved_host = resolve_dns!
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/test/filter_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ def initialize(klass, tag = 'filter.test', &block)
attr_reader :filtered
attr_accessor :tag

def emit(record, time = Engine.now)
def emit(record, time = EventTime.now)
emit_with_tag(@tag, record, time)
end
alias_method :filter, :emit

def emit_with_tag(tag, record, time = Engine.now)
def emit_with_tag(tag, record, time = EventTime.now)
@events[tag] ||= MultiEventStream.new
@events[tag].add(time, record)
end
Expand Down
6 changes: 3 additions & 3 deletions lib/fluent/test/output_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def initialize(klass, tag='test', &block)

attr_accessor :tag

def emit(record, time=Engine.now)
def emit(record, time=EventTime.now)
es = OneEventStream.new(time, record)
@instance.emit_events(@tag, es)
end
Expand All @@ -62,7 +62,7 @@ def @instance.buffer

attr_accessor :tag

def emit(record, time=Engine.now)
def emit(record, time=EventTime.now)
@entries << [time, record]
self
end
Expand Down Expand Up @@ -110,7 +110,7 @@ def initialize(klass, tag='test', &block)

attr_accessor :tag

def emit(record, time=Engine.now)
def emit(record, time=EventTime.now)
@entries << [time, record]
self
end
Expand Down
3 changes: 1 addition & 2 deletions test/plugin/test_in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ def test_time
d = create_driver

time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC")
Fluent::Engine.now = time

d.expect_emit "tag1", time, {"a"=>1}
d.expect_emit "tag2", time, {"a"=>2}

d.run do
d.expected_emits.each {|tag,_time,record|
send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, 0, record]).to_s
send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, _time, record]).to_s
}
end
end
Expand Down

0 comments on commit 55d11dd

Please sign in to comment.