Skip to content

feat: span fixes #2331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 103 additions & 14 deletions samples/observability-traces.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,81 @@
) {
// [START spanner_opentelemetry_traces_cloudtrace_usage]

const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');

Check failure on line 30 in samples/observability-traces.js

View workflow job for this annotation

GitHub Actions / lint

'NodeTracerProvider' is assigned a value but never used
const {NodeSDK} = require('@opentelemetry/sdk-node');
// const {
// TraceExporter,
// } = require('@google-cloud/opentelemetry-cloud-trace-exporter');
const {
TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');
const {
ConsoleSpanExporter,
BatchSpanProcessor,
TraceIdRatioBasedSampler,
} = require('@opentelemetry/sdk-trace-base');
const {Spanner} = require('@google-cloud/spanner');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {Spanner} = require('../build/src');
const {
getNodeAutoInstrumentations,
} = require('@opentelemetry/auto-instrumentations-node');

diag.setLogger(new DiagConsoleLogger(), DiagLogLevel['ALL']);

const traceExporter = new TraceExporter();
const traceExporter = new ConsoleSpanExporter();

// Create a provider with a custom sampler
const provider = new NodeTracerProvider({
sampler: new TraceIdRatioBasedSampler(1.0), // Sample 100% of traces
spanProcessors: [new BatchSpanProcessor(traceExporter)],
});
// const provider = new NodeTracerProvider({
// sampler: new TraceIdRatioBasedSampler(1.0), // Sample 100% of traces
// spanProcessors: [new BatchSpanProcessor(traceExporter)],
// });

// Uncomment following line to register tracerProvider globally or pass it in Spanner object
// provider.register();

// Set global propagator to propogate the trace context for end to end tracing.
const {propagation} = require('@opentelemetry/api');
const {W3CTraceContextPropagator} = require('@opentelemetry/core');
propagation.setGlobalPropagator(new W3CTraceContextPropagator());
// const {propagation} = require('@opentelemetry/api');
const {
W3CTraceContextPropagator,
CompositePropagator,
} = require('@opentelemetry/core');
// propagation.setGlobalPropagator(new W3CTraceContextPropagator());

const sdk = new NodeSDK({
// resource: detectedResourceAttributes.merge(resource),
traceExporter: traceExporter,
batchSpanProcessor: [new BatchSpanProcessor(traceExporter)],
sampler: new TraceIdRatioBasedSampler(1.0), // Sample 100% of traces
instrumentations: [
getNodeAutoInstrumentations({
// Disable file system instrumentation to reduce noise
'@opentelemetry/instrumentation-fs': {
enabled: false,
},
'@opentelemetry/instrumentation-net': {
enabled: false,
},
'@opentelemetry/instrumentation-dns': {
enabled: false,
},
}),
],
textMapPropagator: new CompositePropagator({
propagators: [new W3CTraceContextPropagator()],
}),
});

// Initialize the SDK
sdk.start();
process.on('SIGTERM', () => {
sdk
.shutdown()
.then(() => console.log('Tracing terminated'))
.catch(error => console.log('Error terminating tracing', error))
.finally(() => process.exit(0));

Check failure on line 98 in samples/observability-traces.js

View workflow job for this annotation

GitHub Actions / lint

Don't use process.exit(); throw an error instead
});

const spanner = new Spanner({
projectId: projectId,
observabilityOptions: {
tracerProvider: provider,
// tracerProvider: provider,
// Enable extended tracing to allow your SQL statements to be annotated.
enableExtendedTracing: true,
// Enable end to end tracing.
Expand All @@ -78,10 +123,54 @@
console.log(`Query: ${rows.length} found.`);
rows.forEach(row => console.log(row));
} finally {
spanner.close();
// spanner.close();
}

database.getSnapshot(async (err, transaction) => {
if (err) {
console.error(err);
return;
}
const queryOne = 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums';

try {
// Read #1, using SQL
const [qOneRows] = await transaction.run(queryOne);

qOneRows.forEach(row => {
const json = row.toJSON();
console.log(
`SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`,
);
});

const queryTwo = {
columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
};

// Read #2, using the `read` method. Even if changes occur
// in-between the reads, the transaction ensures that both
// return the same data.
const [qTwoRows] = await transaction.read('Albums', queryTwo);

qTwoRows.forEach(row => {
const json = row.toJSON();
console.log(
`SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`,
);
});

console.log('Successfully executed read-only transaction.');
} catch (err) {
console.error('ERROR:', err);
} finally {
transaction.end();
// Close the database when finished.
await database.close();
}
});

provider.forceFlush();

Check failure on line 173 in samples/observability-traces.js

View workflow job for this annotation

GitHub Actions / lint

'provider' is not defined

// This sleep gives ample time for the trace
// spans to be exported to Google Cloud Trace.
Expand Down
5 changes: 3 additions & 2 deletions samples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@
"yargs": "^17.0.0"
},
"devDependencies": {
"@google-cloud/opentelemetry-cloud-trace-exporter": "^2.4.1",
"@opentelemetry/exporter-trace-otlp-grpc": "^0.57.0",
"@opentelemetry/instrumentation": "^0.57.0",
"@opentelemetry/instrumentation-grpc": "^0.57.0",
"@opentelemetry/resources": "1.30.1",
"@opentelemetry/auto-instrumentations-node": "^0.60.0",
"@opentelemetry/sdk-trace-base": "~1.30.1",
"@opentelemetry/sdk-trace-node": "^1.30.1",
"@opentelemetry/sdk-node": "^0.202.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/core": "^1.30.1",
"@opentelemetry/core": "^2.0.0",
"chai": "^4.2.0",
"mocha": "^9.0.0",
"p-limit": "^3.0.1"
Expand Down
35 changes: 22 additions & 13 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ import {
startTrace,
setSpanError,
setSpanErrorAndException,
endSpan,
traceConfig,
} from './instrument';
import {
Expand Down Expand Up @@ -1190,15 +1191,20 @@ class Database extends common.GrpcServiceObject {
* @param {Transaction} transaction The transaction to observe.
* @returns {Transaction}
*/
private _releaseOnEnd(session: Session, transaction: Snapshot, span: Span) {
private _releaseOnEnd(
session: Session,
transaction: Snapshot,
span: Span,
spanState?: {ended: boolean},
) {
transaction.once('end', () => {
try {
this.sessionFactory_.release(session);
} catch (e) {
setSpanErrorAndException(span, e as Error);
this.emit('error', e);
} finally {
span.end();
endSpan(span, spanState);
}
});
}
Expand Down Expand Up @@ -2176,10 +2182,11 @@ class Database extends common.GrpcServiceObject {
}

return startTrace('Database.getSnapshot', this._traceConfig, span => {
const spanState = {ended: false};
this.sessionFactory_.getSession((err, session) => {
if (err) {
setSpanError(span, err);
span.end();
endSpan(span, spanState);
callback!(err as ServiceError);
return;
}
Expand All @@ -2198,19 +2205,19 @@ class Database extends common.GrpcServiceObject {
});
session!.lastError = err;
this.sessionFactory_.release(session!);
span.end();
endSpan(span, spanState);
this.getSnapshot(options, callback!);
} else {
span.addEvent('Using Session', {'session.id': session?.id});
this.sessionFactory_.release(session!);
span.end();
endSpan(span, spanState);
callback!(err);
}
return;
}

this._releaseOnEnd(session!, snapshot, span);
span.end();
this._releaseOnEnd(session!, snapshot, span, spanState);
endSpan(span, spanState);
callback!(err, snapshot);
});
});
Expand Down Expand Up @@ -2283,6 +2290,7 @@ class Database extends common.GrpcServiceObject {
transactionTag: options.requestOptions?.transactionTag,
},
span => {
const spanState = {ended: false};
this.pool_.getSession((err, session, transaction) => {
if (!err) {
if (options.requestOptions) {
Expand All @@ -2296,11 +2304,11 @@ class Database extends common.GrpcServiceObject {
);
span.addEvent('Using Session', {'session.id': session?.id});
transaction!._observabilityOptions = this._observabilityOptions;
this._releaseOnEnd(session!, transaction!, span);
this._releaseOnEnd(session!, transaction!, span, spanState);
} else {
setSpanError(span, err);
}
span.end();
endSpan(span, spanState);
cb!(err as grpc.ServiceError | null, transaction);
});
},
Expand Down Expand Up @@ -3144,6 +3152,7 @@ class Database extends common.GrpcServiceObject {
options?: TimestampBounds,
): PartialResultStream {
const proxyStream: Transform = through.obj();
const spanState = {ended: false};
return startTrace(
'Database.runStream',
{
Expand All @@ -3156,15 +3165,15 @@ class Database extends common.GrpcServiceObject {
if (err) {
setSpanError(span, err);
proxyStream.destroy(err);
span.end();
endSpan(span, spanState);
return;
}

span.addEvent('Using Session', {'session.id': session?.id});

const snapshot = session!.snapshot(options, this.queryOptions_);

this._releaseOnEnd(session!, snapshot, span);
this._releaseOnEnd(session!, snapshot, span, spanState);

let dataReceived = false;
let dataStream = snapshot.runStream(query);
Expand Down Expand Up @@ -3196,7 +3205,7 @@ class Database extends common.GrpcServiceObject {
dataStream.removeListener('end', endListener);
dataStream.end();
snapshot.end();
span.end();
endSpan(span, spanState);
// Create a new data stream and add it to the end user stream.
dataStream = this.runStream(query, options);
dataStream.pipe(proxyStream);
Expand All @@ -3215,7 +3224,7 @@ class Database extends common.GrpcServiceObject {
if (err) {
setSpanError(span, err);
}
span.end();
endSpan(span, spanState);
});

return proxyStream as PartialResultStream;
Expand Down
17 changes: 16 additions & 1 deletion src/instrument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,17 @@ export function setSpanErrorAndException(
return false;
}

export function endSpan(span: Span, spanState?: {ended: boolean}): void {
if (spanState) {
if (!spanState.ended) {
span.end();
spanState.ended = true;
}
} else {
span.end();
}
}

/**
* getActiveOrNoopSpan queries the global tracer for the currently active
* span and returns it, otherwise if there is no active span available, it'll
Expand All @@ -242,7 +253,11 @@ export function setSpanErrorAndException(
*/
export function getActiveOrNoopSpan(): Span {
const span = trace.getActiveSpan();
if (span) {
if (
span &&
(span as any).name?.startsWith('CloudSpanner') &&
(span as any).ended === false
) {
return span;
}
return new noopSpan();
Expand Down
Loading
Loading