Skip to content

Commit

Permalink
Add connection lifespan metrics (line#2796)
Browse files Browse the repository at this point in the history
Motivation:
An open and close event on connections can be listening using `ConnectionPoolListner` only on the client-side.
However, there is no way to get the status of connections on the server-side.
This PR will provide `armeria.{server,client}.connections.lifespan` metrics
for better monitoring of the connection lifetime.

Modifications:
* Add `keepAliveTimer` to KeepAliveHandler.
* Record connections lifespan when a connection is closed.

Result:
You can now get a connection lifetime metric from
`armeria.{server,client}.connections.lifespan`.
  • Loading branch information
ikhoon committed Jun 17, 2020
1 parent f5e2a55 commit 74af186
Show file tree
Hide file tree
Showing 18 changed files with 225 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.internal.common.KeepAliveHandler;

import io.micrometer.core.instrument.Timer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -35,9 +36,10 @@ final class Http1ClientKeepAliveHandler extends KeepAliveHandler {
private final Http1ResponseDecoder decoder;

Http1ClientKeepAliveHandler(Channel channel, ClientHttp1ObjectEncoder encoder, Http1ResponseDecoder decoder,
long idleTimeoutMillis, long pingIntervalMillis) {
Timer keepAliveTimer, long idleTimeoutMillis, long pingIntervalMillis) {
// TODO(ikhoon): Should set maxConnectionAgeMillis by https://github.com/line/armeria/pull/2741
super(channel, "client", idleTimeoutMillis, pingIntervalMillis, /* maxConnectionAgeMillis */ 0);
super(channel, "client", keepAliveTimer, idleTimeoutMillis,
pingIntervalMillis, /* maxConnectionAgeMillis */ 0);
httpSession = HttpSession.get(requireNonNull(channel, "channel"));
this.encoder = requireNonNull(encoder, "encoder");
this.decoder = requireNonNull(decoder, "decoder");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@

import javax.annotation.Nullable;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.metric.MoreMeters;
import com.linecorp.armeria.internal.common.AbstractHttp2ConnectionHandler;
import com.linecorp.armeria.internal.common.Http2KeepAliveHandler;

import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
Expand All @@ -36,15 +42,18 @@ final class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler

Http2ClientConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings, Channel channel,
HttpClientFactory clientFactory) {
HttpClientFactory clientFactory, SessionProtocol protocol) {

super(decoder, encoder, initialSettings);
this.clientFactory = clientFactory;

if (clientFactory.idleTimeoutMillis() > 0 || clientFactory.pingIntervalMillis() > 0) {
keepAliveHandler = new Http2ClientKeepAliveHandler(channel, encoder.frameWriter(),
clientFactory.idleTimeoutMillis(),
clientFactory.pingIntervalMillis());
final Timer keepAliveTimer =
MoreMeters.newTimer(clientFactory.meterRegistry(), "armeria.client.connections.lifespan",
ImmutableList.of(Tag.of("protocol", protocol.uriText())));
keepAliveHandler = new Http2ClientKeepAliveHandler(
channel, encoder.frameWriter(), keepAliveTimer,
clientFactory.idleTimeoutMillis(), clientFactory.pingIntervalMillis());
} else {
keepAliveHandler = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.linecorp.armeria.client;

import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.internal.common.AbstractHttp2ConnectionHandlerBuilder;

import io.netty.channel.Channel;
Expand All @@ -27,16 +28,19 @@ final class Http2ClientConnectionHandlerBuilder
Http2ClientConnectionHandlerBuilder> {

private final HttpClientFactory clientFactory;
private final SessionProtocol protocol;

Http2ClientConnectionHandlerBuilder(Channel ch, HttpClientFactory clientFactory) {
Http2ClientConnectionHandlerBuilder(Channel ch, HttpClientFactory clientFactory, SessionProtocol protocol) {
super(ch);
this.clientFactory = clientFactory;
this.protocol = protocol;
}

@Override
protected Http2ClientConnectionHandler build(Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings) throws Exception {
return new Http2ClientConnectionHandler(decoder, encoder, initialSettings, channel(), clientFactory);
return new Http2ClientConnectionHandler(
decoder, encoder, initialSettings, channel(), clientFactory, protocol);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@

import com.linecorp.armeria.internal.common.Http2KeepAliveHandler;

import io.micrometer.core.instrument.Timer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2FrameWriter;

final class Http2ClientKeepAliveHandler extends Http2KeepAliveHandler {
Http2ClientKeepAliveHandler(Channel channel, Http2FrameWriter frameWriter,
Http2ClientKeepAliveHandler(Channel channel, Http2FrameWriter frameWriter, Timer keepAliveTimer,
long idleTimeoutMillis, long pingIntervalMillis) {

// TODO(ikhoon): Should set maxConnectionAgeMillis by https://github.com/line/armeria/pull/2741
super(channel, frameWriter, "client", idleTimeoutMillis,
pingIntervalMillis, /* maxConnectionAgeMillis */ 0);
super(channel, frameWriter, "client", keepAliveTimer,
idleTimeoutMillis, pingIntervalMillis, /* maxConnectionAgeMillis */ 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.linecorp.armeria.common.util.AsyncCloseable;
import com.linecorp.armeria.common.util.AsyncCloseableSupport;

import io.micrometer.core.instrument.MeterRegistry;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -85,6 +86,7 @@ final class HttpChannelPool implements AsyncCloseable {

// Fields for creating a new connection:
private final Bootstrap[] bootstraps;
private final MeterRegistry meterRegistry;
private final int connectTimeoutMillis;
private final boolean useHttp1Pipelining;
private final long idleTimeoutMillis;
Expand Down Expand Up @@ -131,6 +133,7 @@ protected void initChannel(Channel ch) throws Exception {
SessionProtocol.HTTP, SessionProtocol.HTTPS,
SessionProtocol.H1, SessionProtocol.H1C,
SessionProtocol.H2, SessionProtocol.H2C);
meterRegistry = clientFactory.meterRegistry();
connectTimeoutMillis = (Integer) baseBootstrap.config().options()
.get(ChannelOption.CONNECT_TIMEOUT_MILLIS);
useHttp1Pipelining = clientFactory.useHttp1Pipelining();
Expand Down Expand Up @@ -414,8 +417,8 @@ private void initSession(SessionProtocol desiredProtocol, ChannelFuture connectF
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);

ch.pipeline().addLast(
new HttpSessionHandler(this, ch, sessionPromise, timeoutFuture, useHttp1Pipelining,
idleTimeoutMillis, pingIntervalMillis, proxyConfig));
new HttpSessionHandler(this, ch, sessionPromise, timeoutFuture, meterRegistry,
useHttp1Pipelining, idleTimeoutMillis, pingIntervalMillis, proxyConfig));
}

private void notifyConnect(SessionProtocol desiredProtocol, PoolKey key, Future<Channel> future,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
return;
}

addBeforeSessionHandler(p, newHttp2ConnectionHandler(ch));
addBeforeSessionHandler(p, newHttp2ConnectionHandler(ch, H2));
protocol = H2;
} else {
if (httpPreference != HttpPreference.HTTP1_REQUIRED) {
Expand Down Expand Up @@ -285,7 +285,7 @@ private void configureAsHttp(Channel ch) {
}

if (attemptUpgrade) {
final Http2ClientConnectionHandler http2Handler = newHttp2ConnectionHandler(ch);
final Http2ClientConnectionHandler http2Handler = newHttp2ConnectionHandler(ch, H2C);
if (clientFactory.useHttp2Preface()) {
pipeline.addLast(new DowngradeHandler());
pipeline.addLast(http2Handler);
Expand Down Expand Up @@ -598,8 +598,8 @@ static void retryWithH1C(ChannelHandlerContext ctx) {
ctx.close();
}

private Http2ClientConnectionHandler newHttp2ConnectionHandler(Channel ch) {
return new Http2ClientConnectionHandlerBuilder(ch, clientFactory)
private Http2ClientConnectionHandler newHttp2ConnectionHandler(Channel ch, SessionProtocol protocol) {
return new Http2ClientConnectionHandlerBuilder(ch, clientFactory, protocol)
.server(false)
.validateHeaders(false)
.initialSettings(http2Settings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.client.proxy.ProxyConfig;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.metric.MoreMeters;
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.common.InboundTrafficController;
import com.linecorp.armeria.internal.common.RequestContextUtil;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -70,6 +76,7 @@ final class HttpSessionHandler extends ChannelDuplexHandler implements HttpSessi
private final SocketAddress remoteAddress;
private final Promise<Channel> sessionPromise;
private final ScheduledFuture<?> sessionTimeoutFuture;
private final MeterRegistry meterRegistry;
private final boolean useHttp1Pipelining;
private final long idleTimeoutMillis;
private final long pingIntervalMillis;
Expand Down Expand Up @@ -113,13 +120,14 @@ final class HttpSessionHandler extends ChannelDuplexHandler implements HttpSessi

HttpSessionHandler(HttpChannelPool channelPool, Channel channel,
Promise<Channel> sessionPromise, ScheduledFuture<?> sessionTimeoutFuture,
boolean useHttp1Pipelining, long idleTimeoutMillis, long pingIntervalMillis,
ProxyConfig proxyConfig) {
MeterRegistry meterRegistry, boolean useHttp1Pipelining,
long idleTimeoutMillis, long pingIntervalMillis, ProxyConfig proxyConfig) {
this.channelPool = requireNonNull(channelPool, "channelPool");
this.channel = requireNonNull(channel, "channel");
remoteAddress = channel.remoteAddress();
this.sessionPromise = requireNonNull(sessionPromise, "sessionPromise");
this.sessionTimeoutFuture = requireNonNull(sessionTimeoutFuture, "sessionTimeoutFuture");
this.meterRegistry = meterRegistry;
this.useHttp1Pipelining = useHttp1Pipelining;
this.idleTimeoutMillis = idleTimeoutMillis;
this.pingIntervalMillis = pingIntervalMillis;
Expand Down Expand Up @@ -314,9 +322,13 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
final ClientHttp1ObjectEncoder requestEncoder = new ClientHttp1ObjectEncoder(channel, protocol);
final Http1ResponseDecoder responseDecoder = ctx.pipeline().get(Http1ResponseDecoder.class);
if (idleTimeoutMillis > 0 || pingIntervalMillis > 0) {
final Timer keepAliveTimer =
MoreMeters.newTimer(meterRegistry, "armeria.client.connections.lifespan",
ImmutableList.of(Tag.of("protocol", protocol.uriText())));
final Http1ClientKeepAliveHandler keepAliveHandler =
new Http1ClientKeepAliveHandler(channel, requestEncoder, responseDecoder,
idleTimeoutMillis, pingIntervalMillis);
new Http1ClientKeepAliveHandler(
channel, requestEncoder, responseDecoder,
keepAliveTimer, idleTimeoutMillis, pingIntervalMillis);
requestEncoder.setKeepAliveHandler(keepAliveHandler);
responseDecoder.setKeepAliveHandler(ctx, keepAliveHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import com.linecorp.armeria.common.Flags;

import io.micrometer.core.instrument.Timer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -70,9 +71,9 @@ public abstract class Http2KeepAliveHandler extends KeepAliveHandler {
private long lastPingPayload;

protected Http2KeepAliveHandler(Channel channel, Http2FrameWriter frameWriter, String name,
long idleTimeoutMillis, long pingIntervalMillis,
Timer keepAliveTimer, long idleTimeoutMillis, long pingIntervalMillis,
long maxConnectionAgeMillis) {
super(channel, name, idleTimeoutMillis, pingIntervalMillis, maxConnectionAgeMillis);
super(channel, name, keepAliveTimer, idleTimeoutMillis, pingIntervalMillis, maxConnectionAgeMillis);
this.channel = requireNonNull(channel, "channel");
this.frameWriter = requireNonNull(frameWriter, "frameWriter");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import com.linecorp.armeria.common.util.Exceptions;

import io.micrometer.core.instrument.Timer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -71,6 +72,7 @@ public abstract class KeepAliveHandler {

private final Channel channel;
private final String name;
private final Timer keepAliveTimer;

@Nullable
private ScheduledFuture<?> connectionIdleTimeout;
Expand All @@ -96,10 +98,11 @@ public abstract class KeepAliveHandler {
@Nullable
private Future<?> shutdownFuture;

protected KeepAliveHandler(Channel channel, String name,
protected KeepAliveHandler(Channel channel, String name, Timer keepAliveTimer,
long idleTimeoutMillis, long pingIntervalMillis, long maxConnectionAgeMillis) {
this.channel = channel;
this.name = name;
this.keepAliveTimer = keepAliveTimer;

if (idleTimeoutMillis <= 0) {
connectionIdleTimeNanos = 0;
Expand Down Expand Up @@ -127,8 +130,13 @@ public final void initialize(ChannelHandlerContext ctx) {
return;
}
isInitialized = true;
lastConnectionIdleTime = lastPingIdleTime = System.nanoTime();

final long connectionStartTimeNanos = System.nanoTime();
ctx.channel().closeFuture().addListener(unused -> {
keepAliveTimer.record(System.nanoTime() - connectionStartTimeNanos, TimeUnit.NANOSECONDS);
});

lastConnectionIdleTime = lastPingIdleTime = connectionStartTimeNanos;
if (connectionIdleTimeNanos > 0) {
connectionIdleTimeout = executor().schedule(new ConnectionIdleTimeoutTask(ctx),
connectionIdleTimeNanos, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import com.linecorp.armeria.internal.common.KeepAliveHandler;

import io.micrometer.core.instrument.Timer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;

class Http1ServerKeepAliveHandler extends KeepAliveHandler {
Http1ServerKeepAliveHandler(Channel channel, long idleTimeoutMillis, long maxConnectionAgeMillis) {
super(channel, "server", idleTimeoutMillis, 0, maxConnectionAgeMillis);
Http1ServerKeepAliveHandler(Channel channel, Timer keepAliveTimer,
long idleTimeoutMillis, long maxConnectionAgeMillis) {
super(channel, "server", keepAliveTimer, idleTimeoutMillis, 0, maxConnectionAgeMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linecorp.armeria.internal.common.Http2KeepAliveHandler;
import com.linecorp.armeria.internal.common.KeepAliveHandler;

import io.micrometer.core.instrument.Timer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
Expand All @@ -38,16 +39,16 @@ final class Http2ServerConnectionHandler extends AbstractHttp2ConnectionHandler

Http2ServerConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings, Channel channel, ServerConfig config,
GracefulShutdownSupport gracefulShutdownSupport, String scheme) {
Timer keepAliveTimer, GracefulShutdownSupport gracefulShutdownSupport,
String scheme) {

super(decoder, encoder, initialSettings);
this.gracefulShutdownSupport = gracefulShutdownSupport;

if (config.idleTimeoutMillis() > 0 || config.pingIntervalMillis() > 0) {
keepAliveHandler = new Http2ServerKeepAliveHandler(channel, encoder().frameWriter(),
config.idleTimeoutMillis(),
config.pingIntervalMillis(),
config.maxConnectionAgeMillis());
keepAliveHandler = new Http2ServerKeepAliveHandler(
channel, encoder().frameWriter(), keepAliveTimer,
config.idleTimeoutMillis(), config.pingIntervalMillis(), config.maxConnectionAgeMillis());
} else {
keepAliveHandler = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.linecorp.armeria.internal.common.AbstractHttp2ConnectionHandlerBuilder;

import io.micrometer.core.instrument.Timer;
import io.netty.channel.Channel;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
Expand All @@ -27,14 +28,15 @@ final class Http2ServerConnectionHandlerBuilder
Http2ServerConnectionHandlerBuilder> {

private final ServerConfig config;
private final Timer keepAliveTimer;
private final GracefulShutdownSupport gracefulShutdownSupport;
private final String scheme;

Http2ServerConnectionHandlerBuilder(Channel ch, ServerConfig config,
GracefulShutdownSupport gracefulShutdownSupport,
String scheme) {
Http2ServerConnectionHandlerBuilder(Channel ch, ServerConfig config, Timer keepAliveTimer,
GracefulShutdownSupport gracefulShutdownSupport, String scheme) {
super(ch);
this.config = config;
this.keepAliveTimer = keepAliveTimer;
this.gracefulShutdownSupport = gracefulShutdownSupport;
this.scheme = scheme;
}
Expand All @@ -44,6 +46,6 @@ protected Http2ServerConnectionHandler build(Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings) throws Exception {
return new Http2ServerConnectionHandler(decoder, encoder, initialSettings, channel(),
config, gracefulShutdownSupport, scheme);
config, keepAliveTimer, gracefulShutdownSupport, scheme);
}
}

0 comments on commit 74af186

Please sign in to comment.