Skip to content

Commit

Permalink
Start spike on fluent ConnectionFactory configuration API
Browse files Browse the repository at this point in the history
The ConnectionFactory class has dozens of parameters.
It can be overwhelming to configure, especially
for tricky topics like TLS where the parameters are among
the other dozens, without clear way to find them.

This commit introduces an API to simplify the configuration
of ConnectionFactory. It is fluent, uses modern API (e.g. Duration
for timeout), and groups common settings in sub-API.

The configuration API will be introduced in 5.x, marked as experimental,
and refined in minor releases.

The traditional setter-based API will be marked deprecated 6.x and
removed in 7.x.

Benefits of the new configuration API:
* fluent, the method calls can be chained and formatted in a logical
way (it does not have to be 1 line = 1 parameter).
* the different timeout settings use the Duration type, instead of
int. It is no longer necessary to know the unit (seconds or
milliseconds).
* parameters for the same topic are grouped into dedicated configuration
API (TLS, NIO, OAuth2, recovery, etc). It makes it much easier to
configure those parts, as available settings will show up automatically
in the IDE auto-completion, and not among the other dozens of settings.
* more opinionated but easier configuration. OAuth2 is an example:
no need to use builder classes with very long names, the refresh service
part is also in the #oauth2() sub-configuration, whereas it's a
separate setter in ConnectionFactory.

References #608, #1139
  • Loading branch information
