Skip to content

Commit

Permalink
[branch-2.9][broker] Full-support set ssl provider, ciphers and proto…
Browse files Browse the repository at this point in the history
…cols (#15226)
  • Loading branch information
nodece committed Apr 21, 2022
1 parent 0aad9a1 commit 6b6a32d
Show file tree
Hide file tree
Showing 22 changed files with 452 additions and 91 deletions.
8 changes: 5 additions & 3 deletions conf/broker.conf
Expand Up @@ -594,13 +594,15 @@ tlsCiphers=
# authentication.
tlsRequireTrustedClientCertOnConnect=false

# Specify the TLS provider for the broker service:
# When using TLS authentication with CACert, the valid value is either OPENSSL or JDK.
# When using TLS authentication with KeyStore, available values can be SunJSSE, Conscrypt and etc.
tlsProvider=

### --- KeyStore TLS config variables --- ###
# Enable TLS with KeyStore type configuration in broker.
tlsEnabledWithKeyStore=false

# TLS Provider for KeyStore type
tlsProvider=

# TLS KeyStore type configuration in broker: JKS, PKCS12
tlsKeyStoreType=JKS

Expand Down
8 changes: 5 additions & 3 deletions conf/standalone.conf
Expand Up @@ -357,13 +357,15 @@ tlsCiphers=
# authentication.
tlsRequireTrustedClientCertOnConnect=false

# Specify the TLS provider for the broker service:
# When using TLS authentication with CACert, the valid value is either OPENSSL or JDK.
# When using TLS authentication with KeyStore, available values can be SunJSSE, Conscrypt and etc.
tlsProvider=

### --- KeyStore TLS config variables --- ###
# Enable TLS with KeyStore type configuration in broker.
tlsEnabledWithKeyStore=false

# TLS Provider for KeyStore type
tlsProvider=

# TLS KeyStore type configuration in broker: JKS, PKCS12
tlsKeyStoreType=JKS

Expand Down
Expand Up @@ -2237,7 +2237,9 @@ public class ServiceConfiguration implements PulsarConfiguration {

@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS Provider for KeyStore type"
doc = "TLS Provider for Specify the SSL provider for the broker service: \n"
+ "When using TLS authentication with CACert, the valid value is either OPENSSL or JDK.\n"
+ "When using TLS authentication with KeyStore, available values can be SunJSSE, Conscrypt and etc."
)
private String tlsProvider = null;

Expand Down
Expand Up @@ -28,6 +28,7 @@
import io.netty.handler.flow.FlowControlHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import lombok.Builder;
Expand Down Expand Up @@ -92,10 +93,18 @@ public PulsarChannelInitializer(PulsarService pulsar, PulsarChannelOptions opts)
serviceConfig.getTlsProtocols(),
serviceConfig.getTlsCertRefreshCheckDurationSec());
} else {
sslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
SslProvider sslProvider = null;
if (serviceConfig.getTlsProvider() != null) {
sslProvider = SslProvider.valueOf(serviceConfig.getTlsProvider());
}
sslCtxRefresher = new NettyServerSslContextBuilder(
sslProvider,
serviceConfig.isTlsAllowInsecureConnection(),
serviceConfig.getTlsTrustCertsFilePath(),
serviceConfig.getTlsCertificateFilePath(),
serviceConfig.getTlsKeyFilePath(),
serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
serviceConfig.getTlsCiphers(),
serviceConfig.getTlsProtocols(),
serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
serviceConfig.getTlsCertRefreshCheckDurationSec());
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -137,21 +138,32 @@ public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest,
JsseSslEngineFactory sslEngineFactory = new JsseSslEngineFactory(sslCtx);
confBuilder.setSslEngineFactory(sslEngineFactory);
} else {
SslProvider sslProvider = null;
if (conf.getSslProvider() != null) {
sslProvider = SslProvider.valueOf(conf.getSslProvider());
}
SslContext sslCtx = null;
if (authData.hasDataForTls()) {
sslCtx = authData.getTlsTrustStoreStream() == null
? SecurityUtility.createAutoRefreshSslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath(), authData.getTlsCerificateFilePath(),
authData.getTlsPrivateKeyFilePath(), null, autoCertRefreshTimeSeconds, delayer)
sslProvider,
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath(), authData.getTlsCerificateFilePath(),
authData.getTlsPrivateKeyFilePath(), null, autoCertRefreshTimeSeconds, delayer)
: SecurityUtility.createNettySslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
authData.getTlsPrivateKey());
sslProvider,
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
authData.getTlsPrivateKey(),
conf.getTlsCiphers(),
conf.getTlsProtocols());
} else {
sslCtx = SecurityUtility.createNettySslContextForClient(
sslProvider,
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath());
conf.getTlsTrustCertsFilePath(),
conf.getTlsCiphers(),
conf.getTlsProtocols());
}
confBuilder.setSslContext(sslCtx);
}
Expand Down
Expand Up @@ -18,24 +18,24 @@
*/
package org.apache.pulsar.client.impl;

