Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply newer service_discovery plugin helper methods #3362

Merged
merged 4 commits into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 13 additions & 30 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -227,31 +227,14 @@ def configure(conf)
socket_cache: socket_cache,
)

configs = []

# rewrite for using server as sd_static
conf.elements(name: 'server').each do |s|
s.name = 'service'
end

unless conf.elements(name: 'service').empty?
# To copy `services` element only
new_elem = Fluent::Config::Element.new('static_service_discovery', {}, {}, conf.elements(name: 'service'))
configs << { type: :static, conf: new_elem }
end

conf.elements(name: 'service_discovery').each_with_index do |c, i|
configs << { type: @service_discovery[i][:@type], conf: c }
end

service_discovery_create_manager(
service_discovery_configure(
:out_forward_service_discovery_watcher,
configurations: configs,
static_default_service_directive: 'server',
load_balancer: LoadBalancer.new(log),
custom_build_method: method(:build_node),
)

discovery_manager.services.each do |server|
service_discovery_services.each do |server|
# it's only for test
@nodes << server
unless @heartbeat_type == :none
Expand All @@ -273,7 +256,7 @@ def configure(conf)
end
end

if discovery_manager.services.empty?
if service_discovery_services.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
end

Expand Down Expand Up @@ -306,7 +289,7 @@ def start

unless @heartbeat_type == :none
if @heartbeat_type == :udp
@usock = socket_create_udp(discovery_manager.services.first.host, discovery_manager.services.first.port, nonblock: true)
@usock = socket_create_udp(service_discovery_services.first.host, service_discovery_services.first.port, nonblock: true)
server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length, &method(:on_udp_heatbeat_response_recv))
end
timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer))
Expand All @@ -318,7 +301,7 @@ def start
end

if @verify_connection_at_startup
discovery_manager.services.each do |node|
service_discovery_services.each do |node|
begin
node.verify_connection
rescue StandardError => e
Expand Down Expand Up @@ -374,7 +357,7 @@ def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag

discovery_manager.select_service { |node| node.send_data(tag, chunk) }
service_discovery_select_service { |node| node.send_data(tag, chunk) }
end

def try_write(chunk)
Expand All @@ -384,7 +367,7 @@ def try_write(chunk)
return
end
tag = chunk.metadata.tag
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
service_discovery_select_service { |node| node.send_data(tag, chunk) }
last_ack if @require_ack_response && @suspend_flush
end

Expand Down Expand Up @@ -434,7 +417,7 @@ def create_transfer_socket(host, port, hostname, &block)

def statistics
stats = super
services = discovery_manager.services
services = service_discovery_services
healthy_nodes_count = 0
registed_nodes_count = services.size
services.each do |s|
Expand Down Expand Up @@ -471,7 +454,7 @@ def build_node(server)

def on_heartbeat_timer
need_rebuild = false
discovery_manager.services.each do |n|
service_discovery_services.each do |n|
begin
log.trace "sending heartbeat", host: n.host, port: n.port, heartbeat_type: @heartbeat_type
n.usock = @usock if @usock
Expand All @@ -486,16 +469,16 @@ def on_heartbeat_timer
end

if need_rebuild
discovery_manager.rebalance
service_discovery_rebalance
end
end

def on_udp_heatbeat_response_recv(data, sock)
sockaddr = Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host)
if node = discovery_manager.services.find { |n| n.sockaddr == sockaddr }
if node = service_discovery_services.find { |n| n.sockaddr == sockaddr }
# log.trace "heartbeat arrived", name: node.name, host: node.host, port: node.port
if node.heartbeat
discovery_manager.rebalance
service_discovery_rebalance
end
else
log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}. It may service out")
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin_helper/service_discovery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def start
def service_discovery_configure(title, static_default_service_directive: nil, load_balancer: nil, custom_build_method: nil, interval: 3)
configs = @service_discovery_configs.map(&:corresponding_config_element)
if static_default_service_directive
configs << Fluent::Config::Element.new(
configs.prepend Fluent::Config::Element.new(
'service_discovery',
'',
{'@type' => 'static'},
Expand Down
17 changes: 11 additions & 6 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,16 @@ def try_write(chunk)
</service_discovery>
])

assert_equal 2, d.instance.discovery_manager.services.size
assert_equal '127.0.0.1', d.instance.discovery_manager.services[0].host
assert_equal 1234, d.instance.discovery_manager.services[0].port
assert_equal '127.0.0.1', d.instance.discovery_manager.services[1].host
assert_equal 1235, d.instance.discovery_manager.services[1].port

assert_equal(
[
{ host: '127.0.0.1', port: 1234 },
{ host: '127.0.0.1', port: 1235 },
],
d.instance.discovery_manager.services.collect do |service|
{ host: service.host, port: service.port }
end
)
end

test 'pass username and password as empty string to HandshakeProtocol' do
Expand Down Expand Up @@ -1073,7 +1078,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG
test 'when out_forward has @id' do
# cancel https://github.com/fluent/fluentd/blob/077508ac817b7637307434d0c978d7cdc3d1c534/lib/fluent/plugin_id.rb#L43-L53
# it always return true in test
mock.proxy(Fluent::Plugin).new_sd(:static, parent: anything) { |v|
mock.proxy(Fluent::Plugin).new_sd('static', parent: anything) { |v|
stub(v).plugin_id_for_test? { false }
}.once

Expand Down