Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sigdump func to fluent-ctl #3680

Merged
merged 15 commits into from
May 20, 2022
3 changes: 3 additions & 0 deletions lib/fluent/command/ctl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Ctl
restart: "HUP",
flush: "USR1",
reload: "USR2",
dump: "CONT",
}
WINSVC_CONTROL_CODE_MAP = {
shutdown: SERVICE_CONTROL_STOP,
Expand All @@ -44,13 +45,15 @@ class Ctl
restart: 128,
flush: 129,
reload: SERVICE_CONTROL_PARAMCHANGE,
dump: 130,
}
else
COMMAND_MAP = {
shutdown: :TERM,
restart: :HUP,
flush: :USR1,
reload: :USR2,
dump: :CONT,
}
end

Expand Down
112 changes: 92 additions & 20 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ def run_rpc_server
end
nil
}
@rpc_server.mount_proc('/api/processes.dump') { |req, res|
$log.debug "fluentd RPC got /api/processes.dump request"
if Fluent.windows?
supervisor_sigcont_handler
else
Process.kill :CONT, $$
end
nil
}
ashie marked this conversation as resolved.
Show resolved Hide resolved
@rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res|
$log.debug "fluentd RPC got /api/plugins.flushBuffers request"
if Fluent.windows?
Expand Down Expand Up @@ -186,6 +195,11 @@ def install_supervisor_signal_handlers
$log.debug 'fluentd supervisor process got SIGUSR2'
supervisor_sigusr2_handler
end

trap :CONT do
$log.debug 'fluentd supervisor process got CONT'
supervisor_sigcont_handler
end
end

if Fluent.windows?
Expand Down Expand Up @@ -215,44 +229,50 @@ def install_windows_event_handler
Thread.new do
ipc = Win32::Ipc.new(nil)
events = [
Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"),
Win32::Event.new("#{@pid_signame}"),
Win32::Event.new("#{@pid_signame}_HUP"),
Win32::Event.new("#{@pid_signame}_USR1"),
Win32::Event.new("#{@pid_signame}_USR2"),
{win32_event: Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), action: :stop_event_thread},
{win32_event: Win32::Event.new("#{@pid_signame}"), action: :stop},
{win32_event: Win32::Event.new("#{@pid_signame}_HUP"), action: :hup},
{win32_event: Win32::Event.new("#{@pid_signame}_USR1"), action: :usr1},
{win32_event: Win32::Event.new("#{@pid_signame}_USR2"), action: :usr2},
{win32_event: Win32::Event.new("#{@pid_signame}_CONT"), action: :cont},
]
if @signame
signame_events = [
Win32::Event.new("#{@signame}"),
Win32::Event.new("#{@signame}_HUP"),
Win32::Event.new("#{@signame}_USR1"),
Win32::Event.new("#{@signame}_USR2"),
{win32_event: Win32::Event.new("#{@signame}"), action: :stop},
{win32_event: Win32::Event.new("#{@signame}_HUP"), action: :hup},
{win32_event: Win32::Event.new("#{@signame}_USR1"), action: :usr1},
{win32_event: Win32::Event.new("#{@signame}_USR2"), action: :usr2},
{win32_event: Win32::Event.new("#{@signame}_CONT"), action: :cont},
]
events.concat(signame_events)
end
begin
loop do
idx = ipc.wait_any(events, Windows::Synchronize::INFINITE)
if idx > 0 && idx <= events.length
$log.debug("Got Win32 event \"#{events[idx - 1].name}\"")
ipc_idx = ipc.wait_any(events.map {|e| e[:win32_event]}, Windows::Synchronize::INFINITE)
event_idx = ipc_idx - 1

if event_idx >= 0 && event_idx < events.length
$log.debug("Got Win32 event \"#{events[event_idx][:win32_event].name}\"")
else
$log.warn("Unexpected reutrn value of Win32::Ipc#wait_any: #{idx}")
$log.warn("Unexpected return value of Win32::Ipc#wait_any: #{ipc_idx}")
end
case idx
when 2, 6
case events[event_idx][:action]
when :stop
stop(true)
when 3, 7
when :hup
supervisor_sighup_handler
when 4, 8
when :usr1
supervisor_sigusr1_handler
when 5, 9
when :usr2
supervisor_sigusr2_handler
when 1
when :cont
supervisor_sigcont_handler
when :stop_event_thread
break
end
end
ensure
events.each { |event| event.close }
events.each { |event| event[:win32_event].close }
end
end
end
Expand Down Expand Up @@ -302,6 +322,37 @@ def supervisor_sigusr2_handler
$log.error "Failed to reload config file: #{e}"
end

