Skip to content

Commit

Permalink
Block excessive gracefulReload requests
Browse files Browse the repository at this point in the history
Closes: fluent#3341

In the previous versions, /api/config.gracefulReload call doesn't
restrict excessive API calls. It causes the following error when
already gracefulReload is executing.

  Worker 0 finished unexpectedly with signal SIGKILL

This commit mitigates such a situation by restricting a API call.
(it gives an some interval and it is customizable in system configuration
 - blocking_reload_interval parameter)

NOTE: Ideally it should wait and detects graceful reload finish, but
there is no easy way to synchronize internal state between
ServerModule(RPC::Server#mount_proc) and WorkerModule (reload_config).

Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
  • Loading branch information
kenhys committed Jun 7, 2021
1 parent ed42e6f commit 3194d93
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 9 deletions.
27 changes: 20 additions & 7 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def before_run
@rpc_endpoint = nil
@rpc_server = nil
@counter = nil
@blocking_reload_interval = config[:blocking_reload_interval]
@block_reload_until = nil

if config[:rpc_endpoint]
@rpc_endpoint = config[:rpc_endpoint]
Expand Down Expand Up @@ -83,6 +85,7 @@ def after_run

def run_rpc_server
@rpc_server = RPC::Server.new(@rpc_endpoint, $log)
@block_reload_until = Fluent::Clock.now + @blocking_reload_interval

# built-in RPC for signals
@rpc_server.mount_proc('/api/processes.interruptWorkers') { |req, res|
Expand Down Expand Up @@ -134,13 +137,21 @@ def run_rpc_server

@rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
$log.debug "fluentd RPC got /api/config.gracefulReload request"
if Fluent.windows?
supervisor_sigusr2_handler
if @block_reload_until > Fluent::Clock.now
$log.warn "block gracefulReload until: #{Time.now + @block_reload_until - Fluent::Clock.now} " +
"remaining: #{(@block_reload_until - Fluent::Clock.now).to_i} seconds"
[422, nil, {'message': 'failed to reload config'}]
else
Process.kill :USR2, $$
end
@block_reload_until = Fluent::Clock.now + @blocking_reload_interval
$log.debug("accept next gracefulReload after: #{Time.now + @block_reload_until - Fluent::Clock.now}")
if Fluent.windows?
supervisor_sigusr2_handler
else
Process.kill :USR2, $$
end

nil
nil
end
}

@rpc_server.mount_proc('/api/config.getDump') { |req, res|
Expand Down Expand Up @@ -457,7 +468,8 @@ def self.load_config(path, params = {})
config_path: path,
main_cmd: params['main_cmd'],
signame: params['signame'],
disable_shared_socket: params['disable_shared_socket']
disable_shared_socket: params['disable_shared_socket'],
blocking_reload_interval: params['blocking_reload_interval']
}
if daemonize
se_config[:pid_path] = pid_path
Expand Down Expand Up @@ -817,7 +829,8 @@ def supervise
'counter_server' => @system_config.counter_server,
'log_format' => @system_config.log.format,
'log_time_format' => @system_config.log.time_format,
'disable_shared_socket' => @system_config.disable_shared_socket
'disable_shared_socket' => @system_config.disable_shared_socket,
'blocking_reload_interval' => @system_config.blocking_reload_interval
}

se = ServerEngine.create(ServerModule, WorkerModule){
Expand Down
6 changes: 5 additions & 1 deletion lib/fluent/system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ class SystemConfig
:log_event_verbose, :ignore_repeated_log_interval, :ignore_same_log_interval,
:without_source, :rpc_endpoint, :enable_get_dump, :process_name,
:file_permission, :dir_permission, :counter_server, :counter_client,
:strict_config_value, :enable_msgpack_time_support, :disable_shared_socket
:strict_config_value, :enable_msgpack_time_support, :disable_shared_socket,
:blocking_reload_interval
]

DEFAULT_BLOCKING_RELOAD_INTERVAL = 15

