Skip to content

Commit

Permalink
Extract as a method
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
  • Loading branch information
ganmacs committed Apr 13, 2020
1 parent eff66ca commit 8adcb1b
Showing 1 changed file with 13 additions and 21 deletions.
34 changes: 13 additions & 21 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,7 @@ def standby?

def verify_connection
connect do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)
if ri.state != :established
raise "Failed to establish connection to #{@host}:#{@port}"
end
end
ensure_established_connection(sock, ri)
end
end

Expand Down Expand Up @@ -652,14 +647,7 @@ def send_data_actual(sock, tag, chunk)
def send_data(tag, chunk)
ack = @ack_handler && @ack_handler.create_ack(chunk.unique_id, self)
connect(nil, ack: ack) do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)

if ri.state != :established
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
end

ensure_established_connection(sock, ri)
send_data_actual(sock, tag, chunk)
end

Expand All @@ -685,13 +673,7 @@ def send_heartbeat
case @sender.heartbeat_type
when :transport
connect(dest_addr) do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)

if ri.state != :established
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
end
ensure_established_connection(sock, ri)

## don't send any data to not cause a compatibility problem
# sock.write FORWARD_TCP_HEARTBEAT_DATA
Expand Down Expand Up @@ -784,6 +766,16 @@ def heartbeat(detect=true)

private

def ensure_established_connection(sock, request_info)
if request_info.state != :established
establish_connection(sock, request_info)

if request_info.state != :established
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
end
end

def connect(host = nil, ack: false, &block)
@connection_manager.connect(host: host || resolved_host, port: port, hostname: @hostname, ack: ack, &block)
end
Expand Down

0 comments on commit 8adcb1b

Please sign in to comment.