def supervisor_sigcont_handler
daipom marked this conversation as resolved.
Show resolved Hide resolved
if Fluent.windows?
$log.info "dump file. [pid:#{Process.pid}]"

# Sigdump outputs under `/tmp` dir without `SIGDUMP_PATH` specified
# but `/tmp` dir may not exist on Windows by default.
# Since this function is mainly for Windows, this case should be saved.
# (Don't want to couple tightly with the Sigdump library but want to save this case especially.)
unless ENV['SIGDUMP_PATH']
dump_dir = '/tmp'
unless Dir.exist?(dump_dir)
FileUtils.mkdir_p(dump_dir, mode: Fluent::DEFAULT_DIR_PERMISSION)
ashie marked this conversation as resolved.
Show resolved Hide resolved
end
end
end

# Need new thread to dump in UNIX-like.
# (For some reason it seems to work fine on Windows with the original thread.)
Thread.new do
# As for UNIX-like, `kill CONT` signal is trapped by the supervisor process,
# so we have to dump manually for the supervisor process.
# (Normally, the dump is automatic with the signal by using `require 'sigdump/setup'`.)
require 'sigdump'
Sigdump.dump
end

send_signal_to_workers(:CONT)
rescue => e
$log.error "failed to dump: #{e}"
end

def kill_worker
if config[:worker_pid]
pids = config[:worker_pid].clone
Expand Down Expand Up @@ -358,6 +409,14 @@ def send_command_to_workers(signal)
restart(true)
when :USR2
reload
when :CONT
dump_all_windows_workers
end
end

def dump_all_windows_workers
@monitors.each do |m|
m.send_command("DUMP\n")
end
end
end
Expand Down Expand Up @@ -896,6 +955,9 @@ def install_main_process_command_handlers
when "RELOAD"
$log.debug "fluentd main process get #{cmd} command"
reload_config
when "DUMP"
$log.debug "fluentd main process get #{cmd} command"
dump
else
$log.warn "fluentd main process get unknown command [#{cmd}]"
end
Expand Down Expand Up @@ -945,6 +1007,16 @@ def reload_config
end
end

def dump
raise "[BUG] The `dump` function of workers is for Windows ONLY." unless Fluent.windows?
# May need to create a new thread for UNIX-like.
$log.info("dump file. [pid:#{Process.pid}]")
require 'sigdump'
Sigdump.dump
rescue => e
$log.error("failed to dump: #{e}")
end

def logging_with_console_output
yield $log
unless @log.stdout?
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/winsvc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def service_user_defined_control(code)
set_event("#{@service_name}_HUP")
when 129
set_event("#{@service_name}_USR1")
when 130
set_event("#{@service_name}_CONT")
end
end

Expand Down
12 changes: 9 additions & 3 deletions test/plugin/test_in_object_space.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ def waiting(seconds, instance)
end

class FailObject
def self.class
raise "error"
end
end

def setup
Fluent::Test.setup
# Overriding this behavior in the global scope will have an unexpected influence on other tests.
# So this should be overridden here and be removed in `teardown`.
def FailObject.class
raise "FailObject error for tests in ObjectSpaceInputTest."
end
end

def teardown
FailObject.singleton_class.remove_method(:class)
end

TESTCONFIG = %[
Expand Down
26 changes: 26 additions & 0 deletions test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,32 @@ def server.config
$log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
end

def test_supervisor_event_dump_windows
omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows?

ENV['SIGDUMP_PATH'] = TMP_DIR + "/sigdump.log"

server = DummyServer.new
def server.config
{:signame => "TestFluentdEvent"}
end
server.install_windows_event_handler
begin
sleep 0.1 # Wait for starting windows event thread
event = Win32::Event.open("TestFluentdEvent_CONT")
event.set
event.close
sleep 1.0 # Wait for dumping
ensure
server.stop_windows_event_thread
end

result_filepaths = Dir.glob("#{TMP_DIR}/*")
assert {result_filepaths.length > 0}
ensure
ENV.delete('SIGDUMP_PATH')
end

data(:ipv4 => ["0.0.0.0", "127.0.0.1", false],
:ipv6 => ["[::]", "[::1]", true],
:localhost_ipv4 => ["localhost", "127.0.0.1", false])
Expand Down