Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use EventTime.now instead of Engine.now #2988

Merged
merged 2 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ def emit(num)
begin
if @size > 1
num.times do
router.emit_array(@tag, Array.new(@size) { [Fluent::Engine.now, generate] })
router.emit_array(@tag, Array.new(@size) { [Fluent::EventTime.now, generate] })
end
else
num.times { router.emit(@tag, Fluent::Engine.now, generate) }
num.times { router.emit(@tag, Fluent::EventTime.now, generate) }
end
rescue => _
# ignore all errors not to stop emits by emit errors
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def on_message(msg, chunk_size, conn)
record = e[1]
next if record.nil?
time = e[0]
time = Fluent::Engine.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
time = Fluent::EventTime.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
es.add(time, record)
}
es
Expand All @@ -347,7 +347,7 @@ def on_message(msg, chunk_size, conn)
return msg[3] # retry never succeeded so return ack and drop incoming event.
end
return if record.nil?
time = Fluent::Engine.now if time.to_i == 0
time = Fluent::EventTime.now if time.to_i == 0
if @enable_field_injection
record[@source_address_key] = conn.remote_addr if @source_address_key
record[@source_hostname_key] = conn.remote_host if @source_hostname_key
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ def on_request(path_info, params)
end
time = if param_time = params['time']
param_time = param_time.to_f
param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time)
param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
else
record_time.nil? ? Fluent::Engine.now : record_time
record_time.nil? ? Fluent::EventTime.now : record_time
end
rescue
return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"]
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def start
opts = {with_config: false, with_retry: false}
timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) {
es = Fluent::MultiEventStream.new
now = Fluent::Engine.now
now = Fluent::EventTime.now
plugins_info_all(opts).each { |record|
es.add(now, record)
}
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def on_message(msg)
record = e[1]
next if record.nil?
time = e[0]
time = (now ||= Engine.now) if time.to_i == 0
time = (now ||= EventTime.now) if time.to_i == 0
es.add(time, record)
}
router.emit_stream(tag, es)
Expand All @@ -106,7 +106,7 @@ def on_message(msg)
return if record.nil?

time = msg[1]
time = Engine.now if time.to_i == 0
time = EventTime.now if time.to_i == 0
router.emit(tag, time, record)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def configure(conf)
dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

test_chunk1 = chunk_for_test(dummy_tag, Fluent::Engine.now, dummy_record)
test_chunk1 = chunk_for_test(dummy_tag, Fluent::EventTime.now, dummy_record)
test_path = extract_placeholders(@path_template, test_chunk1)
unless ::Fluent::FileUtil.writable_p?(test_path)
raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def resolved_host
@resolved_host ||= resolve_dns!

else
now = Fluent::Engine.now
now = Fluent::EventTime.now
rh = @resolved_host
if !rh || now - @resolved_time >= @sender.expire_dns_cache
rh = @resolved_host = resolve_dns!
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/test/filter_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ def initialize(klass, tag = 'filter.test', &block)
attr_reader :filtered
attr_accessor :tag

def emit(record, time = Engine.now)
def emit(record, time = EventTime.now)
emit_with_tag(@tag, record, time)
end
alias_method :filter, :emit

def emit_with_tag(tag, record, time = Engine.now)
def emit_with_tag(tag, record, time = EventTime.now)
@events[tag] ||= MultiEventStream.new
@events[tag].add(time, record)
end
Expand Down
6 changes: 3 additions & 3 deletions lib/fluent/test/output_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def initialize(klass, tag='test', &block)

attr_accessor :tag

def emit(record, time=Engine.now)
def emit(record, time=EventTime.now)
es = OneEventStream.new(time, record)
@instance.emit_events(@tag, es)
end
Expand All @@ -62,7 +62,7 @@ def @instance.buffer

attr_accessor :tag

def emit(record, time=Engine.now)
def emit(record, time=EventTime.now)
@entries << [time, record]
self
end
Expand Down Expand Up @@ -110,7 +110,7 @@ def initialize(klass, tag='test', &block)

attr_accessor :tag

def emit(record, time=Engine.now)
def emit(record, time=EventTime.now)
@entries << [time, record]
self
end
Expand Down
3 changes: 1 addition & 2 deletions test/plugin/test_in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ def test_time
d = create_driver

time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC")
Fluent::Engine.now = time

d.expect_emit "tag1", time, {"a"=>1}
d.expect_emit "tag2", time, {"a"=>2}

d.run do
d.expected_emits.each {|tag,_time,record|
send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, 0, record]).to_s
send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, _time, record]).to_s
}
end
end
Expand Down