Skip to content

Commit

Permalink
Merge pull request #3362 from fluent/update-out-forward
Browse files Browse the repository at this point in the history
Apply newer service_discovery plugin helper methods
  • Loading branch information
ashie committed May 10, 2021
2 parents 5d9c9ac + c04ab95 commit 22c2b4a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 37 deletions.
43 changes: 13 additions & 30 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -227,31 +227,14 @@ def configure(conf)
socket_cache: socket_cache,
)

configs = []

# rewrite for using server as sd_static
conf.elements(name: 'server').each do |s|
s.name = 'service'
end

unless conf.elements(name: 'service').empty?
# To copy `services` element only
new_elem = Fluent::Config::Element.new('static_service_discovery', {}, {}, conf.elements(name: 'service'))
configs << { type: :static, conf: new_elem }
end

conf.elements(name: 'service_discovery').each_with_index do |c, i|
configs << { type: @service_discovery[i][:@type], conf: c }
end

service_discovery_create_manager(
service_discovery_configure(
:out_forward_service_discovery_watcher,
configurations: configs,
static_default_service_directive: 'server',
load_balancer: LoadBalancer.new(log),
custom_build_method: method(:build_node),
)

discovery_manager.services.each do |server|
service_discovery_services.each do |server|
# it's only for test
@nodes << server
unless @heartbeat_type == :none
Expand All @@ -273,7 +256,7 @@ def configure(conf)
end
end

if discovery_manager.services.empty?
if service_discovery_services.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
end

Expand Down Expand Up @@ -306,7 +289,7 @@ def start

unless @heartbeat_type == :none
if @heartbeat_type == :udp
@usock = socket_create_udp(discovery_manager.services.first.host, discovery_manager.services.first.port, nonblock: true)
@usock = socket_create_udp(service_discovery_services.first.host, service_discovery_services.first.port, nonblock: true)
server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length, &method(:on_udp_heatbeat_response_recv))
end
timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer))
Expand All @@ -318,7 +301,7 @@ def start
end

if @verify_connection_at_startup
discovery_manager.services.each do |node|
service_discovery_services.each do |node|
begin
node.verify_connection
rescue StandardError => e
Expand Down Expand Up @@ -374,7 +357,7 @@ def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag

discovery_manager.select_service { |node| node.send_data(tag, chunk) }
service_discovery_select_service { |node| node.send_data(tag, chunk) }
end

def try_write(chunk)
Expand All @@ -384,7 +367,7 @@ def try_write(chunk)
return
end
tag = chunk.metadata.tag
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
service_discovery_select_service { |node| node.send_data(tag, chunk) }
last_ack if @require_ack_response && @suspend_flush
end

Expand Down Expand Up @@ -434,7 +417,7 @@ def create_transfer_socket(host, port, hostname, &block)

def statistics
stats = super
services = discovery_manager.services
services = service_discovery_services
healthy_nodes_count = 0
registed_nodes_count = services.size
services.each do |s|
Expand Down Expand Up @@ -471,7 +454,7 @@ def build_node(server)

def on_heartbeat_timer
need_rebuild = false
discovery_manager.services.each do |n|
service_discovery_services.each do |n|
begin
log.trace "sending heartbeat", host: n.host, port: n.port, heartbeat_type: @heartbeat_type
n.usock = @usock if @usock
Expand All @@ -486,16 +469,16 @@ def on_heartbeat_timer
end

if need_rebuild
discovery_manager.rebalance
service_discovery_rebalance
end
end

def on_udp_heatbeat_response_recv(data, sock)
sockaddr = Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host)
if node = discovery_manager.services.find { |n| n.sockaddr == sockaddr }
if node = service_discovery_services.find { |n| n.sockaddr == sockaddr }
# log.trace "heartbeat arrived", name: node.name, host: node.host, port: node.port
if node.heartbeat
discovery_manager.rebalance
service_discovery_rebalance
end
else
log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}. It may service out")
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin_helper/service_discovery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def start
def service_discovery_configure(title, static_default_service_directive: nil, load_balancer: nil, custom_build_method: nil, interval: 3)
configs = @service_discovery_configs.map(&:corresponding_config_element)
if static_default_service_directive
configs << Fluent::Config::Element.new(
configs.prepend Fluent::Config::Element.new(
'service_discovery',
'',
{'@type' => 'static'},
Expand Down
17 changes: 11 additions & 6 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,16 @@ def try_write(chunk)
</service_discovery>
])

assert_equal 2, d.instance.discovery_manager.services.size
assert_equal '127.0.0.1', d.instance.discovery_manager.services[0].host
assert_equal 1234, d.instance.discovery_manager.services[0].port
assert_equal '127.0.0.1', d.instance.discovery_manager.services[1].host
assert_equal 1235, d.instance.discovery_manager.services[1].port

assert_equal(
[
{ host: '127.0.0.1', port: 1234 },
{ host: '127.0.0.1', port: 1235 },
],
d.instance.discovery_manager.services.collect do |service|
{ host: service.host, port: service.port }
end
)
end

test 'pass username and password as empty string to HandshakeProtocol' do
Expand Down Expand Up @@ -1073,7 +1078,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG
test 'when out_forward has @id' do
# cancel https://github.com/fluent/fluentd/blob/077508ac817b7637307434d0c978d7cdc3d1c534/lib/fluent/plugin_id.rb#L43-L53
# it always return true in test
mock.proxy(Fluent::Plugin).new_sd(:static, parent: anything) { |v|
mock.proxy(Fluent::Plugin).new_sd('static', parent: anything) { |v|
stub(v).plugin_id_for_test? { false }
}.once

Expand Down

0 comments on commit 22c2b4a

Please sign in to comment.