Skip to content

Commit

Permalink
out_forward: Fix a race condition on handshake
Browse files Browse the repository at this point in the history
Fix the following failed test it's caused by using a same unpacker from
multiple threads:

https://github.com/fluent/fluentd/runs/2725344894

```
2021-06-02T06:50:53.7067710Z Failure: test: Node with security is thread-safe on multi threads(ForwardOutputTest):
2021-06-02T06:50:53.7069440Z   '2021-06-02 06:50:53 +0000 [warn]: connection refused to test: invalid format for PONG message' happens.
2021-06-02T06:50:53.7070450Z   <false> expected but was
2021-06-02T06:50:53.7071110Z   <true>
2021-06-02T06:50:53.7072550Z /Users/runner/work/fluentd/fluentd/test/plugin/test_out_forward.rb:1017:in `block in <class:ForwardOutputTest>'
2021-06-02T06:50:53.7074360Z      1014:     end
2021-06-02T06:50:53.7075180Z      1015:
2021-06-02T06:50:53.7075860Z      1016:     logs = d.logs
2021-06-02T06:50:53.7078420Z   => 1017:     assert_false(logs.any? { |log| log.include?("invalid format for PONG message") || log.include?("shared key mismatch") }, "'#{logs.last.strip}' happens")
2021-06-02T06:50:53.7079580Z      1018:   end
2021-06-02T06:50:53.7080160Z      1019:
2021-06-02T06:50:53.7081050Z      1020:   def create_target_input_driver(response_stub: nil, disconnect: false, conf: target_config)
```

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed Jun 2, 2021
1 parent 1be5323 commit 9f9a67d
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
5 changes: 2 additions & 3 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,6 @@ def initialize(sender, server, failure:, connection_manager:, ack_handler:)
username: server.username || '',
)

@unpacker = Fluent::MessagePackFactory.msgpack_unpacker

@resolved_host = nil
@resolved_time = 0
@resolved_once = false
Expand Down Expand Up @@ -613,7 +611,8 @@ def establish_connection(sock, ri)
sleep @sender.read_interval
next
end
@unpacker.feed_each(buf) do |data|
unpacker = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
unpacker.feed_each(buf) do |data|
if @handshake.invoke(sock, ri, data) == :established
@log.debug "connection established", host: @host, port: @port
end
Expand Down
4 changes: 3 additions & 1 deletion test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,9 @@ def try_write(chunk)
end

logs = d.logs
assert_false(logs.any? { |log| log.include?("invalid format for PONG message") || log.include?("shared key mismatch") }, "'#{logs.last.strip}' happens")
assert_false(logs.any? { |log|
log.include?("invalid format for PONG message") || log.include?("shared key mismatch")
}, "Actual log:\n#{logs.join}")
end

def create_target_input_driver(response_stub: nil, disconnect: false, conf: target_config)
Expand Down

0 comments on commit 9f9a67d

Please sign in to comment.