import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;
import java.io.Closeable;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslContext;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
Expand Down Expand Up @@ -111,25 +111,33 @@ public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest,
JsseSslEngineFactory sslEngineFactory = new JsseSslEngineFactory(sslCtx);
confBuilder.setSslEngineFactory(sslEngineFactory);
} else {
SslProvider sslProvider = null;
if (conf.getSslProvider() != null) {
sslProvider = SslProvider.valueOf(conf.getSslProvider());
}
SslContext sslCtx = null;
if (authData.hasDataForTls()) {
sslCtx = authData.getTlsTrustStoreStream() == null
? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
authData.getTlsPrivateKey())
: SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
authData.getTlsPrivateKey());
}
else {
? SecurityUtility.createNettySslContextForClient(sslProvider,
conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
authData.getTlsPrivateKey(), conf.getTlsCiphers(), conf.getTlsProtocols())
: SecurityUtility.createNettySslContextForClient(sslProvider,
conf.isTlsAllowInsecureConnection(),
authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
authData.getTlsPrivateKey(), conf.getTlsCiphers(), conf.getTlsProtocols());
} else {
sslCtx = SecurityUtility.createNettySslContextForClient(
sslProvider,
conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath());
conf.getTlsTrustCertsFilePath(), conf.getTlsCiphers(), conf.getTlsProtocols());
}
confBuilder.setSslContext(sslCtx);
}

confBuilder.setUseInsecureTrustManager(conf.isTlsAllowInsecureConnection());
} catch (GeneralSecurityException e) {
throw new PulsarClientException.InvalidConfigurationException(e);
} catch (Exception e) {
throw new PulsarClientException.InvalidConfigurationException(e);
}
Expand Down
Expand Up @@ -18,6 +18,13 @@
*/
package org.apache.pulsar.client.impl;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand All @@ -36,13 +43,6 @@
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;

