OpenTelemetry 深度定制:跨服務(wù)追蹤的實(shí)戰(zhàn)技巧
背景
圖片
我們的需求是這樣的:
圖片
假設(shè)現(xiàn)在有三個(gè)服務(wù):ServiceA、ServiceB、ServiceC。
ServiceA 對(duì)外提供了一個(gè) http 接口 request,在這個(gè)接口會(huì)調(diào)用 ServiceB 的 order 訂單接口創(chuàng)建訂單,同時(shí) serviceB 調(diào)用 serviceC 的 pay 接口。
圖片
整個(gè)調(diào)用關(guān)系如上圖所示。
默認(rèn)情況下 span 中的 attribute 會(huì)記錄當(dāng)前 span 的一些信息,比如:
圖片
這些都是當(dāng)前一些當(dāng)前 span 內(nèi)置的信息,比如當(dāng)前 gRPC 接口的一些基本數(shù)據(jù):服務(wù)名、ip、端口等信息。
但這里并沒(méi)有上游的一些信息,雖然我們可以通過(guò) Jaeger 的樹(shù)狀圖得知上游是哪個(gè)應(yīng)用調(diào)用過(guò)來(lái)的,但是一旦某個(gè) span 下有多個(gè)子 span 的調(diào)用,就沒(méi)辦法很直觀知道這個(gè)子 span 的上游是由誰(shuí)發(fā)起的調(diào)用。
比如如下這個(gè)鏈路:
圖片
當(dāng)一個(gè)調(diào)用鏈非常長(zhǎng),同時(shí)也非常復(fù)雜時(shí),沒(méi)辦法第一時(shí)間知道某一個(gè) span 的上游到底是誰(shuí)發(fā)起的,需要手動(dòng)一層層的去折疊,或者全靠眼睛去找。
預(yù)期效果
圖片
為此我們希望的效果是可以通過(guò)給每一個(gè)子 span 中加入兩個(gè) attribute,來(lái)標(biāo)明它的父調(diào)用來(lái)源。
比如在 serviceB 中的所有 span 中都會(huì)加上兩個(gè)標(biāo)簽:來(lái)源是 serviceA,同時(shí)是 serviceA 的 request 接口發(fā)起的請(qǐng)求。
而在 serviceC 中同樣可以知道來(lái)源是 serviceB 的 Order 接口發(fā)起的調(diào)用。
我啟動(dòng)了三個(gè) demo 應(yīng)用,分別是 create1,create2,create3.
create1 中會(huì)提供一個(gè) request 接口,在這里面調(diào)用 create2 的 create2 接口,create2 的接口里接著調(diào)用 create3 的 create3 接口。
create1:
@RequestMapping("/request")
public String request(@RequestParam String name) {
HelloRequest request = HelloRequest.newBuilder()
.setName(name)
.build();
log.info("request: {}", request);
String message = myServiceStub.create2(request).getMessage();
Executors.newFixedThreadPool(1).execute(() -> {
myServiceStub.create2(request).getMessage();
}); return message;
}
create2:
@Override
public void create2(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder()
.setMessage("Create2 ==> " + request.getName())
.build();
log.info("Create2: {}", reply.getMessage());
myMethod(request.getName());
myServiceStub.create3(request);
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
create3:
@Override
public void create3(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder()
.setMessage("Create3 ==> " + request.getName())
.build();
log.info("Create3: {}", reply.getMessage());
myMethod(request.getName());
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
java -javaagent:opentelemetry-javaagent-2.4.0-SNAPSHOT.jar \
-Dotel.javaagent.extensinotallow=otel-extensions-custom-context-1.0-SNAPSHOT.jar \
-Dotel.traces.exporter=otlp \
-Dotel.logs.exporter=none \
-Dotel.service.name=create2 \
-Dotel.exporter.otlp.protocol=grpc \
-Dotel.propagators=tracecontext,baggage,demo \
-Dotel.exporter.otlp.endpoint=http://127.0.0.1:5317 \
-jar target/demo-0.0.1-SNAPSHOT.jar --spring.application.name=create2 --server.port=9191 --grpc.server.port=9292 --grpc.client.myService.address=static://127.0.0.1:9393
只是每個(gè)應(yīng)用都需要使用我這邊單獨(dú)打的 agent 包以及一個(gè) extension(tel-extensions-custom-context-1.0-SNAPSHOT.jar) 才能生效。
最終的效果如下:
圖片
Baggage
在講具體的實(shí)現(xiàn)之前需要先了解幾個(gè) Trace 中的概念,在這里主要用到的是一個(gè)稱為 Baggage 的對(duì)象。
在之前的文章中其實(shí)提到過(guò)它的原理以及使用場(chǎng)景:從 Dapper 到 OpenTelemetry:分布式追蹤的演進(jìn)之旅
圖片
Baggage 的中文翻譯是:包裹??;簡(jiǎn)單來(lái)說(shuō)就是我們可以通過(guò)自定義 baggage 可以將我們想要的數(shù)據(jù)存放在其中,這樣再整個(gè) Trace 的任意一個(gè) Span 中都可以讀取到。
@RequestMapping("/request")
public String request(@RequestParam String name) {
// 寫(xiě)入
Baggage.current().toBuilder().
put("request.name", name).build()
.storeInContext(Context.current()).makeCurrent();
}
// 獲取
String value = Baggage.current().getEntryValue("request.name");
log.info("request.name: {}", value);
理解了這個(gè)之后,我們要實(shí)現(xiàn)的將上游的信息傳遞到下游就可以通過(guò)這個(gè)組件實(shí)現(xiàn)了。
只需要在上游創(chuàng)建 span 時(shí)將它自身數(shù)據(jù)寫(xiě)入到 Baggage 中,再到下游 span 取出來(lái)寫(xiě)入到 attribute 中即可。
ContextCustomizer
這里的關(guān)鍵就是在哪里寫(xiě)入這個(gè) Baggage,因?yàn)閷?duì)第三方組件的 Instrumentation 的實(shí)現(xiàn)都是在 opentelemetry-java-instrumentation項(xiàng)目中。
javaagent.jar 包也是通過(guò)該項(xiàng)目打包出來(lái)的。
所以在該項(xiàng)目的 io.opentelemetry.instrumentation.api.instrumenter.Instrumenter#doStart 這個(gè)函數(shù)中我們發(fā)現(xiàn)一段邏輯:
圖片
這個(gè)函數(shù)是在創(chuàng)建一個(gè) span 的時(shí)候調(diào)用的,通常這個(gè)創(chuàng)建函數(shù)是在這些第三方庫(kù)的攔截器中創(chuàng)建的。
圖片
比如這是在 grpc 的攔截器中調(diào)用。
// context customizers run before span start, so that they can have access to the parent span
// context, and so that their additions to the context will be visible to span processors
for (ContextCustomizer<? super REQUEST> contextCustomizer : contextCustomizers) {
context = contextCustomizer.onStart(context, request, attributes);
}
ContextCustomizer 是一個(gè)接口只提供了一個(gè)函數(shù):
public interface ContextCustomizer<REQUEST> {
/** Allows to customize the operation {@link Context}. */
Context onStart(Context parentContext, REQUEST request, Attributes startAttributes);
}
- Context 是上下文信息,可以在自定義的 ContextCustomizer 繼續(xù)往上下文中追加信息。
- REQUEST 是一個(gè)泛型:一般是當(dāng)前第三方組件的請(qǐng)求信息:
比如是 HTTP 時(shí),這個(gè) request 就是 HTTP 的請(qǐng)求信息。
而如果是 gRPC ,則是 gRPC 的請(qǐng)求信息。
其他的請(qǐng)求類型同理。
- startAttributes 則是預(yù)先寫(xiě)入的一些屬性,比如在上圖中看到的一些 rpc.service/rpc.method等字段。
// context customizers run before span start, so that they can have access to the parent span
// context, and so that their additions to the context will be visible to span processors
從這個(gè)接口的調(diào)用注釋可以看出:這個(gè)自定義的 context 會(huì)在 span 開(kāi)始之前調(diào)用,所以在這里是可以訪問(wèn)到當(dāng)前創(chuàng)建的 span 的父 context,同時(shí)在這里的 context 中新增的數(shù)據(jù)可以在 SpanProcessor 訪問(wèn)到。
SpanProcessor
而 SpanProcessor 又是一個(gè)非常的重要的組件,我們接著剛才的 contextCustomizer 處往后跟蹤代碼。
context = contextCustomizer.onStart(context, request, attributes);
--->Span span = spanBuilder.setParent(context).startSpan();
--->io.opentelemetry.sdk.trace.SdkSpanBuilder#startSpan
--->io.opentelemetry.sdk.trace.SdkSpan#startSpan
--->spanProcessor.onStart(parentContext, span);
可以看到 spanProcessor.onStart 這個(gè)函數(shù)會(huì)在 contextCustomizer 之后調(diào)用。
圖片
/**
* SpanProcessor is the interface {@link SdkTracer} uses to allow synchronous hooks for when a
* {@code Span} is started or when a {@code Span} is ended.
*/
//==========================================================
/**
* Called when a {@link io.opentelemetry.api.trace.Span} is started, if the {@link
* Span#isRecording()} returns true.
* * <p>This method is called synchronously on the execution thread, should not throw or block the
* execution thread. * * @param parentContext the parent {@code Context} of the span that just started.
* @param span the {@code Span} that just started.
*/void onStart(Context parentContext, ReadWriteSpan span);
從注釋中可以知道 SpanProcessor 是作為一個(gè) span 的生命周期中的關(guān)鍵節(jié)點(diǎn)的 hook 函數(shù)。
在這些函數(shù)中我們可以自定義一些 span 的數(shù)據(jù),比如在 onStart 還可以往 span 中寫(xiě)入一些自定義的 attribute。
這也是我們這次會(huì)用到的一個(gè)接口,我們的方案是:
在 gRPC 構(gòu)建 Instrument 時(shí)自定義一個(gè) GrpcServerContextCustomizer ,在這個(gè)自定義的 ContextCustomizer 中寫(xiě)入一個(gè) Baggage。
然后在 io.opentelemetry.sdk.trace.SpanProcessor#onStart 接口中取出這個(gè) Baggage 寫(xiě)入到當(dāng)前 span 的 attribute 中。
這樣我們就可以看到之前提到的那些數(shù)據(jù)上游信息了。
圖片
為 gRPC 添加上下文
先來(lái)看看如何為 gRPC 添加 Baggage:
我們先自定義一個(gè) GrpcServerContextCustomizer 實(shí)現(xiàn)類:
public class GrpcServerContextCustomizer implements ContextCustomizer<GrpcRequest> {
private final String currentServiceName;
private static final String PARENT_RPC_KEY = "parent_rpc";
private static final String CURRENT_RPC_KEY = "current_rpc";
private static final String CURRENT_HTTP_URL_PATH = "current_http_url_path";
public GrpcServerContextCustomizer(String serviceName) {
this.currentServiceName = serviceName;
}
@Override
public Context onStart(Context parentContext, GrpcRequest grpcRequest,
Attributes startAttributeds) {
BaggageBuilder builder = Baggage.fromContext(parentContext).toBuilder();
String currentRpc = Baggage.fromContext(parentContext).getEntryValue(CURRENT_RPC_KEY);
String fullMethodName = startAttributeds.get(AttributeKey.stringKey("rpc.method"));
String rpcService = startAttributeds.get(AttributeKey.stringKey("rpc.service"));
// call from grpc
String method = rpcService + ":" + fullMethodName;
String baggageInfo = getBaggageInfo(currentServiceName, method);
String httpUrlPath = Baggage.fromContext(parentContext).getEntryValue(CURRENT_HTTP_URL_PATH);
if (!StringUtils.isNullOrEmpty(httpUrlPath)) {
// call from http
// currentRpc = currentRpc; currentRpc = create1|GET:/request // clear current_http_url_path builder.put(CURRENT_HTTP_URL_PATH, "");
}
Baggage baggage = builder
.put(PARENT_RPC_KEY, currentRpc)
.put(CURRENT_RPC_KEY, baggageInfo)
.build();
return parentContext.with(baggage);
}
private static String getBaggageInfo(String serviceName, String method) {
if (StringUtils.isNullOrEmpty(serviceName)) {
return "";
} return serviceName + "|" + method;
}
}
從這個(gè)代碼中可以看出,我們需要先從上下文中獲取 CURRENT_RPC_KEY ,從而得知當(dāng)前的 span 是不是 root span。
所以我們其實(shí)是把當(dāng)前的 span 信息作為一個(gè) PARENT_RPC_KEY 寫(xiě)入到 Baggage 中。
這樣在 SpanProcessor 中便可以直接取出 PARENT_RPC_KEY 作為上游的信息寫(xiě)入 span 的 attribute 中。
@Override
public void onStart(Context parentContext, ReadWriteSpan span) {
String parentRpc = Baggage.fromContext(parentContext).getEntryValue("parent_rpc");
if (!StringUtils.isNullOrEmpty(parentRpc)) {
String[] split = parentRpc.split("\\|");
span.setAttribute("parent_rpc", parentRpc);
span.setAttribute("parent_service_name", split[0]);
span.setAttribute("parent_service_method", split[1]);
}
}
需要注意的是,這里的 Baggage 需要使用 Baggage.fromContext(parentContext) 才能拿到剛才寫(xiě)入 Baggage 信息。
之后我們找到構(gòu)建 gRPCServerInstrumenterBuilder 的地方,寫(xiě)入我們剛才自定義的 GrpcServerContextCustomizer 即可。
圖片
.addContextCustomizer(new GrpcServerContextCustomizer(serviceName))
這里我們選擇寫(xiě)入到是 serverInstrumenterBuilder 而不是clientInstrumenterBuilder,因?yàn)樵诜?wù)端的入口就知道是從哪個(gè)接口進(jìn)來(lái)的請(qǐng)求。
為 spring boot 的 http 接口添加上下文
如果只存在 gRPC 調(diào)用時(shí)只添加 gRPC 的上下文也夠用了,但是我們也不排除由外部接口是通過(guò) HTTP 訪問(wèn)進(jìn)來(lái)的,然后再調(diào)用內(nèi)部的 gRPC 接口;這也是非常常見(jiàn)的架構(gòu)模式。
所以我們還需要在 HTTP 中增加 ContextCustomizer 將自身的數(shù)據(jù)寫(xiě)入到 Baggage 中。
好在 HttpServerRouteBuilder 自身是實(shí)現(xiàn)了 ContextCustomizer 接口的,我們只需要往里面寫(xiě)入 Baggage 數(shù)據(jù)即可。
public ContextCustomizer<REQUEST> build() {
Set<String> knownMethods = new HashSet<>(this.knownMethods);
return (context, request, startAttributes) -> {
if (HttpRouteState.fromContextOrNull(context) != null) {
return context;
} String method = getter.getHttpRequestMethod(request);
if (method == null || !knownMethods.contains(method)) {
method = "HTTP";
} String urlPath = getter.getUrlPath(request);
String methodPath = method + ":" + urlPath;
String currentRpc = Baggage.fromContext(context).getEntryValue(CURRENT_RPC_KEY);
String baggageInfo = getBaggageInfo(serviceName, methodPath);
Baggage baggage = Baggage.fromContext(context).toBuilder()
.put(PARENT_RPC_KEY, currentRpc)
.put(CURRENT_RPC_KEY, baggageInfo)
.put(CURRENT_HTTP_URL_PATH, methodPath)
.build();
return context.with(HttpRouteState.create(method, null, 0))
.with(baggage);
};}
這里新增了 CURRENT_HTTP_URL_PATH 用于標(biāo)記當(dāng)前的請(qǐng)求來(lái)源是 HTTP,在 grpc 的 ContextCustomizer 解析時(shí)會(huì)判斷這個(gè)值是否為空。
String httpUrlPath = Baggage.fromContext(parentContext).getEntryValue(CURRENT_HTTP_URL_PATH);
if (!StringUtils.isNullOrEmpty(httpUrlPath)) {
// call from http
// currentRpc = currentRpc; currentRpc = create1|GET:/request // clear current_http_url_path builder.put(CURRENT_HTTP_URL_PATH, "");
}
圖片
這樣就可以在 grpc 的下游接口拿到入口的 HTTP 接口數(shù)據(jù)了。
當(dāng)然也有可能是在 grpc 接口中調(diào)用 HTTP 的接口的場(chǎng)景,只是我們的業(yè)務(wù)中沒(méi)有這種情況,所以就沒(méi)有適配這類的場(chǎng)景。
總結(jié)
ContextCustomizer 接口沒(méi)有提供對(duì)應(yīng)的擴(kuò)展,但是 SpanProcessor 是提供了擴(kuò)展接口的。
原本是想盡量別維護(hù)自己的 javaagent,但也好在 OpenTelemetry 是提供的接口,所以也并不會(huì)去修改原本的代碼。
所以我們還是需要?jiǎng)?chuàng)建一個(gè) extensions 的項(xiàng)目在實(shí)現(xiàn) SpanProcessor,這個(gè)在之前的 《實(shí)戰(zhàn):如何編寫(xiě)一個(gè) OpenTelemetry Extensions》有詳細(xì)講到。
所以最后的應(yīng)用啟動(dòng)方式如下:
java -javaagent:opentelemetry-javaagent-2.4.0-SNAPSHOT.jar \
-Dotel.javaagent.extensinotallow=otel-extensions-custom-context-1.0-SNAPSHOT.jar \
需要使用我們手動(dòng)打包的 javaagent 以及一個(gè)自定義擴(kuò)展包。
打包方式:
./gradlew assemble
opentelemetry-java-instrumentation 項(xiàng)目比較大,所以打包過(guò)程可能比較久。
因?yàn)檫@其實(shí)是一些定制需求,所以就沒(méi)提交到上游,感興趣的可以自行合并代碼測(cè)試。
最后可以這個(gè)分支中查看到修改的部分:https://github.com/crossoverJie/opentelemetry-java-instrumentation/compare/main...add-grpc-context