acogoluegnes committed Jan 29, 2024
1 parent 7a01c8e commit 25b9036
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 7 deletions.
16 changes: 9 additions & 7 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ public ConnectionFactory setVirtualHost(String virtualHost) {
public ConnectionFactory setUri(URI uri)
throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
{
if ("amqp".equals(uri.getScheme().toLowerCase())) {
if ("amqp".equalsIgnoreCase(uri.getScheme())) {
// nothing special to do
} else if ("amqps".equals(uri.getScheme().toLowerCase())) {
} else if ("amqps".equalsIgnoreCase(uri.getScheme())) {
setPort(DEFAULT_AMQP_OVER_SSL_PORT);
// SSL context factory not set yet, we use the default one
if (this.sslContextFactory == null) {
Expand Down Expand Up @@ -1253,7 +1253,7 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
ConnectionParams params = params(executor);
// set client-provided via a client property
if (clientProvidedName != null) {
Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
Map<String, Object> properties = new HashMap<>(params.getClientProperties());
properties.put("connection_name", clientProvidedName);
params.setClientProperties(properties);
}
Expand All @@ -1277,16 +1277,14 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
conn.start();
this.metricsCollector.newConnection(conn);
return conn;
} catch (IOException e) {
} catch (IOException | TimeoutException e) {
lastException = e;
} catch (TimeoutException te) {
lastException = te;
}
}
if (lastException != null) {
if (lastException instanceof IOException) {
throw (IOException) lastException;
} else if (lastException instanceof TimeoutException) {
} else {
throw (TimeoutException) lastException;
}
}
Expand Down Expand Up @@ -1762,4 +1760,8 @@ public ConnectionFactory setTrafficListener(TrafficListener trafficListener) {
this.trafficListener = trafficListener;
return this;
}

public static ConnectionFactoryConfiguration configure() {
return null;
}
}
207 changes: 207 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactoryConfiguration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package com.rabbitmq.client;

import com.rabbitmq.client.impl.CredentialsProvider;
import com.rabbitmq.client.impl.CredentialsRefreshService;
import com.rabbitmq.client.impl.ErrorOnWriteListener;
import com.rabbitmq.client.impl.nio.ByteBufferFactory;
import com.rabbitmq.client.impl.nio.NioContext;
import com.rabbitmq.client.impl.nio.NioQueue;
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
import com.rabbitmq.client.observation.ObservationCollector;
import java.net.HttpURLConnection;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.net.SocketFactory;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;

public interface ConnectionFactoryConfiguration {

ConnectionFactoryConfiguration host(String name);
ConnectionFactoryConfiguration port(int port);
ConnectionFactoryConfiguration username(String username);
ConnectionFactoryConfiguration password(String username);
ConnectionFactoryConfiguration virtualHost(String virtualHost);
ConnectionFactoryConfiguration uri(URI uri);
ConnectionFactoryConfiguration uri(String uri);

ConnectionFactoryConfiguration requestedChannelMax(int requestedChannelMax);
ConnectionFactoryConfiguration requestedFrameMax(int requestedFrameMax);
ConnectionFactoryConfiguration requestedHeartbeat(Duration heartbeat);
ConnectionFactoryConfiguration connectionTimeout(Duration timeout);
ConnectionFactoryConfiguration handshakeTimeout(Duration timeout);
ConnectionFactoryConfiguration shutdownTimeout(Duration timeout);
ConnectionFactoryConfiguration channelRpcTimeout(Duration timeout);

ConnectionFactoryConfiguration maxInboundMessageBodySize(int maxInboundMessageBodySize);
ConnectionFactoryConfiguration channelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType);
ConnectionFactoryConfiguration workPoolTimeout(Duration timeout);

ConnectionFactoryConfiguration errorOnWriteListener(ErrorOnWriteListener errorOnWriteListener);

ConnectionFactoryConfiguration trafficListener(TrafficListener trafficListener);

// TODO provide helper for client properties
ConnectionFactoryConfiguration clientProperties(Map<String, Object> clientProperties);
ConnectionFactoryConfiguration clientProperty(String name, Object value);

ConnectionFactoryConfiguration saslConfig(SaslConfig saslConfig);

ConnectionFactoryConfiguration socketFactory(SocketFactory socketFactory);

ConnectionFactoryConfiguration socketConfigurator(SocketConfigurator socketConfigurator);

ConnectionFactoryConfiguration sharedExecutor(ExecutorService executorService);
ConnectionFactoryConfiguration shutdownExecutor(ExecutorService executorService);
ConnectionFactoryConfiguration heartbeatExecutor(ExecutorService executorService);
ConnectionFactoryConfiguration threadFactory(ThreadFactory threadFactory);

ConnectionFactoryConfiguration exceptionHandler(ExceptionHandler exceptionHandler);

ConnectionFactoryConfiguration metricsCollector(MetricsCollector metricsCollector);
ConnectionFactoryConfiguration observationCollector(ObservationCollector observationCollector);

// TODO special configuration for credentials, especially for OAuth?
ConnectionFactoryConfiguration credentialsProvider(CredentialsProvider credentialsProvider);
ConnectionFactoryConfiguration credentialsRefreshService(CredentialsRefreshService credentialsRefreshService);

NioConfiguration nio();

TlsConfiguration tls();

OAuth2Configuration oauth2();

ConnectionFactory create();

interface NioConfiguration {

NioConfiguration readByteBufferSize(int readByteBufferSize);

NioConfiguration writeByteBufferSize(int writeByteBufferSize);

NioConfiguration nbIoThreads(int nbIoThreads);

NioConfiguration writeEnqueuingTimeout(Duration writeEnqueuingTimeout);

NioConfiguration writeQueueCapacity(int writeQueueCapacity);

NioConfiguration executor(ExecutorService executorService);

NioConfiguration threadFactory(ThreadFactory threadFactory);

NioConfiguration socketChannelConfigurator(SocketChannelConfigurator configurator);

NioConfiguration sslEngineConfigurator(SslEngineConfigurator configurator);

NioConfiguration connectionShutdownExecutor(ExecutorService executorService);

NioConfiguration byteBufferFactory(ByteBufferFactory byteBufferFactory);

NioConfiguration writeQueueFactory(Function<NioContext, NioQueue> writeQueueFactory);

ConnectionFactoryConfiguration configuration();


}

interface TlsConfiguration {

TlsConfiguration hostnameVerification();

TlsConfiguration hostnameVerification(boolean hostnameVerification);

TlsConfiguration sslContextFactory(SslContextFactory sslContextFactory);

TlsConfiguration protocol(String protocol);

TlsConfiguration trustManager(TrustManager trustManager);

TlsConfiguration trustEverything();

TlsConfiguration sslContext(SSLContext sslContext);

ConnectionFactoryConfiguration configuration();

}

interface RecoveryConfiguration {

RecoveryConfiguration enableConnectionRecovery();
RecoveryConfiguration enableConnectionRecovery(boolean connectionRecovery);

RecoveryConfiguration enableTopologyRecovery();
RecoveryConfiguration enableTopologyRecovery(boolean connectionRecovery);

RecoveryConfiguration topologyRecoveryExecutor(ExecutorService executorService);

RecoveryConfiguration recoveryInterval(Duration interval);

RecoveryConfiguration recoveryDelayHandler(RecoveryDelayHandler recoveryDelayHandler);

RecoveryConfiguration topologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter);

RecoveryConfiguration recoveryTriggeringCondition(Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition);

RecoveryConfiguration recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier);

ConnectionFactoryConfiguration configuration();
}