@Slf4j
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {

Expand Down Expand Up @@ -93,19 +93,36 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx

sslContextSupplier = new ObjectCache<SslContext>(() -> {
try {
SslProvider sslProvider = null;
if (conf.getSslProvider() != null) {
sslProvider = SslProvider.valueOf(conf.getSslProvider());
}

// Set client certificate if available
AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
if (authData.hasDataForTls()) {
return authData.getTlsTrustStoreStream() == null
? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath(),
authData.getTlsCertificates(), authData.getTlsPrivateKey())
: SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
authData.getTlsTrustStoreStream(),
authData.getTlsCertificates(), authData.getTlsPrivateKey());
? SecurityUtility.createNettySslContextForClient(
sslProvider,
conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath(),
authData.getTlsCertificates(),
authData.getTlsPrivateKey(),
conf.getTlsCiphers(),
conf.getTlsProtocols())
: SecurityUtility.createNettySslContextForClient(sslProvider,
conf.isTlsAllowInsecureConnection(),
authData.getTlsTrustStoreStream(),
authData.getTlsCertificates(), authData.getTlsPrivateKey(),
conf.getTlsCiphers(),
conf.getTlsProtocols());
} else {
return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath());
return SecurityUtility.createNettySslContextForClient(
sslProvider,
conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath(),
conf.getTlsCiphers(),
conf.getTlsProtocols());
}
} catch (Exception e) {
throw new RuntimeException("Failed to create TLS context", e);
Expand Down
Expand Up @@ -19,10 +19,12 @@
package org.apache.pulsar.common.util;

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.Set;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
Expand All @@ -33,35 +35,50 @@
@Slf4j
public class NettyClientSslContextRefresher extends SslContextAutoRefreshBuilder<SslContext> {
private volatile SslContext sslNettyContext;
private boolean tlsAllowInsecureConnection;
private final boolean tlsAllowInsecureConnection;
protected final FileModifiedTimeUpdater tlsTrustCertsFilePath;
private AuthenticationDataProvider authData;
protected final FileModifiedTimeUpdater tlsCertsFilePath;
protected final FileModifiedTimeUpdater tlsPrivateKeyFilePath;
private final AuthenticationDataProvider authData;
private final SslProvider sslProvider;
private final Set<String> ciphers;
private final Set<String> protocols;

public NettyClientSslContextRefresher(boolean allowInsecure,
public NettyClientSslContextRefresher(SslProvider sslProvider, boolean allowInsecure,
String trustCertsFilePath,
AuthenticationDataProvider authData,
Set<String> ciphers,
Set<String> protocols,
long delayInSeconds)
throws IOException, GeneralSecurityException {
super(delayInSeconds);
this.tlsAllowInsecureConnection = allowInsecure;
this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
this.authData = authData;
this.tlsCertsFilePath = new FileModifiedTimeUpdater(
authData != null ? authData.getTlsCerificateFilePath() : null);
this.tlsPrivateKeyFilePath = new FileModifiedTimeUpdater(
authData != null ? authData.getTlsPrivateKeyFilePath() : null);
this.sslProvider = sslProvider;
this.ciphers = ciphers;
this.protocols = protocols;
}

@Override
public synchronized SslContext update()
throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
if (authData != null && authData.hasDataForTls()) {
this.sslNettyContext = authData.getTlsTrustStoreStream() == null
? SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(),
authData.getTlsPrivateKey())
: SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
authData.getTlsTrustStoreStream(), (X509Certificate[]) authData.getTlsCertificates(),
authData.getTlsPrivateKey());
? SecurityUtility.createNettySslContextForClient(this.sslProvider, this.tlsAllowInsecureConnection,
tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(),
authData.getTlsPrivateKey(), this.ciphers, this.protocols)
: SecurityUtility.createNettySslContextForClient(this.sslProvider, this.tlsAllowInsecureConnection,
authData.getTlsTrustStoreStream(), (X509Certificate[]) authData.getTlsCertificates(),
authData.getTlsPrivateKey(), this.ciphers, this.protocols);
} else {
this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
this.tlsTrustCertsFilePath.getFileName());
this.sslNettyContext =
SecurityUtility.createNettySslContextForClient(this.sslProvider, this.tlsAllowInsecureConnection,
this.tlsTrustCertsFilePath.getFileName(), this.ciphers, this.protocols);
}
return this.sslNettyContext;
}
Expand All @@ -73,6 +90,8 @@ public SslContext getSslContext() {

@Override
public boolean needUpdate() {
return tlsTrustCertsFilePath.checkAndRefresh();
return tlsTrustCertsFilePath.checkAndRefresh() || tlsCertsFilePath.checkAndRefresh()
|| tlsPrivateKeyFilePath.checkAndRefresh();

}
}

0 comments on commit 6b6a32d

Please sign in to comment.