Skip to content

Commit

Permalink
Add an example that processes streaming replies in a blocking … (line…
Browse files Browse the repository at this point in the history
…#2068)

…way to help with line#2065
  • Loading branch information
anuraaga authored and unknown committed Oct 15, 2019
1 parent aad32c9 commit f991f15
Showing 1 changed file with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -118,6 +120,43 @@ public void onCompleted() {
await().untilTrue(completed);
}

@Test
public void blockForLotsOfReplies() throws Exception {
final BlockingQueue<HelloReply> replies = new LinkedBlockingQueue<>();
final AtomicBoolean completed = new AtomicBoolean();
helloService.lotsOfReplies(
HelloRequest.newBuilder().setName("Armeria").build(),
new StreamObserver<HelloReply>() {

@Override
public void onNext(HelloReply value) {
replies.offer(value);
}

@Override
public void onError(Throwable t) {
// Should never reach here.
throw new Error(t);
}

@Override
public void onCompleted() {
completed.set(true);
}
});
int sequence = 0;
while (!completed.get() || !replies.isEmpty()) {
final HelloReply value = replies.poll(100, TimeUnit.MILLISECONDS);
if (value == null) {
// Timed out, try again.
continue;
}
assertThat(value.getMessage())
.isEqualTo("Hello, Armeria! (sequence: " + ++sequence + ')');
}
assertThat(sequence).isEqualTo(5);
}

@Test
public void sendLotsOfGreetings() {
final String[] names = { "Armeria", "Grpc", "Streaming" };
Expand Down

0 comments on commit f991f15

Please sign in to comment.