TTL MDC does not take effect when used with Reactor WebClient #5

Open
naturallight opened this issue Jul 29, 2019 · 5 comments

Comments

naturallight commented Jul 29, 2019

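The sample code is below:

  1. The first time, requestid is set to 1 and one request is sent through WebClient; everything is as expected.
  2. The second time, requestid is set to 2 and another request is sent through WebClient; the requestid in the log is still 1.
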
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.slf4j.MDC;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;

@Slf4j
public class Log4j2Test {
    final private String REQUESTID="request-id";

    @Test
    public void testLogWithWebClient() throws Exception{
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MDC.put(REQUESTID, "1");
        WebClient webClient = WebClient.create();
        log.info("before first call");
        webClient
                .get()
                .uri("http://www.baidu.com/error")
                .retrieve()
                .bodyToMono(String.class)
                .onErrorResume(e -> {log.error("exception:{}",e.getLocalizedMessage()); return Mono.just("hello");})
                .doOnNext(x -> log.info(x))
                .subscribe(x -> countDownLatch.countDown());
        countDownLatch.await();


        // Only one countDown() happens below, so the latch count must be 1 or await() never returns.
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MDC.put(REQUESTID, "2");
        log.info("before second call");
        webClient
                .get()
                .uri("http://www.baidu.com/error")
                .retrieve()
                .bodyToMono(String.class)
                .onErrorResume(e -> {log.error("exception:{}",e.getLocalizedMessage()); return Mono.just("hello");})
                .doOnNext(x -> log.info(x))
                .subscribe(x -> countDownLatch2.countDown());
        countDownLatch2.await();
    }
}

The log4j2.xml configuration is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" monitorInterval="30">

    <Appenders>
        <Console name="ConsoleAppender" target="SYSTEM_OUT" follow="true">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %5p ${hostName} [%X{request-id}] [%thread] --- %-40.40c{1.} : %m%n%ex"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="ConsoleAppender" />
        </Root>
    </Loggers>
</Configuration>

The output is as follows:

2019-07-29 15:19:46.787  INFO DaviddeMacBook-Pro.local [1] [main] --- c.h.w.Log4j2Test                         : before first call
2019-07-29 15:20:17.158 ERROR DaviddeMacBook-Pro.local [1] [reactor-http-nio-3] --- c.h.w.Log4j2Test                         : exception:connection timed out: www.baidu.com/180.97.33.107:80
2019-07-29 15:20:17.159  INFO DaviddeMacBook-Pro.local [1] [reactor-http-nio-3] --- c.h.w.Log4j2Test                         : hello
2019-07-29 15:20:17.159  INFO DaviddeMacBook-Pro.local [2] [main] --- c.h.w.Log4j2Test                         : before second call
2019-07-29 15:20:47.167 ERROR DaviddeMacBook-Pro.local [1] [reactor-http-nio-5] --- c.h.w.Log4j2Test                         : exception:connection timed out: www.baidu.com/180.97.33.107:80
2019-07-29 15:20:47.167  INFO DaviddeMacBook-Pro.local [1] [reactor-http-nio-5] --- c.h.w.Log4j2Test                         : hello

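Here the reactor-http-nio-X threads should be Netty threads. The expectation is that the requestid logged during the second request is 2. In practice, 2 is logged only on the main thread; after the WebClient call, the log still shows the requestid from the first call.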
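
One likely reading of this log, assuming the MDC here is backed by an inheritable thread-local (TTL's TransmittableThreadLocal extends InheritableThreadLocal): a pooled thread copies the value once, when the thread is created, and does not see later updates made on the main thread. The sketch below reproduces that effect with JDK classes only. The class name StaleContextDemo and the single-thread pool are illustrative stand-ins, not Reactor or reactor-netty code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StaleContextDemo {
    // Stand-in for the MDC entry "request-id". Inheritable: a new thread copies it at creation.
    static final InheritableThreadLocal<String> REQUEST_ID = new InheritableThreadLocal<>();

    /** Returns the request id observed on the pooled thread for each of two calls. */
    static List<String> run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> readOnWorker = REQUEST_ID::get;
        List<String> seen = new ArrayList<>();
        try {
            REQUEST_ID.set("1");
            // The worker thread is created during this submit and inherits "1".
            seen.add(pool.submit(readOnWorker).get());

            REQUEST_ID.set("2");
            // The same worker is reused; its inherited copy is still "1".
            seen.add(pool.submit(readOnWorker).get());
        } finally {
            pool.shutdown();
            REQUEST_ID.remove();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [1, 1]
    }
}
```

The second read returns "1" even though the submitting thread already holds "2", which matches the shape of the log above.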

Owner

oldratlee commented Jul 29, 2019

TTL needs to be integrated with the execution layer of the Reactive framework. @naturallight

RxJava integration

https://github.com/alibaba/transmittable-thread-local/blob/a22669d9770cac0930e07c895057ae10eb487447/src/test/java/com/alibaba/integration/RxJavaIntegrationDemo.kt#L27-L28

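Reactor integration

The Reactor integration below:

  • Works for Reactor's own Schedulers.
  • But for what reactor-netty implements (presumably mainly its own Scheduler implementation), it currently does not appear to be hooked by Reactor's Schedulers.addExecutorServiceDecorator.

So although this demo itself works, it does not work for reactor-netty.

The correct way to implement the hook in Reactor still needs to be confirmed.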

https://github.com/alibaba/transmittable-thread-local/blob/a22669d9770cac0930e07c895057ae10eb487447/src/test/java/com/alibaba/integration/ReactorIntegrationDemo.kt#L11-L14

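Your sample code has been added to the project and can be run directly.

The integration below is incorrect and does not run correctly; the correct way to implement the hook in Reactor still needs to be confirmed.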

static {
    // WIP! Not works yet!!
    // TTL integration for Reactor
    Schedulers.addExecutorServiceDecorator("TransmittableThreadLocal",
            (scheduler, scheduledExecutorService) -> TtlExecutors.getTtlScheduledExecutorService(scheduledExecutorService));
    Hooks.onEachOperator("TransmittableThreadLocal", objectPublisher -> new Publisher<Object>() {
        final Object capture = capture();

        @Override
        public void subscribe(Subscriber<? super Object> s) {
            final Object backup = replay(capture);
            try {
                objectPublisher.subscribe(s);
            } finally {
                restore(backup);
            }
        }
    });
}

Author

naturallight commented Jul 31, 2019

I looked at the example. After adding the Hooks logic, running the test case throws this exception:

java.lang.ExceptionInInitializerError
	at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.build(DefaultWebClientBuilder.java:212)
	at org.springframework.web.reactive.function.client.WebClient.create(WebClient.java:134)
	at com.aksk.webfluxdemo.Log4j2Test.testLogWithWebClient(Log4j2Test.java:49)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ClassCastException: com.aksk.webfluxdemo.Log4j2Test$1 cannot be cast to reactor.core.publisher.Mono
	at reactor.core.publisher.Mono.onAssembly(Mono.java:4282)
	at reactor.core.publisher.Mono.error(Mono.java:261)
	at org.springframework.web.reactive.function.client.DefaultWebClient.<clinit>(DefaultWebClient.java:70)
	... 25 more
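
The `Caused by` line suggests that whatever the `onEachOperator` function returns is cast back to `Mono` inside `Mono.onAssembly`, so a hook that returns a bare anonymous `Publisher` cannot pass through. The toy model below reproduces that failure mode with JDK types only. `Source`, `MiniMono`, `hook`, and `just` are hypothetical stand-ins and are not Reactor's actual code.

```java
import java.util.function.Function;

public class HookCastDemo {
    /** Hypothetical stand-in for a generic Publisher. */
    interface Source {
        String get();
    }

    /** Hypothetical stand-in for Mono: the concrete type that operators must return. */
    static abstract class MiniMono implements Source {
    }

    // Global assembly hook, applied to every MiniMono that gets created.
    static Function<Source, Source> hook = s -> s;

    static MiniMono just(String value) {
        MiniMono mono = new MiniMono() {
            @Override
            public String get() {
                return value;
            }
        };
        // The hook result is cast back to the concrete type; this is where a plain wrapper fails.
        return (MiniMono) hook.apply(mono);
    }

    /** Installs a hook that returns a plain Source and reports what happens. */
    static String tryPlainWrapper() {
        hook = s -> new Source() { // not a MiniMono
            @Override
            public String get() {
                return s.get();
            }
        };
        try {
            return just("hello").get();
        } catch (ClassCastException e) {
            return "ClassCastException";
        } finally {
            hook = s -> s;
        }
    }

    /** Installs a hook that preserves the MiniMono type, so the cast succeeds. */
    static String tryTypePreservingWrapper() {
        hook = s -> new MiniMono() {
            @Override
            public String get() {
                return s.get();
            }
        };
        try {
            return just("hello").get();
        } finally {
            hook = s -> s;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryPlainWrapper());          // prints ClassCastException
        System.out.println(tryTypePreservingWrapper()); // prints hello
    }
}
```

In Reactor itself, type-preserving decoration of operators is usually built with `Operators.lift`; whether that fits this case should be checked against the Reactor version in use.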

Owner

oldratlee commented Jul 31, 2019

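Yes. This hook implementation does not work (it throws an exception); I am still exploring (work in progress) 😄 @naturallight

We need to find a hook like the one RxJava offers:

one where the lambdas passed into Flux go through a single, uniform Runnable path, so that they can be wrapped as TtlRunnable.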
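
The idea of funnelling every task through one Runnable wrapper can be sketched with JDK types alone. This is not TTL's implementation: `CONTEXT`, `ContextRunnable`, and `decorate` are hypothetical names, and a single plain `ThreadLocal` stands in for everything TTL would capture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunnableWrapDemo {
    // Hypothetical context slot; not inheritable, so workers start with no value.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Captures the submitter's value at construction and replays it around run(). */
    static final class ContextRunnable implements Runnable {
        private final String captured = CONTEXT.get(); // capture on the submitting thread
        private final Runnable delegate;

        ContextRunnable(Runnable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() {
            String backup = CONTEXT.get(); // whatever the worker held before
            CONTEXT.set(captured);         // replay the captured value
            try {
                delegate.run();
            } finally {
                // restore the worker's previous state
                if (backup == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(backup);
                }
            }
        }
    }

    /** The single choke point: every task handed to the executor gets wrapped. */
    static Executor decorate(Executor target) {
        return task -> target.execute(new ContextRunnable(task));
    }

    /** Submits two tasks to one reused worker and returns what each task saw. */
    static List<String> run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Executor decorated = decorate(pool);
        List<String> seen = new ArrayList<>();
        try {
            for (String id : new String[] {"1", "2"}) {
                CONTEXT.set(id);
                CompletableFuture<String> result = new CompletableFuture<>();
                decorated.execute(() -> result.complete(CONTEXT.get()));
                seen.add(result.get());
            }
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [1, 2]
    }
}
```

The wrapper only helps if every task really passes through the decorated executor, which is the part that was still unresolved for reactor-netty in this thread.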

oldratlee changed the title from "ttl MDC does not take effect when used with Reactor WebClient" to "TTL MDC does not take effect when used with Reactor WebClient" Aug 1, 2019

hu-chia commented Jan 7, 2021

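Yes. This hook implementation does not work (it throws an exception); I am still exploring (work in progress) 😄 @naturallight

We need to find a hook like the one RxJava offers:

one where the lambdas passed into Flux go through a single, uniform Runnable path, so that they can be wrapped as TtlRunnable.

May I ask whether such a hook has been found?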


zizihrhr335500 commented May 20, 2022

With the latest TTL release, 2.12.6, using TtlWrappers#wrapConsumer directly works. It passed my own testing.

WebClient.create().method(HttpMethod.POST)
    .uri(URI.create("http://127.0.0.1:8013/api/flow/test/a"))
    .accept(MediaType.APPLICATION_JSON).contentType(MediaType.APPLICATION_JSON)
    .retrieve().bodyToMono(Map.class)
    .subscribe(TtlWrappers.wrapConsumer((map) -> {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(0,50));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if (!Objects.equals(SampleController.LOCAL.get(), num)) {
            System.out.println("aaaaaaaaaaa");
        }
    }));

num is the value that SampleController put into the threadLocal. @guyeu
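
What wrapping the subscriber callback buys can be illustrated without TTL or WebClient. In this sketch `LOCAL` is a plain ThreadLocal, `wrapConsumer` is a hand-written stand-in (it is not TtlWrappers#wrapConsumer), and a single-thread executor plays the role of the thread that delivers the response.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ConsumerWrapDemo {
    // Hypothetical stand-in for SampleController.LOCAL.
    static final ThreadLocal<String> LOCAL = new ThreadLocal<>();

    /** Hand-written stand-in: captures LOCAL when called, replays it when the consumer runs. */
    static <T> Consumer<T> wrapConsumer(Consumer<T> delegate) {
        String captured = LOCAL.get(); // captured on the thread that registers the callback
        return value -> {
            String backup = LOCAL.get();
            LOCAL.set(captured);
            try {
                delegate.accept(value);
            } finally {
                if (backup == null) {
                    LOCAL.remove();
                } else {
                    LOCAL.set(backup);
                }
            }
        };
    }

    /** Registers a callback while LOCAL holds requestId and returns what the callback saw. */
    static String seenInCallback(String requestId, boolean wrap) throws Exception {
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        LOCAL.set(requestId);
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            Consumer<String> callback = body -> seen.set(LOCAL.get());
            if (wrap) {
                callback = wrapConsumer(callback);
            }
            Consumer<String> registered = callback;
            // The callback is invoked on another thread, as a response callback would be.
            deliveryThread.submit(() -> registered.accept("response")).get();
            return seen.get();
        } finally {
            deliveryThread.shutdown();
            LOCAL.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(seenInCallback("2", false)); // prints null
        System.out.println(seenInCallback("2", true));  // prints 2
    }
}
```

The trade-off of this style is that the wrapping has to be repeated at every subscribe call, whereas a global hook would apply it once.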
