
InstrumentedEE10Handler not recording metrics on Jetty 12 with Jersey CompletableFuture #3917

Open
mihalyr opened this issue Jan 20, 2024 · 13 comments · May be fixed by #3928
mihalyr commented Jan 20, 2024

Hi,

I upgraded from Jetty 11 to 12 and now have to use InstrumentedEE10Handler for recording request metrics. I'm running into a problem with this setup where the start of an active request is recorded, but the finish never is, and thus most metrics aren't being collected.

Could you please take a look at the below minimal reproducer to check if I'm doing something wrong? Please note that this used to work with Jetty 11 before with InstrumentedHttpChannelListener, but that class is not available anymore for Jetty 12 and I have to use InstrumentedEE10Handler which I cannot figure out how to configure properly.

Please consider this minimal reproducer with two classes, a Jersey resource and a JUnit 5 test class (I am using JDK 21 locally on Linux, Fedora 38):

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import java.util.concurrent.CompletableFuture;

@Path("")
public class PingResource {
  @GET
  public CompletableFuture<String> ping() {
    return CompletableFuture.completedFuture("pong");
  }
}
import static com.codahale.metrics.MetricRegistry.name;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.dropwizard.metrics.jetty12.ee10.InstrumentedEE10Handler;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHandler;
import org.eclipse.jetty.server.Server;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.junit.jupiter.api.Test;

class Jetty12DropwizardTest {
  @Test
  void test() throws Exception {
    // Jetty Server on random free port
    var server = new Server(0);

    // Setup Jersey with our simple async resource returning CompletableFuture
    var jerseyConfig = new ResourceConfig()
        .registerClasses(PingResource.class);
    var jersey = new ServletContainer(jerseyConfig);

    // Configure the handler
    var contextHandler = new ServletContextHandler();
    contextHandler.setContextPath("/");
    contextHandler.addServlet(jersey, "/*");

    // Add handler instrumentation
    var metrics = new MetricRegistry();
    var instrumentedHandler = new InstrumentedEE10Handler(metrics);
    contextHandler.insertHandler(instrumentedHandler);

    // Tell the server to use our handler and start it
    server.setHandler(contextHandler);
    server.start();
    try (var client = HttpClient.newHttpClient()) {
      // Ping the server and wait for the response
      var response = client.send(
          HttpRequest.newBuilder().uri(server.getURI()).GET().build(),
          HttpResponse.BodyHandlers.ofString());
      assertEquals(200, response.statusCode(), "response code");
      assertEquals("pong", response.body(), "response body");

      // Print metric counts
      metrics.getMetrics().forEach((name, metric) -> System.out.println(name + ": " + printCount(metric)));

      // No active requests after the request succeeded
      var activeRequestsCounter = metrics.counter(name(ServletHandler.class, "active-requests"));
      assertEquals(0, activeRequestsCounter.getCount(), "active requests");

      // request recorded
      var requestsTimer = metrics.timer(name(ServletHandler.class, "requests"));
      assertEquals(1, requestsTimer.getCount(), "requests");

      // 200 response recorded
      var response200Meter = metrics.meter(name(ServletHandler.class, "2xx-responses"));
      assertEquals(1, response200Meter.getCount(), "2xx responses");
    } finally {
      server.stop();
    }
  }

  String printCount(Metric metric) {
    return switch (metric) {
      case Counter c -> "count=" + c.getCount();
      case Meter m -> "meter=" + m.getCount();
      case Timer t -> "timer=" + t.getCount();
      case Histogram h -> "histogram=" + h.getCount();
      case Gauge<?> g -> "gauge=" + g.getValue();
      default -> metric.toString();
    };
  }
}

I expect to see 2xx-responses and all the other stats like requests recorded, just as with InstrumentedHttpChannelListener on Jetty 11. However, no metrics are being recorded except for an ever-increasing active-requests metric, which should be 0 when there are no active requests.

From a brief debugging session I can see that AbstractInstrumentedHandler#updateResponses, the method responsible for collecting metrics after each request, is never called. In InstrumentedEE10Handler there are two places that invoke this method. One is after a synchronous request, guarded by state.isInitial(), but this is not reached because the state is already completed at that point. The other is the InstrumentedAsyncListener#onComplete callback, which is also never invoked for some reason.

I'm not sure if I am missing something or if the new Dropwizard instrumentation simply doesn't cover this case of async processing with CompletableFutures returned from Jersey resources.

joschi (Member) commented Jan 21, 2024

@zUniQueX Would you have a chance to look at this, please? 😃

zUniQueX (Member) commented
@mihalyr Thanks for reporting this! I have a workaround for this problem. Fixing it properly will probably take some time.

As you've correctly pointed out, the finally block resets the state (e.g. active requests) if the request isn't handled by the current handler chain. The 'real' response update is performed by the asynchronous listener.

Prior to Jetty 12, Jersey put a request into asynchronous processing, e.g. in the JettyHttpContainer. But the servlet classes are no longer available in this container in 3.1.x (Jetty 12), so the logic for asynchronous processing was removed there. I'm not sure how the normal ServletContainer handles this, but while debugging I never saw the request being switched to asynchronous processing.

If the request isn't processed asynchronously, Jetty won't invoke the async listeners, so our InstrumentedAsyncListener cannot record the metrics because it is simply never called.

To work around this problem, one can manually set a request to asynchronous processing. This will allow the async listener to be called and the test works fine. Just add the following code to your definition of ServletContextHandler:

// Requires: jakarta.servlet.AsyncContext, jakarta.servlet.DispatcherType, java.util.EnumSet
contextHandler.addFilter((request, response, chain) -> {
    AsyncContext asyncContext = request.startAsync();
    chain.doFilter(request, response);
    asyncContext.complete();
}, "/*", EnumSet.allOf(DispatcherType.class));

zUniQueX added the bug label Jan 22, 2024
zUniQueX (Member) commented

@mihalyr @joschi After further investigation I've pushed a first fix for this problem: zUniQueX@4548985.

I'm not happy with this yet and will do some additional tests over the next few days. In particular, the statement Callback.from(metricUpdater, callback) isn't great: it prevents the response from being sent before the metrics are collected. When changing this to the preferable Callback.from(callback, metricUpdater), the test case isn't stable, because the metrics are only collected once the response has been processed. Additionally, I'm not sure whether Jetty will resume suspended requests at the handler level, which would make the async flag redundant.

What do you think about the time of metrics collection? Should we focus on fast response processing or on accurate metrics?
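The ordering trade-off can be illustrated with a plain composite callback. This is an illustrative sketch only: the `from` helper here stands in for the composition performed by Jetty's `Callback.from`, and all names are hypothetical, not the actual handler code.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackOrder {
  // Composes two actions so the first always runs before the second,
  // mimicking the ordering effect of composing two callbacks.
  static Runnable from(Runnable first, Runnable second) {
    return () -> { first.run(); second.run(); };
  }

  public static void main(String[] args) {
    List<String> order = new ArrayList<>();
    Runnable metricUpdater = () -> order.add("metrics");
    Runnable sendResponse = () -> order.add("response");

    // Variant 1: metrics collected before the response is released
    // (accurate metrics, but the client waits on metric collection).
    from(metricUpdater, sendResponse).run();

    // Variant 2: response released first, metrics updated afterwards
    // (fast responses, but metrics may lag behind client-side checks).
    from(sendResponse, metricUpdater).run();

    System.out.println(order);
  }
}
```

Variant 2 is what makes a test flaky when it asserts on metric counts immediately after receiving the response.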

mihalyr (Author) commented Jan 23, 2024

Let me add a bit more detail, which I think makes a difference here.

This problem seems to only affect the case when the Jersey resource returns an already completed future like in my example in the issue description:

  @GET
  public CompletableFuture<String> ping() {
    return CompletableFuture.completedFuture("pong");
  }

However, if I change this to a future that completes with a delay, as in most real-world scenarios, the metrics seem to work:

  @GET
  public CompletableFuture<String> ping() {
    var future = new CompletableFuture<String>();
    CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS).execute(() -> future.complete("pong"));
    return future;
  }

I still don't think this is sufficient, because a resource can legitimately return an already completed future when the async processing has already finished or isn't even necessary. The metrics library should handle both cases, IMO.

mihalyr (Author) commented Jan 23, 2024

To work around this problem, one can manually set a request to asynchronous processing.

@zUniQueX unfortunately the suggested workaround with the filter only works if the future is already completed; it breaks async processing when the future completes with a delay.

mihalyr (Author) commented Jan 24, 2024

@zUniQueX I quickly tried the fix from your branch and it seems to work in both cases, completed and delayed, although I am not that familiar with the implementation details. I agree that running the metric updater callback after the request handling would be the better choice, as you suggested. However, I have a concern about the async flag and race conditions: while the request is being handled by one thread, you check isSuspended and then call async.set(true), but another thread might invoke the metric updater callback at any point in between and read the async flag before it is set to true. Is this a valid concern?

mihalyr (Author) commented Jan 24, 2024

I'm afraid my concern about the race condition was valid. I slightly modified my test so it can run repeatedly, used a resource method that completes the response future with a 1 ms delay, ran it 100k times, and got 32 failures due to unrecorded metrics. That sounds small, but it depends on the test setup; the point is that there is a race condition which makes metric collection flaky.
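The check-then-act hazard can be sketched deterministically. All names here are hypothetical stand-ins, not the actual handler code: a CountDownLatch forces the completion callback to read the flag before the "handler" thread sets it, so the update is lost.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncFlagRace {
  public static void main(String[] args) throws InterruptedException {
    AtomicBoolean async = new AtomicBoolean(false);
    CountDownLatch callbackRan = new CountDownLatch(1);

    // Stand-in for the metric updater callback running on another thread.
    Thread completionCallback = new Thread(() -> {
      // Reads the flag before the handler thread has set it.
      System.out.println("callback saw async=" + async.get());
      callbackRan.countDown();
    });
    completionCallback.start();

    // Force the racy interleaving for the demo: wait until the callback
    // has already read the flag.
    callbackRan.await();

    // Stand-in for the handler's finally block: sets the flag too late.
    async.set(true);
    completionCallback.join();
    System.out.println("handler set async=" + async.get());
  }
}
```

In the real code the interleaving is timing-dependent rather than forced, which is why the failure rate was low (32 in 100k) but nonzero.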

@zUniQueX
Copy link
Member


As you've correctly pointed out, the async flag might cause race conditions. Since processing of the request is asynchronous, the request can already be completed when the call to handle(Request, Response, Callback) returns. So setting the flag in the finally block may have no effect, and updating activeSuspended would be wrong. Additionally, a handler only gets called once, so the current check for initial processing can be removed from AbstractInstrumentedHandler, and the else block checking the suspension state is redundant.

The only way to get correct data would be to track the _state and _requestState variables of the ServletChannelState. Those variables are updated without notifying any listener, hence we cannot track the asynchronous states.

That said, I'd like to remove the aforementioned metrics from the handler entirely. Combined with the other proposed change of updating the remaining metrics after Callback completion, all metrics should then contain accurate values.

@joschi When integrating this change, we wouldn't need any servlet-related classes in the handler anymore and could remove the metrics-jetty12-ee10 module. That's a very intrusive change, but I also don't want a broken implementation to stand as the last stable release. Since InstrumentedEE10Handler apparently has never worked correctly, I'd propose a fix for the next patch release if you're fine with that.

zUniQueX linked a pull request Jan 30, 2024 that will close this issue
zUniQueX (Member) commented

@mihalyr I've pushed a fix for this one in #3928. If you have additional tests for this issue, I'd appreciate feedback on my current solution.

mihalyr (Author) commented Mar 12, 2024

Hi @zUniQueX, sorry for the late response here, I got a bit busy with work and other things.

I ran my tests and got 78 failures out of 100k runs (about two and a half minutes on my laptop), which is not that bad, but this is only a synthetic case; I'm not sure how it would behave in the wild.

I was also thinking if I'm doing something wrong in my test case here. Could you please take a look?

import static com.codahale.metrics.MetricRegistry.name;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.dropwizard.metrics.jetty12.ee10.InstrumentedEE10Handler;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHandler;
import org.eclipse.jetty.server.Server;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

class Jetty12DropwizardTest {

  static Server server;
  static MetricRegistry metrics;
  static HttpClient client;
  static AtomicInteger counter = new AtomicInteger();

  @BeforeAll
  static void beforeAll() throws Exception {
    // Jetty Server on random free port
    server = new Server(0);

    // Setup Jersey with our simple async resource returning CompletableFuture
    var jerseyConfig = new ResourceConfig()
        .registerClasses(PingResource.class);
    var jersey = new ServletContainer(jerseyConfig);

    // Configure the handler
    var contextHandler = new ServletContextHandler();
    contextHandler.setContextPath("/");
    contextHandler.addServlet(jersey, "/*");

    // Add handler instrumentation
    metrics = new MetricRegistry();
    var instrumentedHandler = new InstrumentedEE10Handler(metrics);
    contextHandler.insertHandler(instrumentedHandler);

    // Tell the server to use our handler and start it
    server.setHandler(contextHandler);
    server.start();

    client = HttpClient.newHttpClient();
  }

  @AfterAll
  static void afterAll() throws Exception {
    client.close();
    server.stop();
  }

  @RepeatedTest(100_000)
  void test() throws Exception {
    counter.incrementAndGet();

    // Ping the server and wait for the response
    var response = client.send(
        HttpRequest.newBuilder().uri(server.getURI()).GET().build(),
        HttpResponse.BodyHandlers.ofString());
    assertEquals(200, response.statusCode(), "response code");
    assertEquals("pong", response.body(), "response body");

    // Print metric counts
    // metrics.getMetrics().forEach((name, metric) -> System.out.println(name + ": " + printCount(metric)));

    // No active requests after the request succeeded
    var activeRequestsCounter = metrics.counter(name(ServletHandler.class, "active-requests"));
    assertEquals(0, activeRequestsCounter.getCount(), "active requests");

    // request recorded
    var requestsTimer = metrics.timer(name(ServletHandler.class, "requests"));
    assertEquals(counter.get(), requestsTimer.getCount(), "requests");

    // 200 response recorded
    var response200Meter = metrics.meter(name(ServletHandler.class, "2xx-responses"));
    assertEquals(counter.get(), response200Meter.getCount(), "2xx responses");
  }

  String printCount(Metric metric) {
    return switch (metric) {
      case Counter c -> "count=" + c.getCount();
      case Meter m -> "meter=" + m.getCount();
      case Timer t -> "timer=" + t.getCount();
      case Histogram h -> "histogram=" + h.getCount();
      case Gauge<?> g -> "gauge=" + g.getValue();
      default -> metric.toString();
    };
  }
}

And the resource is the same as before:

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Path("")
public class PingResource {
  @GET
  public CompletableFuture<String> ping() {
    // return CompletableFuture.completedFuture("pong");
    // return CompletableFuture.supplyAsync(() -> "pong");
    var future = new CompletableFuture<String>();
    CompletableFuture.delayedExecutor(1, TimeUnit.MILLISECONDS).execute(() -> future.complete("pong"));
    return future;
  }
}

mihalyr (Author) commented Mar 12, 2024

One thing I can think of regarding my test case: if the metricUpdater runs on a different thread in the background, then the request could return to the client even before the callback has finished. If that's the case, my tests aren't testing the right thing.

mihalyr (Author) commented Mar 12, 2024

I think the problem was indeed with my test: the metric updater callback now completes some time after the request is processed, which can be after the client has received the response. I tried a different test where I submit everything first and then check the totals at the end instead of after each request:

  @Test
  void testAggregate() throws Exception {
    final int iterations = 100_000;
    for (int i = 0; i < iterations; i++) {
      // Ping the server and wait for the response
      var response = client.send(
          HttpRequest.newBuilder().uri(server.getURI()).GET().build(),
          HttpResponse.BodyHandlers.ofString());
      assertEquals(200, response.statusCode(), "response code");
      assertEquals("pong", response.body(), "response body");
    }

    // Give plenty of time for any background thread to finish
    Thread.sleep(Duration.ofSeconds(5));

    // Print metric counts
    metrics.getMetrics().forEach((name, metric) -> System.out.println(name + ": " + printCount(metric)));

    // No active requests after the request succeeded
    var activeRequestsCounter = metrics.counter(name(ServletHandler.class, "active-requests"));
    assertEquals(0, activeRequestsCounter.getCount(), "active requests");

    // request recorded
    var requestsTimer = metrics.timer(name(ServletHandler.class, "requests"));
    assertEquals(iterations, requestsTimer.getCount(), "requests");

    // 200 response recorded
    var response200Meter = metrics.meter(name(ServletHandler.class, "2xx-responses"));
    assertEquals(iterations, response200Meter.getCount(), "2xx responses");
  }

This works fine. I was also curious about the concurrent case, so I modified the test slightly:

  @Test
  void testAggregateAsync() throws Exception {
    final int iterations = 100_000;
    final int concurrency = Runtime.getRuntime().availableProcessors();

    var futures = new ArrayList<Future<?>>(iterations);
    try (var executor = Executors.newFixedThreadPool(concurrency)) {
      for (int i = 0; i < iterations; i++) {
        final int iteration = i;
        var future = executor.submit(() -> {
          try {
            var response = client.send(
                HttpRequest.newBuilder().uri(server.getURI()).GET().build(),
                HttpResponse.BodyHandlers.ofString());
            assertEquals(200, response.statusCode(), "response code");
            assertEquals("pong", response.body(), "response body");
          } catch (InterruptedException e) {
            System.err.println("Interrupted request " + iteration + ": " + e);
            Thread.currentThread().interrupt();
          } catch (IOException e) {
            System.err.println("Failed request " + iteration + ": " + e);
            throw new RuntimeException(e);
          } finally {
            if (iteration % 1_000 == 0) {
              System.out.println("Completed " + iteration + " requests");
            }
          }
        });
        futures.add(future);
      }
    }

    // Ensure responses are complete and successful
    for (var future : futures) {
      future.get();
    }
    System.out.println("All responses received successfully");


    // Give plenty of time for any background thread to finish
    Thread.sleep(Duration.ofSeconds(5));

    // Print metric counts
    metrics.getMetrics().forEach((name, metric) -> System.out.println(name + ": " + printCount(metric)));

    // No active requests after the request succeeded
    var activeRequestsCounter = metrics.counter(name(ServletHandler.class, "active-requests"));
    assertEquals(0, activeRequestsCounter.getCount(), "active requests");

    // request recorded
    var requestsTimer = metrics.timer(name(ServletHandler.class, "requests"));
    assertEquals(iterations, requestsTimer.getCount(), "requests");

    // 200 response recorded
    var response200Meter = metrics.meter(name(ServletHandler.class, "2xx-responses"));
    assertEquals(iterations, response200Meter.getCount(), "2xx responses");
  }

And this also completes successfully; it took only 20 seconds on my laptop.
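As a side note, the fixed 5-second sleep in the tests above could be replaced by a small polling helper, so the test waits only as long as the asynchronous metric updater actually needs. This is an illustrative sketch; the helper name, timeout, and poll interval are arbitrary choices, not part of any library.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

public class AwaitCondition {
  // Polls the condition until it holds or the timeout elapses.
  static void await(BooleanSupplier condition, Duration timeout) throws InterruptedException {
    Instant deadline = Instant.now().plus(timeout);
    while (!condition.getAsBoolean()) {
      if (Instant.now().isAfter(deadline)) {
        throw new AssertionError("condition not met within " + timeout);
      }
      Thread.sleep(10); // poll interval
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicLong count = new AtomicLong();
    // Stand-in for the metric updater completing in the background.
    new Thread(count::incrementAndGet).start();
    await(() -> count.get() == 1, Duration.ofSeconds(5));
    System.out.println("condition met: count=" + count.get());
  }
}
```

In the test, the assertions on requests and 2xx-responses could then run after `await(() -> requestsTimer.getCount() == iterations, ...)` instead of a fixed sleep, keeping the happy path fast while still tolerating the asynchronous update.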

mihalyr (Author) commented Mar 12, 2024

@zUniQueX From my POV your patch works for the case I had problems with originally, thanks a lot for your efforts 👍
