
Enabled Automatic Context Propagation and context propagation with lift causes ClassCastException #3762

Closed
kz-dt opened this issue Mar 18, 2024 · 7 comments
Labels
area/context This issue is related to the Context type/bug A general bug


kz-dt commented Mar 18, 2024

Expected Behavior

No exceptions are thrown

Actual Behavior

An exception is thrown:

java.lang.ClassCastException: class reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber cannot be cast to class reactor.core.Fuseable$QueueSubscription (reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber and reactor.core.Fuseable$QueueSubscription are in unnamed module of loader 'app')
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:264)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
	at reactor.core.publisher.FluxRefCount$RefCountInner.setRefCountMonitor(FluxRefCount.java:209)
	at reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:85)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals.subscribe(FluxContextWriteRestoringThreadLocals.java:46)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4478)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4414)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4361)
	at org.example.ReactorClassCastExceptionReproducerSample.runSample(ReactorClassCastExceptionReproducerSample.java:44)
	at org.example.ReactorClassCastExceptionReproducerSample.main(ReactorClassCastExceptionReproducerSample.java:18)

Steps to Reproduce

Reproducer is here: https://github.com/kz-dt/reactor_class_cast_exception_reproducer

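// JUnit 5 test class members; required imports:
import java.util.List;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;

// A no-op lift: hands back the downstream subscriber unchanged
private <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
    return Operators.lift((scannable, subscriber) -> subscriber);
}

@Test
void reproCase() {
    Hooks.onEachOperator("testTracingLift", tracingLift());
    Hooks.enableAutomaticContextPropagation();

    Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
    Flux<String> flux = sink
            .asFlux()
            .doOnRequest(v -> System.out.println("OnDoRequest " + v))
            .doOnTerminate(() -> System.out.println("doOnTerminate"))
            .doOnCancel(() -> System.out.println("doOnCancel"))
            .publish()
            .refCount();

    Mono<List<String>> res = flux.map(s -> s + " mapped").collectList();
    // Throws the ClassCastException shown above when subscribing
    res.subscribe(v -> System.out.println("Received a list of mapped strings: " + v));
}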

Possible Solution

It looks like Operators.lift returns the wrong subscriber type. Calling Hooks.onOperatorDebug() "fixes" the issue, since it adds another wrapper.

Your Environment

reactor-core 3.6.x
the Micrometer jar must be on the classpath (automatic context propagation requires io.micrometer:context-propagation)

The issue was originally discovered with Spring Boot 3.2+ and the Couchbase Java client 3.4.11, using the following dependencies:
org.springframework.boot:spring-boot-starter-webflux:3.2.0
org.springframework.boot:spring-boot-starter-data-couchbase-reactive:3.2.0

  • Reactor version(s) used: 3.6.0 - 3.6.4
  • Other relevant library versions (e.g. Netty, ...): micrometer-tracing:1.2.1
  • JVM version (java -version):
    openjdk version "11.0.20.1" 2023-08-24
    OpenJDK Runtime Environment Temurin-11.0.20.1+1 (build 11.0.20.1+1)
    OpenJDK 64-Bit Server VM Temurin-11.0.20.1+1 (build 11.0.20.1+1, mixed mode)
  • OS and version (e.g. uname -a): MINGW64_NT-10.0-22631 DT-3Q41FY3 3.4.7-25de8b84.x86_64 2023-08-28 21:32 UTC x86_64 Msys
@chemicL chemicL added type/bug A general bug area/context This issue is related to the Context labels Apr 11, 2024
@chemicL chemicL added this to the 3.6.x Backlog milestone Apr 11, 2024
Member

chemicL commented Apr 11, 2024

Thank you for the report. I can confirm this is a bug. I will investigate.

Member

chemicL commented Apr 15, 2024

As a matter of fact, Automatic Context Propagation is not the root cause here.

Flux#refCount combined with Operators.lift is problematic: the Subscriber of a Fuseable operator (FluxMapFuseable) is fed a non-Fuseable Subscription, namely ContextWriteRestoringThreadLocalsSubscriber, which comes from lifting (via FluxLift) the FluxRefCount operator.
The workaround that FluxLiftFuseable has for this case, wrapping with FluxHide#SuppressFuseableSubscriber, does not help.
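To make the mismatch concrete, here is a toy model in plain Java with no Reactor dependency. All names (ToySubscription, ToyQueueSubscription, ToyFuseableSubscriber, ToyPlainWrapper, ToySource, FusionHandshakeDemo) are hypothetical and only mirror the roles of Subscription, Fuseable.QueueSubscription and a Fuseable operator's subscriber. It assumes, as the stack trace suggests, that the Fuseable subscriber casts its Subscription without checking. It is a sketch of the failure mode, not Reactor's implementation.

```java
// Toy model of a fusion handshake. Hypothetical names, NOT Reactor code.
interface ToySubscription {
    void request(long n);
}

// A subscription that can also negotiate fusion (role of Fuseable.QueueSubscription).
interface ToyQueueSubscription extends ToySubscription {
    int requestFusion(int mode);
}

interface ToySubscriber {
    void onSubscribe(ToySubscription s);
}

// Role of a Fuseable operator's subscriber: assumes a Fuseable upstream and casts unchecked.
final class ToyFuseableSubscriber implements ToySubscriber {
    ToyQueueSubscription upstream;

    @Override
    public void onSubscribe(ToySubscription s) {
        upstream = (ToyQueueSubscription) s; // ClassCastException for a plain subscription
    }
}

// Role of a non-Fuseable wrapper inserted by a lift: it hands itself downstream,
// but it is only a plain ToySubscription.
final class ToyPlainWrapper implements ToySubscriber, ToySubscription {
    private final ToySubscriber downstream;
    private ToySubscription upstream;

    ToyPlainWrapper(ToySubscriber downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(ToySubscription s) {
        upstream = s;
        downstream.onSubscribe(this); // downstream receives a non-queue subscription
    }

    @Override
    public void request(long n) {
        upstream.request(n);
    }
}

// A Fuseable source; 0 means "no fusion" in this model.
final class ToySource implements ToyQueueSubscription {
    @Override public void request(long n) { }
    @Override public int requestFusion(int mode) { return 0; }
}

final class FusionHandshakeDemo {
    // True if routing the Fuseable subscriber through the plain wrapper fails the cast.
    static boolean wrapperBreaksFusion() {
        ToySubscriber chain = new ToyPlainWrapper(new ToyFuseableSubscriber());
        try {
            chain.onSubscribe(new ToySource());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // True if the Fuseable subscriber accepts a queue subscription handed to it directly.
    static boolean directSubscriptionWorks() {
        ToyFuseableSubscriber s = new ToyFuseableSubscriber();
        s.onSubscribe(new ToySource());
        return s.upstream != null;
    }
}
```

In this model the source itself is fine; the failure only appears once a wrapper that is not queue-capable sits between the source and the Fuseable subscriber.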

Here is a simpler reproducer that demonstrates the issue without automatic context propagation:

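	// Test class in package reactor.core.publisher, since FluxMap.MapSubscriber is package-private
	import java.util.function.Function;
	import org.junit.jupiter.api.Test;
	import org.reactivestreams.Publisher;

	private <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
		// Wraps each downstream subscriber in a non-Fuseable MapSubscriber (identity mapping)
		return Operators.lift((a, b) -> new FluxMap.MapSubscriber<>(b, i -> i));
	}

	@Test
	void reproCase() {
		Hooks.onEachOperator("testTracingLift", tracingLift());
		try {
			Flux<Integer> flux = Flux.just(1);

			// Fails with ClassCastException: a Fuseable subscriber ends up wrapped by a non-Fuseable one
			flux.map(s -> s).blockLast();
		} finally {
			// Reset the hook even when the subscription above throws
			Hooks.resetOnEachOperator();
		}
	}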

Let me try to work with that.

Member

chemicL commented Apr 16, 2024

I dug a bit deeper. Two things:

  1. The issue mentioned above is expected: wrapping a Fuseable Subscriber with a non-Fuseable one is prohibited. There is nothing the reactor-core internals can do about it.
  2. The actual problem is that the lift operation wraps FluxRefCount, which is Fuseable, with a non-Fuseable FluxContextWriteRestoringThreadLocals. That operator provides a non-Fuseable Subscriber, which also acts as the Subscription for FluxMapFuseable.MapFuseableConditionalSubscriber. That mismatch is undesired.

This happens because FluxRefCount is not properly marked as an internal operator, and it is wrapped multiple times due to other issues in how lifting is handled. I will follow up with a PR.
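Point 1 can be illustrated with a toy model (plain Java; ModelSubscription, ModelQueueSubscription, ModelSubscriber, FuseableConsumer, FusionRefusingWrapper and RefusingWrapperDemo are hypothetical names, not Reactor code). Assuming a Fuseable consumer casts its Subscription and then negotiates a fusion mode, a wrapper placed above it stays compatible only if it also presents a queue-capable subscription. The simplest way to do that in this model is to accept the cast but always answer "no fusion".

```java
// Toy model, NOT Reactor code: a wrapper that stays cast-compatible but refuses fusion.
interface ModelSubscription {
    void request(long n);
}

interface ModelQueueSubscription extends ModelSubscription {
    int NONE = 0; // "no fusion" in this model
    int SYNC = 1; // "synchronous fusion" in this model

    int requestFusion(int requestedMode);
}

interface ModelSubscriber {
    void onSubscribe(ModelSubscription s);
}

// Fuseable consumer: unchecked cast, then asks for synchronous fusion.
final class FuseableConsumer implements ModelSubscriber {
    ModelQueueSubscription upstream;
    int negotiatedMode = -1;

    @Override
    public void onSubscribe(ModelSubscription s) {
        upstream = (ModelQueueSubscription) s;
        negotiatedMode = upstream.requestFusion(ModelQueueSubscription.SYNC);
    }
}

// Presents itself as a queue subscription so the cast succeeds, but declines fusion.
final class FusionRefusingWrapper implements ModelSubscriber, ModelQueueSubscription {
    private final ModelSubscriber downstream;
    private ModelSubscription upstream;

    FusionRefusingWrapper(ModelSubscriber downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(ModelSubscription s) {
        upstream = s;
        downstream.onSubscribe(this);
    }

    @Override
    public void request(long n) {
        upstream.request(n); // plain demand is forwarded unchanged
    }

    @Override
    public int requestFusion(int requestedMode) {
        return NONE;
    }
}

final class RefusingWrapperDemo {
    // Subscribes a FuseableConsumer through the wrapper and reports the negotiated mode.
    static int negotiatedModeThroughWrapper() {
        FuseableConsumer consumer = new FuseableConsumer();
        new FusionRefusingWrapper(consumer).onSubscribe(n -> { });
        return consumer.negotiatedMode;
    }
}
```

Here the consumer ends up with mode NONE and falls back to plain request-driven delivery. Whether a real lifter should take this route is up to its author; the sketch only shows that the cast succeeds while fusion is declined.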

@chemicL chemicL self-assigned this Apr 16, 2024
@chemicL chemicL modified the milestones: 3.6.x Backlog, 3.6.6 Apr 16, 2024
Member

chemicL commented Apr 17, 2024

One more issue is that FluxRefCount, being Fuseable, gets wrapped into a FluxLiftFuseable upon assembly. Upon creation, FluxLiftFuseable takes the source Publisher (FluxRefCount) and applies Flux.from on top of it. Flux.from wraps FluxRefCount with a non-Fuseable FluxContextWriteRestoringThreadLocals, which delivers a non-QueueSubscription downstream; that is the problem. Furthermore, Flux.from applies the assembly hooks again, so FluxContextWriteRestoringThreadLocals is itself wrapped with a FluxLift (as a non-Fuseable operator). Unfortunately, this is a bit of a mess. The PR is coming...
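The double wrapping described above can be sketched as a toy model (plain Java; Pub, RefCountPub, AlreadyHooked, HookWrapper and AssemblyModel are hypothetical names, not Reactor types, and this is not the change made in the PR). A hook is applied once at assembly; a lift-like step then passes the result through a from() that applies the hook again. A marker interface lets a guarded from() skip publishers that were already wrapped.

```java
import java.util.function.UnaryOperator;

// Toy model, NOT Reactor code: an assembly hook applied more than once.
interface Pub {
    String describe();
}

final class RefCountPub implements Pub {
    public String describe() { return "refCount"; }
}

// Marker for publishers the hook must not wrap again (hypothetical).
interface AlreadyHooked { }

final class HookWrapper implements Pub, AlreadyHooked {
    private final Pub source;

    HookWrapper(Pub source) { this.source = source; }

    public String describe() { return "hook(" + source.describe() + ")"; }
}

final class AssemblyModel {
    static final UnaryOperator<Pub> HOOK = HookWrapper::new;

    // Naive: every pass through from() applies the hook again.
    static Pub naiveFrom(Pub p) {
        return HOOK.apply(p);
    }

    // Guarded: leave already-wrapped publishers untouched.
    static Pub guardedFrom(Pub p) {
        return (p instanceof AlreadyHooked) ? p : HOOK.apply(p);
    }

    // Hook applied once at assembly, then a lift-like step calls from() on the result.
    static Pub assemble(UnaryOperator<Pub> from, Pub op) {
        Pub atAssembly = HOOK.apply(op);
        return from.apply(atAssembly);
    }
}
```

With the naive from(), describe() yields hook(hook(refCount)); with the guarded one it yields hook(refCount).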

Member

chemicL commented Apr 17, 2024

I created a PR to address the multitude of problems that led to this report.

@kz-dt I have a question. The answer won't influence the fix, which is needed regardless. However, I wonder why you combine automatic context propagation with Hooks.onEachOperator, especially since the reproducer names the lift operation tracingLift. Automatic context propagation was intended to eliminate the use of onEachOperator and other Hooks for propagating tracing and correlation data across Thread boundaries. Combining these two approaches can lead to errors as well as performance issues. If you don't mind, please elaborate a bit on your use case. Perhaps we can improve the documentation, design, or educational material if other users have similar patterns.

Author

kz-dt commented Apr 24, 2024

Hi @chemicL,

Thank you for looking into the issue.

A customer uses both automatic context propagation and a separate solution that injects a Java agent and sets up a Hooks.onEachOperator handler, e.g. Dynatrace OneAgent, OTel instrumentation, or Datadog.

@chemicL chemicL closed this as completed in c760a0a May 6, 2024
Member

chemicL commented May 6, 2024

@kz-dt thanks for sharing. I think I managed to resolve the possible issues. Please validate with the latest 3.6.6-SNAPSHOT.
