Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DatabaseClientImpl implements DatabaseClient {
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private final TraceWrapper tracer;
private Attributes commonAttributes;
private final Attributes databaseAttributes;
@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
Expand Down Expand Up @@ -88,15 +88,15 @@ class DatabaseClientImpl implements DatabaseClient {
boolean useMultiplexedSessionPartitionedOps,
TraceWrapper tracer,
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
Attributes databaseAttributes) {
this.clientId = clientId;
this.pool = pool;
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps;
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;
this.databaseAttributes = databaseAttributes;

this.clientIdToOrdinalMap = new HashMap<String, Integer>();
this.dbId = this.dbIdFromClientId(this.clientId);
Expand Down Expand Up @@ -203,7 +203,7 @@ public Timestamp write(final Iterable<Mutation> mutations) throws SpannerExcepti
public CommitResponse writeWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
Expand All @@ -230,7 +230,7 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient()
Expand Down Expand Up @@ -260,7 +260,7 @@ int getNthRequest() {
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
Expand All @@ -278,7 +278,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(

@Override
public ReadContext singleUse() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUse();
} catch (RuntimeException e) {
Expand All @@ -290,7 +290,7 @@ public ReadContext singleUse() {

@Override
public ReadContext singleUse(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUse(bound);
} catch (RuntimeException e) {
Expand All @@ -302,7 +302,7 @@ public ReadContext singleUse(TimestampBound bound) {

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
Expand All @@ -314,7 +314,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
Expand All @@ -326,7 +326,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {

@Override
public ReadOnlyTransaction readOnlyTransaction() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().readOnlyTransaction();
} catch (RuntimeException e) {
Expand All @@ -338,7 +338,7 @@ public ReadOnlyTransaction readOnlyTransaction() {

@Override
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
Expand All @@ -350,7 +350,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().readWriteTransaction(options);
} catch (RuntimeException e) {
Expand All @@ -362,7 +362,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {

@Override
public TransactionManager transactionManager(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().transactionManager(options);
} catch (RuntimeException e) {
Expand All @@ -374,7 +374,7 @@ public TransactionManager transactionManager(TransactionOption... options) {

@Override
public AsyncRunner runAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().runAsync(options);
} catch (RuntimeException e) {
Expand All @@ -386,7 +386,7 @@ public AsyncRunner runAsync(TransactionOption... options) {

@Override
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, databaseAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().transactionManagerAsync(options);
} catch (RuntimeException e) {
Expand Down Expand Up @@ -449,7 +449,7 @@ private TransactionOption[] withReqId(

private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, databaseAttributes);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(
(session, reqId) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void run() {
List<SessionImpl> sessions;
int remainingSessionsToCreate = sessionCount;
ISpan span =
spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, commonAttributes);
spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, databaseAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
spanner
.getTracer()
Expand Down Expand Up @@ -185,7 +185,7 @@ interface SessionConsumer {
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
private final ScheduledExecutorService executor;
private final DatabaseId db;
private final Attributes commonAttributes;
private final Attributes databaseAttributes;

// SessionClient is created long before a DatabaseClientImpl is created,
// as batch sessions are firstly created then later attached to each Client.
Expand All @@ -204,7 +204,7 @@ interface SessionConsumer {
this.db = db;
this.executorFactory = executorFactory;
this.executor = executorFactory.get();
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
this.databaseAttributes = spanner.getTracer().createDatabaseAttributes(db);
}

@Override
Expand Down Expand Up @@ -236,7 +236,8 @@ SessionImpl createSession() {
sessionChannelCounter++;
}
XGoogSpannerRequestId reqId = nextRequestId(channelId, 1);
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
ISpan span =
spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.databaseAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
Expand Down Expand Up @@ -289,7 +290,7 @@ SessionImpl createMultiplexedSession() {
ISpan span =
spanner
.getTracer()
.spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.commonAttributes);
.spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.databaseAttributes);
// MultiplexedSession doesn't use a channelId hence this hard-coded value.
int channelId = 0;
XGoogSpannerRequestId reqId = nextRequestId(channelId, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
multiplexedSessionDatabaseClient,
getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(),
useMultiplexedSessionForRW,
this.tracer.createCommonAttributes(db));
this.tracer.createDatabaseAttributes(db));
dbClients.put(db, dbClient);
return dbClient;
}
Expand All @@ -375,7 +375,7 @@ DatabaseClientImpl createDatabaseClient(
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient,
boolean useMultiplexedSessionPartitionedOps,
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
Attributes databaseAttributes) {
if (multiplexedSessionClient != null) {
// Set the session pool in the multiplexed session client.
// This is required to handle fallback to regular sessions for in-progress transactions that
Expand All @@ -390,7 +390,7 @@ DatabaseClientImpl createDatabaseClient(
useMultiplexedSessionPartitionedOps,
tracer,
useMultiplexedSessionForRW,
commonAttributes);
databaseAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class TraceWrapper {
private final Tracer openCensusTracer;
private final io.opentelemetry.api.trace.Tracer openTelemetryTracer;
private final boolean enableExtendedTracing;
private final AttributesBuilder commonAttributesBuilder;

TraceWrapper(
Tracer openCensusTracer,
Expand All @@ -68,20 +69,24 @@ class TraceWrapper {
this.openTelemetryTracer = openTelemetryTracer;
this.openCensusTracer = openCensusTracer;
this.enableExtendedTracing = enableExtendedTracing;
this.commonAttributesBuilder = createCommonAttributes();
}

ISpan spanBuilder(String spanName) {
return spanBuilder(spanName, Attributes.empty());
}

ISpan spanBuilder(String spanName, Attributes commonAttributes, TransactionOption... options) {
return spanBuilder(spanName, createTransactionAttributes(commonAttributes, options));
ISpan spanBuilder(String spanName, Attributes attributes, TransactionOption... options) {
return spanBuilder(spanName, createTransactionAttributes(attributes, options));
}

ISpan spanBuilder(String spanName, Attributes attributes) {
if (SpannerOptions.getActiveTracingFramework().equals(TracingFramework.OPEN_TELEMETRY)) {
return new OpenTelemetrySpan(
openTelemetryTracer.spanBuilder(spanName).setAllAttributes(attributes).startSpan());
openTelemetryTracer
.spanBuilder(spanName)
.setAllAttributes(this.commonAttributesBuilder.putAll(attributes).build())
.startSpan());
} else {
return new OpenCensusSpan(openCensusTracer.spanBuilder(spanName).startSpan());
}
Expand Down Expand Up @@ -209,15 +214,20 @@ Attributes createTableAttributes(String tableName, Options options) {
return builder.build();
}

Attributes createCommonAttributes(DatabaseId db) {
Attributes createDatabaseAttributes(DatabaseId db) {
AttributesBuilder builder = Attributes.builder();
builder.put(DB_NAME_KEY, db.getDatabase());
builder.put(INSTANCE_NAME_KEY, db.getInstanceId().getInstance());
return builder.build();
}

private AttributesBuilder createCommonAttributes() {
AttributesBuilder builder = Attributes.builder();
builder.put(GCP_CLIENT_SERVICE_KEY, "spanner");
builder.put(GCP_CLIENT_REPO_KEY, "googleapis/java-spanner");
builder.put(GCP_CLIENT_VERSION_KEY, GaxProperties.getLibraryVersion(TraceWrapper.class));
builder.put(CLOUD_REGION_KEY, BuiltInMetricsProvider.detectClientLocation());
return builder.build();
return builder;
}

private static String getTraceThreadName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class XGoogSpannerRequestId {
@VisibleForTesting
static final String RAND_PROCESS_ID = XGoogSpannerRequestId.generateRandProcessId();

static String REQUEST_ID = "x-goog-spanner-request-id";
public static String REQUEST_ID = "x-goog-spanner-request-id";
public static final Metadata.Key<String> REQUEST_HEADER_KEY =
Metadata.Key.of(REQUEST_ID, Metadata.ASCII_STRING_MARSHALLER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,17 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
Span span = Span.current();
DatabaseName databaseName = extractDatabaseName(headers);
String key = extractKey(databaseName, method.getFullMethodName());
String requestId = extractRequestId(headers);
TagContext tagContext = getTagContext(key, method.getFullMethodName(), databaseName);
Attributes attributes =
getMetricAttributes(key, method.getFullMethodName(), databaseName);
Map<String, String> builtInMetricsAttributes =
getBuiltInMetricAttributes(key, databaseName);
builtInMetricsAttributes.put(
BuiltInMetricsConstant.REQUEST_ID_KEY.getKey(), extractRequestId(headers));
builtInMetricsAttributes.put(BuiltInMetricsConstant.REQUEST_ID_KEY.getKey(), requestId);
addBuiltInMetricAttributes(compositeTracer, builtInMetricsAttributes);
if (span != null) {
span.setAttribute(XGoogSpannerRequestId.REQUEST_ID, requestId);
}
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
Expand Down
Loading