Skip to content

Commit

Permalink
Deal with unsupported DisposableServer operations
Browse files Browse the repository at this point in the history
Update `NettyWebServer` to deal with any `UnsupportedOperationException`
thrown from `DisposableServer`. Specifically, this commit allows the
`NettyWebServer` to work with domain socket backed servers which cannot
provide a port.

Fixes gh-24529
  • Loading branch information
philwebb committed Dec 16, 2020
1 parent a1ea5b4 commit 7fd4c53
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 22 deletions.
Expand Up @@ -69,11 +69,17 @@ public NettyReactiveWebServerFactory(int port) {
public WebServer getWebServer(HttpHandler httpHandler) {
HttpServer httpServer = createHttpServer();
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
NettyWebServer webServer = new NettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout, getShutdown());
NettyWebServer webServer = createNettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout,
getShutdown());
webServer.setRouteProviders(this.routeProviders);
return webServer;
}

NettyWebServer createNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter,
Duration lifecycleTimeout, Shutdown shutdown) {
return new NettyWebServer(httpServer, handlerAdapter, lifecycleTimeout, shutdown);
}

/**
* Returns a mutable collection of the {@link NettyServerCustomizer}s that will be
* applied to the Netty server builder.
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;

import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.unix.Errors.NativeIoException;
Expand Down Expand Up @@ -106,11 +107,44 @@ public void start() throws WebServerException {
});
throw new WebServerException("Unable to start Netty", ex);
}
logger.info("Netty started on port(s): " + getPort());
if (this.disposableServer != null) {
logger.info("Netty started" + getStartedOnMessage(this.disposableServer));
}
startDaemonAwaitThread(this.disposableServer);
}
}

private String getStartedOnMessage(DisposableServer server) {
StringBuilder message = new StringBuilder();
tryAppend(message, "port %s", () -> server.port());
tryAppend(message, "path %s", () -> server.path());
return (message.length() > 0) ? " on " + message : "";
}

private void tryAppend(StringBuilder message, String format, Supplier<Object> supplier) {
try {
Object value = supplier.get();
message.append((message.length() != 0) ? " " : "");
message.append(String.format(format, value));
}
catch (UnsupportedOperationException ex) {
}
}

DisposableServer startHttpServer() {
HttpServer server = this.httpServer;
if (this.routeProviders.isEmpty()) {
server = server.handle(this.handler);
}
else {
server = server.route(this::applyRouteProviders);
}
if (this.lifecycleTimeout != null) {
return server.bindNow(this.lifecycleTimeout);
}
return server.bindNow();
}

private boolean isPermissionDenied(Throwable bindExceptionCause) {
try {
if (bindExceptionCause instanceof NativeIoException) {
Expand All @@ -131,20 +165,6 @@ public void shutDownGracefully(GracefulShutdownCallback callback) {
this.gracefulShutdown.shutDownGracefully(callback);
}

private DisposableServer startHttpServer() {
HttpServer server = this.httpServer;
if (this.routeProviders.isEmpty()) {
server = server.handle(this.handler);
}
else {
server = server.route(this::applyRouteProviders);
}
if (this.lifecycleTimeout != null) {
return server.bindNow(this.lifecycleTimeout);
}
return server.bindNow();
}

private void applyRouteProviders(HttpServerRoutes routes) {
for (NettyRouteProvider provider : this.routeProviders) {
routes = provider.apply(routes);
Expand Down Expand Up @@ -190,7 +210,12 @@ public void stop() throws WebServerException {
@Override
public int getPort() {
if (this.disposableServer != null) {
return this.disposableServer.port();
try {
return this.disposableServer.port();
}
catch (UnsupportedOperationException ex) {
return -1;
}
}
return 0;
}
Expand Down
Expand Up @@ -17,13 +17,19 @@
package org.springframework.boot.web.embedded.netty;

import java.net.ConnectException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Arrays;

import io.netty.channel.Channel;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableChannel;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.test.StepVerifier;

Expand All @@ -34,6 +40,7 @@
import org.springframework.boot.web.server.Ssl;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;

Expand All @@ -52,11 +59,6 @@
*/
class NettyReactiveWebServerFactoryTests extends AbstractReactiveWebServerFactoryTests {

@Override
protected NettyReactiveWebServerFactory getFactory() {
return new NettyReactiveWebServerFactory(0);
}

@Test
void exceptionIsThrownWhenPortIsAlreadyInUse() {
AbstractReactiveWebServerFactory factory = getFactory();
Expand All @@ -68,6 +70,14 @@ void exceptionIsThrownWhenPortIsAlreadyInUse() {
.satisfies(this::portMatchesRequirement).withCauseInstanceOf(Throwable.class);
}

@Test
void getPortWhenDisposableServerPortOperationIsUnsupportedReturnsMinusOne() {
NettyReactiveWebServerFactory factory = new NoPortNettyReactiveWebServerFactory(0);
this.webServer = factory.getWebServer(new EchoHandler());
this.webServer.start();
assertThat(this.webServer.getPort()).isEqualTo(-1);
}

private void portMatchesRequirement(PortInUseException exception) {
assertThat(exception.getPort()).isEqualTo(this.webServer.getPort());
}
Expand Down Expand Up @@ -143,4 +153,102 @@ protected Mono<String> testSslWithAlias(String alias) {
.retrieve().bodyToMono(String.class);
}

@Override
protected NettyReactiveWebServerFactory getFactory() {
return new NettyReactiveWebServerFactory(0);
}

static class NoPortNettyReactiveWebServerFactory extends NettyReactiveWebServerFactory {

NoPortNettyReactiveWebServerFactory(int port) {
super(port);
}

@Override
NettyWebServer createNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter,
Duration lifecycleTimeout, Shutdown shutdown) {
return new NoPortNettyWebServer(httpServer, handlerAdapter, lifecycleTimeout, shutdown);
}

}

static class NoPortNettyWebServer extends NettyWebServer {

NoPortNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter, Duration lifecycleTimeout,
Shutdown shutdown) {
super(httpServer, handlerAdapter, lifecycleTimeout, shutdown);
}

@Override
DisposableServer startHttpServer() {
return new NoPortDisposableServer(super.startHttpServer());
}

}

static class NoPortDisposableServer implements DisposableServer {

private final DisposableServer delegate;

NoPortDisposableServer(DisposableServer delegate) {
this.delegate = delegate;
}

@Override
public SocketAddress address() {
return this.delegate.address();
}

@Override
public String host() {
return this.delegate.host();
}

@Override
public String path() {
return this.delegate.path();
}

@Override
public Channel channel() {
return this.delegate.channel();
}

@Override
public void dispose() {
this.delegate.dispose();
}

@Override
public void disposeNow() {
this.delegate.disposeNow();
}

@Override
public void disposeNow(Duration timeout) {
this.delegate.disposeNow(timeout);
}

@Override
public CoreSubscriber<Void> disposeSubscriber() {
return this.delegate.disposeSubscriber();
}

@Override
public boolean isDisposed() {
return this.delegate.isDisposed();
}

@Override
public Mono<Void> onDispose() {
return this.delegate.onDispose();
}

@Override
public DisposableChannel onDispose(Disposable onDispose) {
return this.delegate.onDispose(onDispose);
}

}

}

0 comments on commit 7fd4c53

Please sign in to comment.