Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support streaming execution in the HttpClient #1085

Closed

Conversation

Randgalt
Copy link
Contributor

Support end-to-end streaming. This is useful for endpoints that return
a large list of entities. Instead of needing to queue that entire list
in memory in both the server and client the entities are streamed.
This has even greater impact when there is a client in the middle:
client -> server A -> server B -> back to client.

The flow is:

  • Client sends request to server
  • Server begins to stream entities
  • Client reads entities one by one

This necessitates giving the client ownership of the internal
input stream.

Additionally, adds JsonStreamingResponse to make it simpler/convenient to stream lists
of JSON entities.

@Randgalt
Copy link
Contributor Author

cc @electrum @findepi @losipiuk

@Randgalt Randgalt force-pushed the jordanz/http-client-streaming branch 3 times, most recently from 5b9a923 to ff758e3 Compare July 25, 2023 12:43
Copy link
Contributor

@Praveen2112 Praveen2112 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. License header is missing

@@ -0,0 +1,184 @@
package io.airlift.http.client;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need license header

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my recent airlift PRs @electrum has said we don't want license headers. The old header was Proofpoint and that doesn't apply anymore.

if (isStreaming) {
JettyResponse jettyResponse;
try {
jettyResponse = new JettyResponse(response, listener.getInputStream());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract this above the if statement ? Do we need to wrap between try

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that would help. In the non-streaming case it gets the entire value after allocating the JettyResponse. The streaming case doesn't do that.

}

@Test(timeOut = 10000)
public void testStreamingBlockingJson()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have an additional test case where the server returns JSONObject and an JSONArray with an invalid pattern and an empty JSONArray


JsonParser parser;
try {
parser = objectMapper.createParser(new InputStreamReader(response.getInputStream(), StandardCharsets.UTF_8));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't

new InputStreamReader(response.getInputStream(), StandardCharsets.UTF_8)

be moved to previous line using try-with-resources?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No - the need here is that the parser is not closed. The caller has the responsibility of closing.

JsonCodec<Thing> codec = jsonCodec(Thing.class);

servlet.setResponseBody(json);
servlet.addResponseHeader("Content-Type", "application/json");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if support for application/x-ndjson is something to consider

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can have a new StreamingResponseHandler for that, adding it when Airlift's other JSON hander gets support.

@Randgalt Randgalt force-pushed the jordanz/http-client-streaming branch from ff758e3 to 98fa6c4 Compare July 28, 2023 08:38
@Randgalt Randgalt force-pushed the jordanz/http-client-streaming branch 2 times, most recently from 47c8118 to 8352452 Compare July 28, 2023 09:06
@@ -215,6 +215,11 @@ public byte[] toJsonBytes(T instance)
}
}

public ObjectMapper mapper()
{
return mapper;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mapper is mutable (and even potentially shared io.airlift.json.JsonCodec#OBJECT_MAPPER_SUPPLIER). This breaks encapsulation. This is a no-go from my perspective.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll find another solution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi I've come up with an alternate. PTAL.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also return mapper.copy(); would do, right?

@Randgalt Randgalt force-pushed the jordanz/http-client-streaming branch from 8352452 to b13232d Compare July 31, 2023 09:09
@Randgalt Randgalt requested a review from findepi July 31, 2023 09:09
@Randgalt Randgalt force-pushed the jordanz/http-client-streaming branch from b13232d to 2e768dd Compare July 31, 2023 09:11
Add new feature to create a streaming parser over an InputStream
from a JsonCodec
@Randgalt Randgalt force-pushed the jordanz/http-client-streaming branch from 2e768dd to d3b3b61 Compare July 31, 2023 09:13
return parser.nextLongValue(defaultValue);
}

public Boolean nextBooleanValue()
Copy link
Contributor

@losipiuk losipiuk Aug 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Boolean here (not bool) and int and long above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's how it is in JsonParser (Jackson). But, I'll improve this.

value = responseHandler.handle(request, new JettyStreamingResponse(jettyResponse, span, finalizer));
}
catch (RuntimeException e) {
throw new StreamingException(e, finalizer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why pass finalizer through StreamingException instead calling it immediately here before exception is thrown?

Copy link
Contributor Author

@Randgalt Randgalt Aug 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh - I don't know what I was thinking. That path is not needed and I've removed and simplified it.

}

@Override
public void close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: guard from calling more than once.

@@ -158,6 +153,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge Tests for streaming execution into commit which adds support for streaming responses.

@Randgalt Randgalt force-pushed the jordanz/http-client-streaming branch from d3b3b61 to 1643c30 Compare August 17, 2023 05:26
Support end-to-end streaming. This is useful for endpoints that return
a large list of entities. Instead of needing to queue that entire list
in memory in both the server and client the entities are streamed.
This has even greater impact when there is a client in the middle:
client -> server A -> server B -> back to client.

The flow is:

- Client sends request to server
- Server begins to stream entities
- Client reads entities one by one

This necessitates giving the client ownership of the internal
input stream.

Additionally, adds JsonStreamingResponse to make it simpler/convenient to stream lists
of JSON entities.
@Randgalt Randgalt force-pushed the jordanz/http-client-streaming branch from 1643c30 to c2d5ce3 Compare August 17, 2023 05:33
@wendigo
Copy link
Collaborator

wendigo commented Mar 9, 2024

@Randgalt please rebase

@Randgalt
Copy link
Contributor Author

Closing in favor of #1174

@Randgalt Randgalt closed this May 26, 2024
@Randgalt Randgalt deleted the jordanz/http-client-streaming branch May 26, 2024 13:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants