Skip to content

Commit

Permalink
Remove needless thread poll implementation
Browse files Browse the repository at this point in the history
If we don't use `#sleep` to prevent reading bytes beyond limits,
it is not needed to implement thread pool mechanism.
We just need to get out reading file loop when bytes limit is reached.

Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed May 11, 2021
1 parent d3a3023 commit 478b487
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 133 deletions.
82 changes: 3 additions & 79 deletions lib/fluent/plugin/in_tail.rb
Expand Up @@ -109,8 +109,6 @@ def initialize
config_param :path_timezone, :string, default: nil
desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows to use * in path pattern with rotating files'
config_param :follow_inodes, :bool, default: false
desc 'Specify max size of thread pool'
config_param :max_thread_pool_size, :integer, default: 1

config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do
config_argument :usage, :string, default: 'in_tail_parser'
Expand Down Expand Up @@ -182,24 +180,14 @@ def configure(conf)
# parser is already created by parser helper
@parser = parser_create(usage: parser_config['usage'] || @parser_configs.first.usage)
@capability = Fluent::Capability.new(:current_process)
# Need to create thread pool because #sleep should pause entire thread on enabling log throttling feature.
@thread_pool = nil
@enable_read_bytes_limit_thread_pool = false
if @read_bytes_limit_per_second > 0
if @max_thread_pool_size < 0
raise Fluent::ConfigError, "Specify positive number"
end
if !@enable_watch_timer
raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature"
end
if @read_bytes_limit_per_second < 8192
log.warn "Should specify greater equal than 8192. Use 8192 for read_bytes_limit_per_second"
@read_bytes_limit_per_second = 8192
end
if @max_thread_pool_size < 2
raise Fluent::ConfigError, "Specify 2 or more on max_thread_pool_size"
end
@enable_read_bytes_limit_thread_pool = true
end
end

Expand Down Expand Up @@ -447,19 +435,9 @@ def construct_watcher(target_info)
end

def start_watchers(targets_info)
if @enable_read_bytes_limit_thread_pool
@thread_pool = TailThread::Pool.new(@max_thread_pool_size) do |pool|
targets_info.each_value {|target_info|
pool.run {
construct_watcher(target_info)
}
}
end
else
targets_info.each_value {|target_info|
construct_watcher(target_info)
}
end
targets_info.each_value {|target_info|
construct_watcher(target_info)
}
end

def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
Expand All @@ -478,10 +456,6 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
end
end
}
if @thread_pool
@thread_pool.stop
@thread_pool = nil
end
end

def close_watcher_handles
Expand Down Expand Up @@ -712,56 +686,6 @@ def on_timer
end
end

class TailThread
class Pool
def initialize(max_size, &session)
@max_size = max_size
@queue = Queue.new
@threads = []
session.call(self)
ensure
terminate
end

def run(&task)
@queue.push(task)
@threads << create_thread if @threads.size < @max_size
end

def terminate
until @queue.num_waiting == @threads.size
sleep 0.01
end
@queue.close
begin
Timeout.timeout(1) do
@threads.each{|th| th.join }
end
rescue Timeout::Error
@threads.each {|th| th.kill }
end
end

def stop
return unless running?

terminate
end

protected def create_thread
Thread.start(@queue) do |q|
while task = q.pop
task.call
end
end
end

protected def running?
!@queue.closed?
end
end
end

class TailWatcher
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = target_info.path
Expand Down
26 changes: 0 additions & 26 deletions test/plugin/in_tail/test_tailthread_pool.rb

This file was deleted.

34 changes: 6 additions & 28 deletions test/plugin/test_in_tail.rb
Expand Up @@ -8,7 +8,6 @@
require 'timecop'
require 'tmpdir'
require 'securerandom'
require 'etc'

class TailInputTest < Test::Unit::TestCase
include FlexMock::TestCase
Expand Down Expand Up @@ -103,7 +102,6 @@ def create_target_info(path)
CONFIG_ENABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false })
CONFIG_DISABLE_STAT_WATCHER = config_element("", "", { "enable_stat_watcher" => false })
CONFIG_OPEN_ON_EVERY_UPDATE = config_element("", "", { "open_on_every_update" => true })
CONFIG_MAX_THREAD_POOL_SIZE = config_element("", "", { "max_thread_pool_size" => (Etc.nprocessors / 1.5).ceil })
COMMON_FOLLOW_INODE_CONFIG = config_element("ROOT", "", {
"path" => "#{TMP_DIR}/tail.txt*",
"pos_file" => "#{TMP_DIR}/tail.pos",
Expand Down Expand Up @@ -159,7 +157,6 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file
assert_equal 1000, d.instance.read_lines_limit
assert_equal -1, d.instance.read_bytes_limit_per_second
assert_equal 1, d.instance.max_thread_pool_size
assert_equal false, d.instance.ignore_repeated_permission_error
assert_nothing_raised do
d.instance.have_read_capability?
Expand Down Expand Up @@ -201,21 +198,14 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)

sub_test_case "log throttling per file" do
test "w/o watcher timer is invalid" do
conf = CONFIG_ENABLE_WATCH_TIMER + CONFIG_MAX_THREAD_POOL_SIZE + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
assert_raise(Fluent::ConfigError) do
create_driver(conf)
end
end

test "w/o 2 or more thread pool size is invalid" do
conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
conf = CONFIG_ENABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
assert_raise(Fluent::ConfigError) do
create_driver(conf)
end
end

test "valid" do
conf = CONFIG_MAX_THREAD_POOL_SIZE + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
assert_raise(Fluent::ConfigError) do
create_driver(conf)
end
Expand Down Expand Up @@ -355,14 +345,6 @@ def test_emit_with_read_lines_limit(data)
cleanup_file("#{TMP_DIR}/tail.txt")
end

def count_thread_pool_object
num = 0
ObjectSpace.each_object(Fluent::Plugin::TailInput::TailThread::Pool) { |obj|
num += 1
}
num
end

data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
Expand All @@ -383,13 +365,13 @@ def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + CONFIG_MAX_THREAD_POOL_SIZE
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + CONFIG_MAX_THREAD_POOL_SIZE
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
when :flat_without_stat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + CONFIG_MAX_THREAD_POOL_SIZE
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse_without_stat
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + CONFIG_MAX_THREAD_POOL_SIZE
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
Expand All @@ -403,10 +385,6 @@ def test_emit_with_read_bytes_limit_per_second(data)
}
end

assert do
count_thread_pool_object >= 1
end

events = d.events
assert_true(events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
Expand Down

0 comments on commit 478b487

Please sign in to comment.