Skip to content

Commit

Permalink
in_tail: test: Add testcases for already reached bytes limits
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed May 13, 2021
1 parent 4f921a3 commit 1757297
Showing 1 changed file with 93 additions and 44 deletions.
137 changes: 93 additions & 44 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -345,53 +345,102 @@ def test_emit_with_read_lines_limit(data)
cleanup_file("#{TMP_DIR}/tail.txt")
end

data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20],
"parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
when :flat_without_stat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse_without_stat
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for _x in 0..30
f.puts msg
end
}
sub_test_case "reads_bytes_per_second w/o throttled" do
data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2, false],
"flat 8192 bytes, 2 events already read limit reached" => [:flat, 100, 8192, 2, true],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2, false],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20],
"parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
when :flat_without_stat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse_without_stat
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for x in 0..30
f.puts msg
end
}
end

events = d.events
assert_true(events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])

# Teardown in_tail plugin instance here.
d.instance.shutdown
end
end

events = d.events
assert_true(events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])
sub_test_case "reads_bytes_per_second w/ throttled already" do
data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

mock.proxy(d.instance).io_handler(anything, anything) do |io_handler|
require 'fluent/config/types'
limit_bytes_value = Fluent::Config.size_value(limit_bytes)
io_handler.instance_variable_set(:@number_bytes_read, limit_bytes_value)
io_handler
end

# Teardown in_tail plugin instance here.
d.instance.shutdown
# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for x in 0..30
f.puts msg
end
}
end

events = d.events
assert_true(events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])

# Teardown in_tail plugin instance here.
d.instance.shutdown
end
end
end

Expand Down

0 comments on commit 1757297

Please sign in to comment.