Skip to content

Commit

Permalink
Simplified flaky test.
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed May 2, 2024
1 parent c6f0fb6 commit 1492a14
Showing 1 changed file with 47 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
Expand All @@ -41,14 +39,13 @@
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -63,6 +60,7 @@
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -76,16 +74,14 @@ public class AsyncServletIOTest
protected AsyncIOServlet4 _servlet4 = new AsyncIOServlet4();
protected StolenAsyncReadServlet _servletStolenAsyncRead = new StolenAsyncReadServlet();
protected int _port;
protected WrappingQTP _wQTP;
protected Server _server;
protected ServletHandler _servletHandler;
protected ServerConnector _connector;

@BeforeEach
public void setUp() throws Exception
{
_wQTP = new WrappingQTP();
_server = new Server(_wQTP);
_server = new Server();

HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setOutputBufferSize(4096);
Expand Down Expand Up @@ -794,125 +790,78 @@ public void onAllDataRead() throws IOException
@Test
public void testStolenAsyncRead() throws Exception
{
StringBuilder request = new StringBuilder(512);
request.append("POST /ctx/stolen/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: text/plain\r\n")
.append("Content-Length: 2\r\n")
.append("\r\n")
.append("1");
int port = _port;
try (Socket socket = new Socket("localhost", port))
String request = """
POST /ctx/stolen/info HTTP/1.1
Host: localhost
Content-Type: text/plain
Content-Length: 2
1""";

try (Socket socket = new Socket("localhost", _port))
{
socket.setSoTimeout(10000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(ISO_8859_1));
out.write(request.getBytes(ISO_8859_1));
out.flush();

// wait until server is ready
_servletStolenAsyncRead.ready.await();
final CountDownLatch wait = new CountDownLatch(1);
final CountDownLatch held = new CountDownLatch(1);
// Stop any dispatches until we want them

UnaryOperator<Runnable> old = _wQTP.wrapper.getAndSet(r ->
() ->
{
try
{
held.countDown();
wait.await();
r.run();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
);

// We are an unrelated thread, let's mess with the input stream
ServletInputStream sin = _servletStolenAsyncRead.listener.in;
sin.setReadListener(_servletStolenAsyncRead.listener);

// thread should be dispatched to handle, but held by our wQTP wait.
assertTrue(held.await(10, TimeUnit.SECONDS));

// Let's steal our read
assertTrue(sin.isReady());
assertThat(sin.read(), Matchers.is((int)'1'));
assertFalse(sin.isReady());

// let the ODA call go
_wQTP.wrapper.set(old);
wait.countDown();

// ODA should not be called
// Because the read was stolen, onDataAvailable() is not called.
// The wait guarantees that the Servlet thread is out of doPost().
assertFalse(_servletStolenAsyncRead.oda.await(500, TimeUnit.MILLISECONDS));

// Send some more data
out.write((int)'2');
// Send some more data.
out.write('2');
out.flush();

// ODA should now be called!!
// onDataAvailable() should now be called.
assertTrue(_servletStolenAsyncRead.oda.await(500, TimeUnit.MILLISECONDS));

// We can not read some more
assertTrue(sin.isReady());
assertThat(sin.read(), Matchers.is((int)'2'));
ServletInputStream in = _servletStolenAsyncRead.listener.in;

// read EOF
assertTrue(sin.isReady());
assertThat(sin.read(), Matchers.is(-1));
// We can now read some more.
assertTrue(in.isReady());
assertEquals('2', in.read());

BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// All content has been sent, must read EOF.
assertTrue(in.isReady());
assertEquals(-1, in.read());

// response line
String line = in.readLine();
LOG.debug("response-line: " + line);
assertThat(line, startsWith("HTTP/1.1 200 OK"));

// Skip headers
while (line != null)
{
line = in.readLine();
LOG.debug("header-line: " + line);
if (line.length() == 0)
break;
}
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
assertNotNull(response);
assertEquals(200, response.getStatus());
}

assertTrue(_servletStolenAsyncRead.completed.await(5, TimeUnit.SECONDS));
}

@SuppressWarnings("serial")
public class StolenAsyncReadServlet extends HttpServlet
public static class StolenAsyncReadServlet extends HttpServlet
{
public CountDownLatch ready = new CountDownLatch(1);
public CountDownLatch oda = new CountDownLatch(1);
public CountDownLatch completed = new CountDownLatch(1);
public volatile StealingListener listener;
private final CountDownLatch oda = new CountDownLatch(1);
private volatile StealingListener listener;

@Override
public void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException
{
listener = new StealingListener(request);
ready.countDown();

// Steal the read.
assertEquals('1', listener.in.read());

// Make sure the ReadListener is called when more content is available.
assertFalse(listener.in.isReady());

// Exit from doPost() so that ReadListener methods can now be invoked.
}

public class StealingListener implements ReadListener, AsyncListener
public class StealingListener implements ReadListener
{
final HttpServletRequest request;
final ServletInputStream in;
final AsyncContext asyncContext;
private final ServletInputStream in;
private final AsyncContext asyncContext;

StealingListener(HttpServletRequest request) throws IOException
public StealingListener(HttpServletRequest request) throws IOException
{
asyncContext = request.startAsync();
asyncContext.setTimeout(10000L);
asyncContext.addListener(this);
this.request = request;
asyncContext.setTimeout(0);
in = request.getInputStream();
in.setReadListener(this);
}

@Override
Expand All @@ -922,51 +871,17 @@ public void onDataAvailable()
}

@Override
public void onAllDataRead() throws IOException
public void onAllDataRead()
{
asyncContext.complete();
}

@Override
public void onError(final Throwable t)
public void onError(Throwable t)
{
t.printStackTrace();
asyncContext.complete();
}

@Override
public void onComplete(final AsyncEvent event)
{
completed.countDown();
}

@Override
public void onTimeout(final AsyncEvent event)
{
asyncContext.complete();
}

@Override
public void onError(final AsyncEvent event)
{
asyncContext.complete();
}

@Override
public void onStartAsync(AsyncEvent event)
{
}
}
}

private class WrappingQTP extends QueuedThreadPool
{
AtomicReference<UnaryOperator<Runnable>> wrapper = new AtomicReference<>(UnaryOperator.identity());

@Override
public void execute(Runnable job)
{
super.execute(wrapper.get().apply(job));
}
}
}

0 comments on commit 1492a14

Please sign in to comment.