interface OAuth2Configuration {

OAuth2Configuration tokenEndpointUri(String tokenEndpointUri);

OAuth2Configuration clientId(String clientId);

OAuth2Configuration clientSecret(String clientSecret);

OAuth2Configuration grantType(String grantType);

OAuth2Configuration parameter(String name, String value);

OAuth2Configuration connectionConfigurator(Consumer<HttpURLConnection> connectionConfigurator);

OAuth2TlsConfiguration tls();

OAuth2CredentialsRefreshConfiguration refresh();

ConnectionFactoryConfiguration configuration();
}

interface OAuth2TlsConfiguration {

OAuth2TlsConfiguration hostnameVerifier(HostnameVerifier hostnameVerifier);

OAuth2TlsConfiguration sslSocketFactory(SSLSocketFactory sslSocketFactory);

OAuth2TlsConfiguration sslContext(SSLContext sslContext);

OAuth2TlsConfiguration trustEverything();

OAuth2Configuration oauth2();

}

interface OAuth2CredentialsRefreshConfiguration {

OAuth2CredentialsRefreshConfiguration refreshDelayStrategy(Function<Duration, Duration> refreshDelayStrategy);

OAuth2CredentialsRefreshConfiguration approachingExpirationStrategy(Function<Duration, Boolean> approachingExpirationStrategy);

OAuth2CredentialsRefreshConfiguration scheduler(ScheduledThreadPoolExecutor scheduler);

OAuth2Configuration oauth2();

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.rabbitmq.client;

import com.rabbitmq.client.impl.CredentialsProvider;
import com.rabbitmq.client.impl.CredentialsRefreshService;
import com.rabbitmq.client.impl.DefaultCredentialsRefreshService;
import com.rabbitmq.client.impl.OAuth2ClientCredentialsGrantCredentialsProvider;
import com.rabbitmq.client.impl.nio.NioParams;

import javax.net.ssl.SSLContext;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import static com.rabbitmq.client.impl.DefaultCredentialsRefreshService.ratioRefreshDelayStrategy;

public class ConnectionFactoryConfigurationDemo {

public static void main(String[] args) throws Exception {
SSLContext sslContext = SSLContext.getDefault();

// historical configuration with ConnectionFactory setters
ConnectionFactory cf = new ConnectionFactory();
cf.setUri("amqp://rabbitmq-1:5672/foo");
cf.setChannelRpcTimeout(10_000); // unit?
Map<String, Object> clientProperties = Collections.singletonMap("foo", "bar");
cf.setClientProperties(clientProperties);
cf.useSslProtocol("TLSv1.3", new TrustEverythingTrustManager());
NioParams nioParams = new NioParams();
nioParams.setNbIoThreads(4);
cf.setNioParams(nioParams);

CredentialsProvider credentialsProvider =
new OAuth2ClientCredentialsGrantCredentialsProvider.OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
.tokenEndpointUri("http://localhost:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.tls()
.sslContext(sslContext)
.builder()
.build();
cf.setCredentialsProvider(credentialsProvider);
CredentialsRefreshService refreshService =
new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder()
.refreshDelayStrategy(ratioRefreshDelayStrategy(0.8))
.build();
cf.setCredentialsRefreshService(refreshService);

// configuration with new configuration API
ConnectionFactory.configure()
.uri("amqp://rabbitmq-1:5672/foo")
.channelRpcTimeout(Duration.ofSeconds(10)) // Duration class instead of int
.clientProperty("foo", "bar")
.tls() // TLS configuration API
.protocol("TLSv1.3")
.trustEverything()
.configuration() // back to main configuration
.nio() // NIO configuration API
.nbIoThreads(4)
.configuration() // back to main configuration
.oauth2() // OAuth 2 configuration API
.tokenEndpointUri("http://localhost:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.tls() // OAuth 2 TLS
.sslContext(sslContext)
.oauth2()
.refresh() // OAuth refresh configuration
.refreshDelayStrategy(ratioRefreshDelayStrategy(0.8))
.oauth2()
.configuration() // back to main configuration
.create();
}

}

0 comments on commit 25b9036

Please sign in to comment.