-
Notifications
You must be signed in to change notification settings - Fork 325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support streaming execution in the HttpClient #1085
Conversation
5b9a923
to
ff758e3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. License header is missing
@@ -0,0 +1,184 @@ | |||
package io.airlift.http.client; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need license header
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my recent airlift PRs @electrum has said we don't want license headers. The old header was Proofpoint and that doesn't apply anymore.
if (isStreaming) { | ||
JettyResponse jettyResponse; | ||
try { | ||
jettyResponse = new JettyResponse(response, listener.getInputStream()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we extract this above the if
statement ? Do we need to wrap between try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that would help. In the non-streaming case it gets the entire value after allocating the JettyResponse
. The streaming case doesn't do that.
http-client/src/main/java/io/airlift/http/client/JsonStreamingResponseHandler.java
Outdated
Show resolved
Hide resolved
http-client/src/main/java/io/airlift/http/client/JsonStreamingResponseHandler.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Test(timeOut = 10000) | ||
public void testStreamingBlockingJson() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have an additional test case where the server returns JSONObject and an JSONArray with an invalid pattern and an empty JSONArray
|
||
JsonParser parser; | ||
try { | ||
parser = objectMapper.createParser(new InputStreamReader(response.getInputStream(), StandardCharsets.UTF_8)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't
new InputStreamReader(response.getInputStream(), StandardCharsets.UTF_8)
be moved to previous line using try-with-resources?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No - the need here is that the parser is not closed. The caller has the responsibility of closing.
JsonCodec<Thing> codec = jsonCodec(Thing.class); | ||
|
||
servlet.setResponseBody(json); | ||
servlet.addResponseHeader("Content-Type", "application/json"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if support for application/x-ndjson
is something to consider
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can have a new StreamingResponseHandler
for that, adding it when Airlift's other JSON hander gets support.
ff758e3
to
98fa6c4
Compare
47c8118
to
8352452
Compare
@@ -215,6 +215,11 @@ public byte[] toJsonBytes(T instance) | |||
} | |||
} | |||
|
|||
public ObjectMapper mapper() | |||
{ | |||
return mapper; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mapper is mutable (and even potentially shared io.airlift.json.JsonCodec#OBJECT_MAPPER_SUPPLIER
). This breaks encapsulation. This is a no-go from my perspective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll find another solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@findepi I've come up with an alternate. PTAL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also return mapper.copy();
would do, right?
8352452
to
b13232d
Compare
b13232d
to
2e768dd
Compare
Add new feature to create a streaming parser over an InputStream from a JsonCodec
2e768dd
to
d3b3b61
Compare
return parser.nextLongValue(defaultValue); | ||
} | ||
|
||
public Boolean nextBooleanValue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why Boolean
here (not bool
) and int
and long
above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's how it is in JsonParser
(Jackson). But, I'll improve this.
value = responseHandler.handle(request, new JettyStreamingResponse(jettyResponse, span, finalizer)); | ||
} | ||
catch (RuntimeException e) { | ||
throw new StreamingException(e, finalizer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why pass finalizer
through StreamingException
instead calling it immediately here before exception is thrown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tbh - I don't know what I was thinking. That path is not needed and I've removed and simplified it.
} | ||
|
||
@Override | ||
public void close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: guard from calling more than once.
@@ -158,6 +153,12 @@ | |||
<scope>runtime</scope> | |||
</dependency> | |||
|
|||
<dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merge Tests for streaming execution
into commit which adds support for streaming responses.
d3b3b61
to
1643c30
Compare
Support end-to-end streaming. This is useful for endpoints that return a large list of entities. Instead of needing to queue that entire list in memory in both the server and client the entities are streamed. This has even greater impact when there is a client in the middle: client -> server A -> server B -> back to client. The flow is: - Client sends request to server - Server begins to stream entities - Client reads entities one by one This necessitates giving the client ownership of the internal input stream. Additionally, adds JsonStreamingResponse to make it simpler/convenient to stream lists of JSON entities.
1643c30
to
c2d5ce3
Compare
@Randgalt please rebase |
Closing in favor of #1174 |
Support end-to-end streaming. This is useful for endpoints that return
a large list of entities. Instead of needing to queue that entire list
in memory in both the server and client the entities are streamed.
This has even greater impact when there is a client in the middle:
client -> server A -> server B -> back to client.
The flow is:
This necessitates giving the client ownership of the internal
input stream.
Additionally, adds JsonStreamingResponse to make it simpler/convenient to stream lists
of JSON entities.