config_param :workers, :integer, default: 1
config_param :root_dir, :string, default: nil
config_param :log_level, :enum, list: [:trace, :debug, :info, :warn, :error, :fatal], default: 'info'
Expand All @@ -46,6 +49,7 @@ class SystemConfig
config_param :strict_config_value, :bool, default: nil
config_param :enable_msgpack_time_support, :bool, default: nil
config_param :disable_shared_socket, :bool, default: nil
config_param :blocking_reload_interval, :time, default: DEFAULT_BLOCKING_RELOAD_INTERVAL
config_param :file_permission, default: nil do |v|
v.to_i(8)
end
Expand Down
2 changes: 2 additions & 0 deletions test/config/test_system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def parse_text(text)
assert_nil(sc.enable_msgpack_time_support)
assert_equal(:text, sc.log.format)
assert_equal('%Y-%m-%d %H:%M:%S %z', sc.log.time_format)
assert_equal(Fluent::SystemConfig::DEFAULT_BLOCKING_RELOAD_INTERVAL, sc.blocking_reload_interval)
end

data(
Expand All @@ -93,6 +94,7 @@ def parse_text(text)
'without_source' => ['without_source', true],
'strict_config_value' => ['strict_config_value', true],
'enable_msgpack_time_support' => ['enable_msgpack_time_support', true],
'blocking_reload_interval' => ['blocking_reload_interval', 10],
)
test "accepts parameters" do |(k, v)|
conf = parse_text(<<-EOS)
Expand Down
83 changes: 82 additions & 1 deletion test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
class SupervisorTest < ::Test::Unit::TestCase
class DummyServer
include Fluent::ServerModule
attr_accessor :rpc_endpoint, :enable_get_dump
attr_accessor :rpc_endpoint, :enable_get_dump, :blocking_reload_interval
def config
{}
end
Expand Down Expand Up @@ -230,6 +230,7 @@ def test_rpc_server

server = DummyServer.new
server.rpc_endpoint = sys_conf.rpc_endpoint
server.blocking_reload_interval = sys_conf.blocking_reload_interval
server.enable_get_dump = sys_conf.enable_get_dump

server.run_rpc_server
Expand Down Expand Up @@ -273,6 +274,7 @@ def server.config
}
end
server.rpc_endpoint = sys_conf.rpc_endpoint
server.blocking_reload_interval = sys_conf.blocking_reload_interval

server.run_rpc_server

Expand All @@ -283,6 +285,85 @@ def server.config
assert_equal('{"ok":true}', response)
end

sub_test_case "gracefulReload" do
SUCCEEDED_RELOADING_RESPONSE = {"ok": true}.to_json
FAILED_RELOADING_RESPONSE = {"message": "failed to reload config", "ok":false}.to_json

def setup
opts = Fluent::Supervisor.default_options
@sv = Fluent::Supervisor.new(opts)
@server = DummyServer.new
@port = unused_port
end

def teardown
@server.stop_rpc_server
end

def test_fail_graceful_reload_immediately
conf_data = <<-EOC
<system>
rpc_endpoint 0.0.0.0:#{@port}
</system>
EOC
conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
sys_conf = @sv.__send__(:build_system_config, conf)
@server.rpc_endpoint = sys_conf.rpc_endpoint
@server.blocking_reload_interval = sys_conf.blocking_reload_interval
@server.run_rpc_server
uri = URI.parse("http://127.0.0.1:#{@port}/api/config.gracefulReload")
response = Net::HTTP.get(uri)
assert_equal(FAILED_RELOADING_RESPONSE, response)
end

def test_graceful_reload_until_succeeds
conf_data = <<-EOC
<system>
rpc_endpoint 0.0.0.0:#{@port}
</system>
EOC
conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
sys_conf = @sv.__send__(:build_system_config, conf)
@server.rpc_endpoint = sys_conf.rpc_endpoint
@server.blocking_reload_interval = sys_conf.blocking_reload_interval
@server.run_rpc_server
uri = URI.parse("http://127.0.0.1:#{@port}/api/config.gracefulReload")
responses = []
threads = []
Fluent::SystemConfig::DEFAULT_BLOCKING_RELOAD_INTERVAL.times do |i|
threads << Thread.new do
sleep i
responses << JSON.parse(Net::HTTP.get(uri))
end
end
threads.each { |t| t.join }
warn_logs = $log.out.logs.collect { |log| log if log =~ /\[warn\]/ }.compact
sleep 1 # Ensure remaining time has passed
assert_equal([true, true, SUCCEEDED_RELOADING_RESPONSE],
[responses.none? { |response| response["ok"] == true },
warn_logs.all? { |log| log =~ /remaining:/ },
Net::HTTP.get(uri)])
end

