Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log throttling per file #2702

Closed
wants to merge 5 commits into from

Conversation

rewiko
Copy link
Contributor

@rewiko rewiko commented Nov 18, 2019

What this PR does / why we need it:
Running in a big cluster with high volume of log, it would be nice to throttle the log shipping to avoid network saturation and make it easier to calculate the max throughput per node for example in a Kubernetes cluster.

Tail plugin is watching files and every second reading from the last pointer to the end of the file.
This change allow to stop reading the file after X number of logs lines read and update the pointer in the pos file as usual.

This commit adds log throttling per bytes for each files, should work only when watch_timer is enabled and the stat_watcher (inotify) is disabled.

In order to have this feature for any watch configuration (timer or inotify..), I've updated to add a sleep when you have been reaching the bytes read limit, the sleep would block process and affect other file ingestion, I've added a basic thread array to have a multi-threading ingestion.
However I've noticed you are relying on cool.io, and I was wondering if I should use this library instead.

Would you be interested in this feature?

Some discussions before submitting the PR.

Docs Changes:

adding read_lines_limit_per_notify which by default is set to -1, so no throttling involve by default.

Release Note:

Remove debug log

Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
…otify

Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
Signed-off-by: Anthony Comtois <anthony.comtois@sky.uk>
Copy link
Member

@ganmacs ganmacs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test fails. so could you fix the test first?

@@ -201,7 +204,15 @@ def start
end

refresh_watchers unless @skip_refresh_on_startup
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))

@threads['in_tail_refresh_watchers'] = Thread.new(@refresh_interval) do |refresh_interval|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@threads['in_tail_refresh_watchers'].priority = 10 # Default is zero; higher-priority threads will run before lower-priority threads.

@threads.each { |thr|
thr.join
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it blocks here, all code after this is blocking.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@threads is hash. so thr is Array.


log.debug "Thread refresh_watchers"
@threads.each { |thr|
log.debug "Thread #{thr[0]} #{thr[1].status}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent

@@ -356,6 +392,7 @@ def update_watcher(path, pe)
end
end
rotated_tw = @tails[path]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary

}
end

def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true)
paths.each { |path|
tw = remove_watcher ? @tails.delete(path) : @tails[path]
if remove_watcher
@threads[path].exit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread#exit is dangerous. could you finish this thread in a proper way?

if @lines.size >= @watcher.read_lines_limit

number_bytes_read += bytes_to_read
limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0)
limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second && @watcher.read_bytes_limit_per_second > 0)

# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Time.new - start_reading
@watcher.log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
if (time_spent_reading < 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (time_spent_reading < 1)
if time_spent_reading < 1

log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
begin
tw = setup_watcher(path, pe)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be a race condition. before passing pe to setup_watcher, L334 should be called. but the current code does not ensure it.

end
end
if @threads[path].nil?
log.debug "Add Thread #{path}"
@threads[path] = Thread.new(path) do |path|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you change these codes to run on new thread?

@fifo.read_lines(@lines)
if @lines.size >= @watcher.read_lines_limit

number_bytes_read += bytes_to_read
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IO#readpartial does not alway read bytes_to_read bytes. Is this code ok?

@rewiko
Copy link
Contributor Author

rewiko commented Nov 27, 2019

test fails. so could you fix the test first?

Hello,
I will definitely add more test and fix those failing, I've created the PR to be able to discuss in term of design and making sure you would be interested by this kind of feature.

I'm gonna spend more time to add more tests and review your comments.
Thanks

@ganmacs
Copy link
Member

ganmacs commented Nov 27, 2019

Ok. I like this feature :)
but, There are some considering points.

Creating threads per file is not acceptable. Because if there are 1000 files to monitor, it creates 1000 threads. https://github.com/fluent/fluentd/pull/2702/files#diff-1da710c9dcc8d0fc57996df7a9d39695R331
This patch restricts the bytes size per file, right? I think that restring all data(not per file) could be better for the situation like "to avoid network saturation and make it easier to calculate the max throughput per node".

@rewiko
Copy link
Contributor Author

rewiko commented Nov 27, 2019

Hi @ganmacs,

Creating threads per file is not acceptable. Because if there are 1000 files to monitor, it creates 1000 threads.

That was one of my worries in term of performance, I've been testing with 200 files and a high number of bytes per file, the number of thread hasn't been an issue but I agree having one thread per file is not ideal, I was thinking about using a thread pool but that would need a bigger re-architecture and I'm not sure the inotify will work as expect.

I think that restring all data(not per file) could be better for the situation like "to avoid network saturation and make it easier to calculate the max throughput per node".

I agree, however we are more interested to throttle log in a Kubernetes environment per container which means per file, to do not affect container which are sending at a decent rate.

In the commit, I've been implementing the log throttling per file but this implementation work nicely only with you use the timer because you it will stop reading and if it did not get notified again then some logs might not be read.
Maybe I could reuse that concept and more breaking I can push the function with a sleep and a notify which will be consume by a thread pool. In that case only the file throttled will create thread.

@github-actions
Copy link

This PR has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this PR will be closed in 30 days

@github-actions github-actions bot added the stale label Dec 18, 2020
@repeatedly
Copy link
Member

cosmo0920 is now working on this feature on #3185 with newer in_tail code.
Thanks for the idea!

@repeatedly repeatedly closed this Dec 18, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants