30、Skywalking的使用-异步链路追踪
絮语
对skywalking架构设计、性能调优感兴趣可以查看作者的文章:
1、异步链路追踪的概述
通过对 Callable
,Runnable
,Supplier
这3种接口的实现者进行增强拦截,将trace的上下文信息传递到子线程中,实现了异步链路追踪。
有非常多的方式来实现Callable
,Runnable
,Supplier
这3种接口,那么增强就面临以下问题:
- 增强所有的实现类显然不可能,必须基于有限的约定
- 不能让使用者大量修改代码,尽可能的基于现有的实现
可能基于以上问题的考虑,skywalking提供了一种即通用又快捷的方式来规范这一现象:
- 只拦截增强带有
@TraceCrossThread
注解的类: - 通过装饰的方式包装任务,避免大刀阔斧的修改
原始类 | 提供的包装类 | 拦截方法 | 使用技巧 |
---|---|---|---|
Callable |
CallableWrapper |
call | CallableWrapper.of(xxxCallable) |
Runnable | RunnableWrapper | run | RunnableWrapper.of(xxxRunable) |
Supplier |
SupplierWrapper |
get | SupplierWrapper.of(xxxSupplier) |
包装类 都有注解 @TraceCrossThread
,skywalking内部的拦截匹配逻辑是,标注了@TraceCrossThread
的类,拦截 其名称为call
或run
或 get
,且没有入参的方法;对使用者来说大致分为2种方式:
-
自定义类,实现接口
Callable
、Runnable
、Supplier
,加@TraceCrossThread
注解。当需要有更多的自定义属性时,考虑这种方式;参考CallableWrapper
、RunnableWrapper
、SupplierWrapper
的实现方式。 -
通过xxxWrapper.of 装饰的方式,即
CallableWrapper.of(xxxCallable)
、RunnableWrapper.of(xxxRunable)
、SupplierWrapper.of(xxxSupplier)
。大多情况下,通过这种包装模式即可。
2、异步链路追踪的使用
需引入如下依赖(版本限参考):
org.apache.skywalking
apm-toolkit-trace
8.5.0
2.1. CallableWrapper
Skywalking 通过CallableWrapper
包装Callable
2.1.1 thread+callable
private String async_thread_callable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(CallableWrapper.of(()->{
ActiveSpan.debug("async_Thread_Callable");
String str1 = service.sendMessage(way, time11, time22);
return str1;
}));
new Thread(futureTask).start();
return futureTask.get();
}
2.1.2 threadPool+callable
private String async_executorService_callable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
Future callableResult = executorService.submit(CallableWrapper.of(() -> {
String str1 = service.sendMessage(way, time11, time22);
return str1;
}));
return (String) callableResult.get();
}
2.2. RunnableWrapper
Skywalking 通过RunnableWrapper
包装Runnable
2.2.1 thread+runnable
private String async_thread_runnable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
//忽略返回值
FutureTask futureTask = new FutureTask(RunnableWrapper.of(() -> {
String str1 = service.sendMessage(way, time11, time22);
}), "mockRunnableResult");
new Thread(futureTask).start();
return (String) futureTask.get();
}
2.2.2 threadPool+runnable
private String async_executorService_runnable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
//忽略真实返回值,mock固定返回值
Future mockRunnableResult = executorService.submit(RunnableWrapper.of(() -> {
String str1 = service.sendMessage(way, time11, time22);
}), "mockRunnableResult");
return (String) mockRunnableResult.get();
}
2.2.3 completableFuture + runAsync
通过RunnableWrapper.of(xxx)包装rannable即可。
2.3. SupplierWrapper
Skywalking 通过SupplierWrapper
包装Supplier
2.3.1 completableFuture + supplyAsync
private String async_completableFuture_supplyAsync(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
CompletableFuture stringCompletableFuture = CompletableFuture.supplyAsync(SupplierWrapper.of(() -> {
String str1 = service.sendMessage(way, time11, time22);
return str1;
}));
return stringCompletableFuture.get();
}
3、异步链路追踪的内部原理
需要将trace信息,在线程之间传递,比如 线程A -调用-> 线程B 的场景:
-
线程A
- 调用
ContextManager.capture()
,将trace的上下文信息保存到一个ContextSnapshot
的实例,并返回,此处命名为:contextSnapshot。 - 通过某种方式将contextSnapshot传递给线程B。
- 调用
-
线程B
- 在任务执行前,线程中B获取到contextSnapshot对象,并将其作为入参调用
ContextManager.continued(contextSnapshot)
。 - 此方法中解析出trace的信息后,存储到线程B的线程上下文中。
- 在任务执行前,线程中B获取到contextSnapshot对象,并将其作为入参调用
附语
对skywalking架构设计、性能调优感兴趣可以查看作者的文章:
如果这些内容对您有所帮助,或者有所启发的话,可以关注公众号:「架构染色」,进行交流和学习。
共有 0 条评论