def test_blocking_reload_interval
conf_data = <<-EOC
<system>
rpc_endpoint 0.0.0.0:#{@port}
blocking_reload_interval 3
</system>
EOC
conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
sys_conf = @sv.__send__(:build_system_config, conf)
@server.rpc_endpoint = sys_conf.rpc_endpoint
@server.blocking_reload_interval = sys_conf.blocking_reload_interval
@server.install_supervisor_signal_handlers
@server.run_rpc_server
sleep 3 # Ensure remaining time has passed
uri = URI.parse("http://127.0.0.1:#{@port}/api/config.gracefulReload")
assert_equal(SUCCEEDED_RELOADING_RESPONSE, Net::HTTP.get(uri))
end
end

def test_load_config
tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf"
conf_info_str = %[
Expand Down

5 comments on commit 3194d93

@alex-vmw
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kenhys, a few comments/questions about this change:

  1. There are current products out there, which are relying on the current behavior of gracefulReload. By blocking reloads that are close to each other will have an adverse effect on those products if they don't currently have a retry mechanism implemented.
  2. Would setting blocking_reload_interval=0 preserve existing behavior?
  3. Is it possible for fluentd to have some kind of a queue for gracefulReloads, so that it can accept the rpc reload call and execute it only when a previous reload finished.
  4. In my personal opinion, picking up new configuration with a reload/restart is the most important thing, so if item 3 above is not possible to implement, the current behavior might actually be preferred as it does NOT result in a possibility of not picking up latest configuration changes.

@kenhys
Copy link
Owner Author

@kenhys kenhys commented on 3194d93 Jun 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... if they don't currently have a retry mechanism implemented.

There is a case that affects, but worker should not be killed unexpectedly.

Would setting blocking_reload_interval=0 preserve existing behavior?

Yes.

Is it possible for fluentd to have some kind of a queue for gracefulReloads, so that it can accept the rpc reload call and execute it only when a previous reload finished.

Maybe yes, but I've chosen a more simple approach.

... the current behavior might actually be preferred as it does NOT result in a possibility of not picking up latest configuration changes.

Hmm, even though the worker is unexpectedly killed, as a new worker loads new configuration, the current behavior may be enough πŸ€”

@alex-vmw
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe yes, but I've chosen a more simple approach.

The problem is that a simple approach might just be too simple and can lead to issues where latest config is not picked up because reload was blocked and whatever requested the reload did not retry again.

@ashie
Copy link

@ashie ashie commented on 3194d93 Jun 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that a simple approach might just be too simple and can lead to issues where latest config is not picked up because reload was blocked and whatever requested the reload did not retry again.

To tell the truth, I'm not so positive for this pull request because of almost same reason with @alex-vmw
(I'm sorry for my late response for that...)

The problem of fluent#3341 is that a worker is killed unexpectedly by SIGKILL, so that the issue might be still happen even if we introduce this change.
I think we should investigate the root cause of fluent#3341, and I also think that fluent#3380 might fix this issue.

In addition, we should check the consistency of gracefulReload.
When gracefulReload is triggered, a new thread for reloading is created at both supervisor process and worker process.

https://github.com/fluent/fluentd/blob/980425ec8ee967854e91f2aa7371fc9c11a0c640/lib/fluent/supervisor.rb#L279-L296

https://github.com/fluent/fluentd/blob/980425ec8ee967854e91f2aa7371fc9c11a0c640/lib/fluent/supervisor.rb#L918-L940

So that multiple graceReload request might break the consistency.
On the other hand, waiting a previous thread may be enough to keep consistency.
If the consistency is kept, I think excessive requests should be acceptable (as for it, a user should take responsibility).

@alex-vmw
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the consistency is kept, I think excessive requests should be acceptable

If there is a queue for requests that are executed only after a previous one is finished, then consistency would be kept, right?

as for it, a user should take responsibility).

Not quite sure what you mean by this, can you elaborate a bit more?

Please sign in to comment.