Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update project-reactor dependency & fix deprecated calls #816

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception {
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
message).subscriberContext(getReactorContext()).block();
message).contextWrite(getReactorContext()).block();
System.out.println("Published message: " + message);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception {
InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(sleepRequest, TypeRef.get(Void.class));
}).subscriberContext(getReactorContext()).block();
}).contextWrite(getReactorContext()).block();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Mono<byte[]> echo(
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo")
.setBody(body)
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context));
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context));
}

/**
Expand All @@ -71,7 +71,7 @@ public Mono<byte[]> echo(
public Mono<Void> sleep(@RequestAttribute(name = "opentelemetry-context") Context context) {
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)).then();
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
.setMethod(methodName)
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
.build();
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeActorResponse>createMono(
it -> intercept(context, client).invokeActor(req, it)
it -> intercept(Context.of(context), client).invokeActor(req, it)
)
).map(r -> r.getData().toByteArray());
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.11.RELEASE</version>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
Expand Down
55 changes: 29 additions & 26 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
envelopeBuilder.putAllMetadata(metadata);
}

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
it -> intercept(Context.of(context), asyncStub).publishEvent(envelopeBuilder.build(), it)
)
).then();
} catch (Exception ex) {
Expand All @@ -204,9 +204,9 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
it -> intercept(Context.of(context), asyncStub).invokeService(envelope, it)
)
).flatMap(
it -> {
Expand Down Expand Up @@ -251,9 +251,9 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
}
DaprProtos.InvokeBindingRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
it -> intercept(Context.of(context), asyncStub).invokeBinding(envelope, it)
)
).flatMap(
it -> {
Expand Down Expand Up @@ -298,10 +298,10 @@ public <T> Mono<State<T>> getState(GetStateRequest request, TypeRef<T> type) {

DaprProtos.GetStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetStateResponse>createMono(
it -> intercept(context, asyncStub).getState(envelope, it)
it -> intercept(Context.of(context), asyncStub).getState(envelope, it)
)
).map(
it -> {
Expand Down Expand Up @@ -347,8 +347,9 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe

DaprProtos.GetBulkStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
return Mono.deferContextual(
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it ->
intercept(Context.of(context), asyncStub)
.getBulkState(envelope, it)
)
).map(
Expand Down Expand Up @@ -431,8 +432,9 @@ public Mono<Void> executeStateTransaction(ExecuteStateTransactionRequest request
}
DaprProtos.ExecuteStateTransactionRequest req = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it))
return Mono.deferContextual(
context -> this.<Empty>createMono(it ->
intercept(Context.of(context), asyncStub).executeStateTransaction(req, it))
).then();
} catch (Exception e) {
return DaprException.wrapMono(e);
Expand All @@ -457,8 +459,8 @@ public Mono<Void> saveBulkState(SaveStateRequest request) {
}
DaprProtos.SaveStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).saveState(req, it))
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(Context.of(context), asyncStub).saveState(req, it))
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
Expand Down Expand Up @@ -541,8 +543,8 @@ public Mono<Void> deleteState(DeleteStateRequest request) {

DaprProtos.DeleteStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).deleteState(req, it))
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(Context.of(context), asyncStub).deleteState(req, it))
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
Expand Down Expand Up @@ -619,8 +621,9 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
}
DaprProtos.GetSecretRequest req = requestBuilder.build();

return Mono.subscriberContext().flatMap(
context -> this.<DaprProtos.GetSecretResponse>createMono(it -> intercept(context, asyncStub).getSecret(req, it))
return Mono.deferContextual(
context -> this.<DaprProtos.GetSecretResponse>createMono(it ->
intercept(Context.of(context), asyncStub).getSecret(req, it))
).map(DaprProtos.GetSecretResponse::getDataMap);
}

Expand All @@ -644,10 +647,10 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest

DaprProtos.GetBulkSecretRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetBulkSecretResponse>createMono(
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
it -> intercept(Context.of(context), asyncStub).getBulkSecret(envelope, it)
)
).map(it -> {
Map<String, DaprProtos.SecretResponse> secretsMap = it.getDataMap();
Expand Down Expand Up @@ -697,9 +700,9 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ

DaprProtos.QueryStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.QueryStateResponse>createMono(
it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it)
it -> intercept(Context.of(context), asyncStub).queryStateAlpha1(envelope, it)
)
).map(
it -> {
Expand Down Expand Up @@ -761,9 +764,9 @@ public void close() throws Exception {
*/
@Override
public Mono<Void> shutdown() {
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(
it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it))
it -> intercept(Context.of(context), asyncStub).shutdown(Empty.getDefaultInstance(), it))
).then();
}

Expand Down Expand Up @@ -795,10 +798,10 @@ public Mono<Map<String, ConfigurationItem>> getConfiguration(GetConfigurationReq
}

private Mono<Map<String, ConfigurationItem>> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) {
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetConfigurationResponse>createMono(
it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it)
it -> intercept(Context.of(context), asyncStub).getConfigurationAlpha1(envelope, it)
)
).map(
it -> {
Expand Down