30、Skywalking的使用-异步链路追踪

絮语

对skywalking架构设计、性能调优感兴趣可以查看作者的文章:

1、异步链路追踪的概述

通过对 Callable,Runnable,Supplier 这3种接口的实现者进行增强拦截,将trace的上下文信息传递到子线程中,实现了异步链路追踪。

有非常多的方式来实现Callable,Runnable,Supplier 这3种接口,那么增强就面临以下问题:

  1. 增强所有的实现类显然不可能,必须基于有限的约定
  2. 不能让使用者大量修改代码,尽可能的基于现有的实现

可能基于以上问题的考虑,skywalking提供了一种即通用又快捷的方式来规范这一现象:

  1. 只拦截增强带有@TraceCrossThread 注解的类:
  2. 通过装饰的方式包装任务,避免大刀阔斧的修改
原始类 提供的包装类 拦截方法 使用技巧
Callable CallableWrapper call CallableWrapper.of(xxxCallable)
Runnable RunnableWrapper run RunnableWrapper.of(xxxRunable)
Supplier SupplierWrapper get SupplierWrapper.of(xxxSupplier)

包装类 都有注解 @TraceCrossThread ,skywalking内部的拦截匹配逻辑是,标注了@TraceCrossThread的类,拦截 其名称为callrunget ,且没有入参的方法;对使用者来说大致分为2种方式:

  1. 自定义类,实现接口 CallableRunnableSupplier,加@TraceCrossThread注解。当需要有更多的自定义属性时,考虑这种方式;参考 CallableWrapperRunnableWrapperSupplierWrapper 的实现方式。

  2. 通过xxxWrapper.of 装饰的方式,即CallableWrapper.of(xxxCallable)RunnableWrapper.of(xxxRunable)SupplierWrapper.of(xxxSupplier)。大多情况下,通过这种包装模式即可。

2、异步链路追踪的使用

需引入如下依赖(版本限参考):


    org.apache.skywalking
    apm-toolkit-trace
    8.5.0

2.1. CallableWrapper

Skywalking 通过CallableWrapper包装Callable

image.png
2.1.1 thread+callable
private String async_thread_callable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
        FutureTask futureTask = new FutureTask(CallableWrapper.of(()->{
            ActiveSpan.debug("async_Thread_Callable");
            String str1 = service.sendMessage(way, time11, time22);
            return str1;
        }));
        new Thread(futureTask).start();
        return futureTask.get();
    }
2.1.2 threadPool+callable
    private String async_executorService_callable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
        Future callableResult = executorService.submit(CallableWrapper.of(() -> {
            String str1 = service.sendMessage(way, time11, time22);
            return str1;
        }));
        return (String) callableResult.get();
    }

2.2. RunnableWrapper

Skywalking 通过RunnableWrapper包装Runnable

image.png
2.2.1 thread+runnable
    private String async_thread_runnable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
        //忽略返回值
        FutureTask futureTask = new FutureTask(RunnableWrapper.of(() -> {
            String str1 = service.sendMessage(way, time11, time22);
        }), "mockRunnableResult");
        new Thread(futureTask).start();
        return (String) futureTask.get();
    }
2.2.2 threadPool+runnable
        private String async_executorService_runnable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
        //忽略真实返回值,mock固定返回值
        Future mockRunnableResult = executorService.submit(RunnableWrapper.of(() -> {
            String str1 = service.sendMessage(way, time11, time22);
        }), "mockRunnableResult");
        return (String) mockRunnableResult.get();
    }
2.2.3 completableFuture + runAsync

通过RunnableWrapper.of(xxx)包装rannable即可。

2.3. SupplierWrapper

Skywalking 通过SupplierWrapper包装Supplier

image.png
2.3.1 completableFuture + supplyAsync
    private String async_completableFuture_supplyAsync(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
        CompletableFuture stringCompletableFuture = CompletableFuture.supplyAsync(SupplierWrapper.of(() -> {
            String str1 = service.sendMessage(way, time11, time22);
            return str1;
        }));
        return stringCompletableFuture.get();
    }

3、异步链路追踪的内部原理

需要将trace信息,在线程之间传递,比如 线程A -调用-> 线程B 的场景:

  • 线程A

    1. 调用ContextManager.capture(),将trace的上下文信息保存到一个ContextSnapshot的实例,并返回,此处命名为:contextSnapshot。
    2. 通过某种方式将contextSnapshot传递给线程B。
  • 线程B

    1. 在任务执行前,线程中B获取到contextSnapshot对象,并将其作为入参调用ContextManager.continued(contextSnapshot)
    2. 此方法中解析出trace的信息后,存储到线程B的线程上下文中。

附语

对skywalking架构设计、性能调优感兴趣可以查看作者的文章:

如果这些内容对您有所帮助,或者有所启发的话,可以关注公众号:「架构染色」,进行交流和学习。

版权声明:
作者:ht
链接:https://www.techfm.club/p/43417.html
来源:TechFM
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>