Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@
<shaded.protobuf.version>3.25.8</shaded.protobuf.version>
<shaded.grpc.version>1.75.0</shaded.grpc.version>

<!-- OpenTelemetry versions -->
<opentelemetry.version>1.57.0</opentelemetry.version>
<opentelemetry-semconv.version>1.37.0</opentelemetry-semconv.version>
Comment on lines +193 to +194
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using the latest 1.59.0 and 1.40.0?

<!-- Test properties -->
<maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
<test.exclude.pattern>_</test.exclude.pattern>
Expand Down Expand Up @@ -398,7 +401,35 @@
<artifactId>jakarta.annotation-api</artifactId>
<version>${jakarta.annotation.version}</version>
</dependency>

<!-- OpenTelemetry dependencies -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry-semconv.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
</dependencies>

</dependencyManagement>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftClientReques

Optional.ofNullable(request.getSlidingWindowEntry()).ifPresent(b::setSlidingWindowEntry);
Optional.ofNullable(request.getRoutingTable()).map(RoutingTable::toProto).ifPresent(b::setRoutingTable);
Optional.ofNullable(request.getSpanContext()).ifPresent(b::setSpanContext);

return b.setCallId(request.getCallId())
.setToLeader(request.isToLeader())
Expand Down Expand Up @@ -188,6 +189,9 @@ static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
if (request.hasSlidingWindowEntry()) {
b.setSlidingWindowEntry(request.getSlidingWindowEntry());
}
if (request.hasSpanContext()) {
b.setSpanContext(request.getSpanContext());
}
return b.setClientId(ClientId.valueOf(request.getRequestorId()))
.setGroupId(ProtoUtils.toRaftGroupId(request.getRaftGroupId()))
.setCallId(request.getCallId())
Expand Down
25 changes: 25 additions & 0 deletions ratis-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,31 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.proto.RaftProtos.SpanContextProto;
import org.apache.ratis.proto.RaftProtos.StaleReadRequestTypeProto;
import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto;
Expand Down Expand Up @@ -305,6 +306,7 @@ public static class Builder {
private SlidingWindowEntry slidingWindowEntry;
private RoutingTable routingTable;
private long timeoutMs;
private SpanContextProto spanContext;

public RaftClientRequest build() {
return new RaftClientRequest(this);
Expand Down Expand Up @@ -366,6 +368,11 @@ public Builder setTimeoutMs(long timeoutMs) {
this.timeoutMs = timeoutMs;
return this;
}

public Builder setSpanContext(SpanContextProto spanContext) {
this.spanContext = spanContext;
return this;
}
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -397,6 +404,8 @@ public static RaftClientRequest toWriteRequest(RaftClientRequest r, Message mess

private final boolean toLeader;

private SpanContextProto spanContext;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add final.


/** Construct a request for sending to the given server. */
protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
this(newBuilder()
Expand Down Expand Up @@ -429,6 +438,7 @@ private RaftClientRequest(Builder b) {
this.slidingWindowEntry = b.slidingWindowEntry;
this.routingTable = b.routingTable;
this.timeoutMs = b.timeoutMs;
this.spanContext = b.spanContext;
}

@Override
Expand Down Expand Up @@ -472,6 +482,10 @@ public long getTimeoutMs() {
return timeoutMs;
}

public SpanContextProto getSpanContext() {
return spanContext;
}

@Override
public String toString() {
return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.trace;

import io.opentelemetry.api.common.AttributeKey;

/**
* The constants in this class correspond with the guidance outlined by the OpenTelemetry <a href=
* "https://github.com/open-telemetry/semantic-conventions">Semantic
* Conventions</a>.
*/
public final class RatisAttributes {
public static final AttributeKey<String> ATTR_MEMBER_ID = AttributeKey.stringKey("raft.member.id");
public static final AttributeKey<String> ATTR_CALLER_ID = AttributeKey.stringKey("raft.caller.id");
Copy link
Contributor

@szetszwo szetszwo Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callId is unique only within a single client. We should use ClientInvocationId.



private RatisAttributes() {
}
}
85 changes: 85 additions & 0 deletions ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.trace;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapGetter;
import org.apache.ratis.proto.RaftProtos;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import SpanContextProto instead. The code will be easier to read.

import org.apache.ratis.proto.RaftProtos.SpanContextProto;

import org.apache.ratis.util.VersionInfo;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public final class TraceUtils {

private TraceUtils() {
}

public static Tracer getGlobalTracer() {
return GlobalOpenTelemetry.getTracer("org.apache.ratis", VersionInfo.getSoftwareInfoVersion());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a static field, so we don't have to create the same Tracer multiple times.

  private static final Tracer TRACER = GlobalOpenTelemetry.getTracer("org.apache.ratis", VersionInfo.getSoftwareInfoVersion());

}

/**
* Create a span which parent is from remote, i.e, passed through rpc.
* </p>
* We will set the kind of the returned span to {@link SpanKind#SERVER}, as this should be the top
* most span at server side.
*/
public static Span createRemoteSpan(String name, Context ctx) {
return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(SpanKind.SERVER)
.startSpan();
}

private static final TextMapPropagator PROPAGATOR =
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();

public static RaftProtos.SpanContextProto injectContextToProto(Context context) {
Map<String, String> carrier = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For String, would TreeMap be better? It will also sort them.

PROPAGATOR.inject(context, carrier, (map, key, value) -> map.put(key, value));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: What exactly will be injected, ATTR_MEMBER_ID and ATTR_CALLER_ID? Anything else?

return RaftProtos.SpanContextProto.newBuilder().putAllContext(carrier).build();
}

public static Context extractContextFromProto(RaftProtos.SpanContextProto proto) {
if (proto == null || proto.getContextMap().isEmpty()) {
return Context.current();
}
final TextMapGetter<RaftProtos.SpanContextProto> getter = SpanContextGetter.INSTANCE;
return PROPAGATOR.extract(Context.current(), proto, getter);
}
}

class SpanContextGetter implements TextMapGetter<RaftProtos.SpanContextProto> {
static final SpanContextGetter INSTANCE = new SpanContextGetter();

@Override
public Iterable<String> keys(RaftProtos.SpanContextProto carrier) {
return carrier.getContextMap().keySet();
}

@Override
public String get(RaftProtos.SpanContextProto carrier, String key) {
return Optional.ofNullable(carrier).map(RaftProtos.SpanContextProto::getContextMap)
.map(map -> map.get(key)).orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ static void printSystemProperties(BiConsumer<String, Object> out) {
sortedMap.forEach(out);
}

/**
* Get the current ratis version.
* @return the current ratis version string.
*/
public static String getSoftwareInfoVersion() {
return VersionInfo.load(VersionInfo.class).softwareInfos.getOrDefault(SoftwareInfo.VERSION);
}

public static void main(String[] args) {
printSystemProperties((key, value) -> System.out.printf("%-40s = %s%n", key, value));

Expand Down
6 changes: 6 additions & 0 deletions ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ message RaftRpcRequestProto {
uint64 callId = 4;
bool toLeader = 5;

SpanContextProto spanContext = 11;
repeated uint64 repliedCallIds = 12; // The call ids of the replied requests
uint64 timeoutMs = 13;
RoutingTableProto routingTable = 14;
Expand Down Expand Up @@ -569,3 +570,8 @@ message LogInfoProto {
TermIndexProto committed = 3;
TermIndexProto lastEntry = 4;
}

// The attribute map for opentelemetry trace
message SpanContextProto {
map<string, string> context = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.ratis.server.impl;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.Timekeeper;
Expand Down Expand Up @@ -100,6 +104,8 @@
import org.apache.ratis.statemachine.impl.TransactionContextImpl;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.trace.RatisAttributes;
import org.apache.ratis.trace.TraceUtils;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.ConcurrentUtils;
Expand Down Expand Up @@ -943,16 +949,44 @@ CompletableFuture<RaftClientReply> executeSubmitClientRequestAsync(RaftClientReq
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
assertLifeCycleState(LifeCycle.States.RUNNING);
LOG.debug("{}: receive client request({})", getMemberId(), request);
final Timekeeper timer = raftServerMetrics.getClientRequestTimer(request.getType());
final Optional<Timekeeper.Context> timerContext = Optional.ofNullable(timer).map(Timekeeper::time);
return replyFuture(request).whenComplete((clientReply, exception) -> {
timerContext.ifPresent(Timekeeper.Context::stop);
if (exception != null || clientReply.getException() != null) {
raftServerMetrics.incFailedRequestCount(request.getType());
}
});
final Context remoteContext = TraceUtils.extractContextFromProto(request.getSpanContext());
final Span span = TraceUtils.createRemoteSpan("raft.server.submitClientRequestAsync", remoteContext);
span.setAttribute(RatisAttributes.ATTR_MEMBER_ID, getMemberId().toString());
span.setAttribute(RatisAttributes.ATTR_CALLER_ID, String.valueOf(request.getCallId()));
try (Scope ignored = span.makeCurrent()) {
assertLifeCycleState(LifeCycle.States.RUNNING);
LOG.debug("{}: receive client request({})", getMemberId(), request);
final Timekeeper timer = raftServerMetrics.getClientRequestTimer(request.getType());
final Optional<Timekeeper.Context> timerContext = Optional.ofNullable(timer).map(Timekeeper::time);
span.addEvent("Processing client request");
final CompletableFuture<RaftClientReply> future = replyFuture(request);
return future.whenComplete((clientReply, exception) -> {
try (Scope completeScope = span.makeCurrent()) {
timerContext.ifPresent(Timekeeper.Context::stop);
if (exception != null || (clientReply != null && clientReply.getException() != null)) {
raftServerMetrics.incFailedRequestCount(request.getType());
}
if (exception != null) {
span.recordException(exception);
span.setStatus(StatusCode.ERROR, exception.getMessage());
} else if (clientReply != null && clientReply.getException() != null) {
span.recordException(clientReply.getException());
span.setStatus(StatusCode.ERROR, clientReply.getException().getMessage());
}
} finally {
span.addEvent("Completed client request");
span.end();
}
});
} catch (Exception | Error e) {
// this catch block is for exceptions thrown before the future is returned.
// Any exception thrown after the future is returned should be handled in
// the whenComplete callback above.
span.recordException(e);
span.setStatus(StatusCode.ERROR, e.getMessage());
span.end();
throw e;
}
Comment on lines +952 to +989
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let add traceAsyncMethod(..) to TraceUtils and move the opentelemetry code there.

}

private CompletableFuture<RaftClientReply> replyFuture(RaftClientRequest request) throws IOException {
Expand Down
Loading
Loading