Skip to content

Commit 6a8a8a0

Browse files
committed
Spans for abort transactions
1 parent 02c4c1f commit 6a8a8a0

File tree

3 files changed

+112
-2
lines changed

3 files changed

+112
-2
lines changed

observability-test/spanner.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ describe('EndToEnd', async () => {
142142
tracerProvider: tracerProvider,
143143
enableExtendedTracing: false,
144144
});
145+
let dbCounter = 1;
146+
147+
function newTestDatabase(): Database {
148+
return instance.database(`database-${dbCounter++}`,);
149+
}
145150

146151
const server = setupResult.server;
147152
const spannerMock = setupResult.spannerMock;
@@ -305,6 +310,7 @@ describe('EndToEnd', async () => {
305310
});
306311

307312
it('runTransactionAsync', async () => {
313+
308314
await database.runTransactionAsync(async transaction => {
309315
await transaction!.run('SELECT 1');
310316
});
@@ -327,6 +333,105 @@ describe('EndToEnd', async () => {
327333
);
328334
});
329335

336+
it.only('runTransaction with abort', done => {
337+
let attempts = 0;
338+
let rowCount = 0;
339+
const database = newTestDatabase();
340+
database.runTransaction(async (err, transaction) => {
341+
assert.ifError(err);
342+
if (!attempts) {
343+
spannerMock.abortTransaction(transaction!);
344+
}
345+
attempts++;
346+
transaction!.run(selectSql, (err, rows) => {
347+
assert.ifError(err);
348+
rows.forEach(() => rowCount++);
349+
transaction!
350+
.commit()
351+
.catch(done)
352+
.then(async () => {
353+
const expectedSpanNames = [
354+
'CloudSpanner.Database.batchCreateSessions',
355+
'CloudSpanner.SessionPool.createSessions',
356+
'CloudSpanner.Snapshot.runStream',
357+
'CloudSpanner.Snapshot.run',
358+
'CloudSpanner.Snapshot.runStream',
359+
'CloudSpanner.Snapshot.run',
360+
'CloudSpanner.Transaction.commit',
361+
'CloudSpanner.Snapshot.begin',
362+
'CloudSpanner.Database.runTransaction',
363+
];
364+
const expectedEventNames = [
365+
...waitingSessionsEvents,
366+
'Retrying Transaction',
367+
'Starting stream',
368+
'exception',
369+
'Stream broken. Not safe to retry',
370+
'Begin Transaction',
371+
'Transaction Creation Done',
372+
'Starting stream',
373+
'Starting Commit',
374+
'Commit Done',
375+
];
376+
await verifySpansAndEvents(traceExporter, expectedSpanNames, expectedEventNames)
377+
database
378+
.close()
379+
.catch(done)
380+
.then(() => done());
381+
});
382+
});
383+
});
384+
});
385+
386+
it('runTransactionAsync with abort', async () => {
387+
let attempts = 0;
388+
const database = newTestDatabase();
389+
await database.runTransactionAsync((transaction): Promise<number> => {
390+
if (!attempts) {
391+
spannerMock.abortTransaction(transaction);
392+
}
393+
attempts++;
394+
return transaction.run(selectSql).then(([rows]) => {
395+
let count = 0;
396+
rows.forEach(() => count++);
397+
return transaction.commit().then(() => count);
398+
});
399+
});
400+
assert.strictEqual(attempts, 2);
401+
const expectedSpanNames = [
402+
'CloudSpanner.Database.batchCreateSessions',
403+
'CloudSpanner.SessionPool.createSessions',
404+
'CloudSpanner.Snapshot.runStream',
405+
'CloudSpanner.Snapshot.run',
406+
'CloudSpanner.Snapshot.begin',
407+
'CloudSpanner.Snapshot.runStream',
408+
'CloudSpanner.Snapshot.run',
409+
'CloudSpanner.Transaction.commit',
410+
'CloudSpanner.Database.runTransactionAsync',
411+
];
412+
const expectedEventNames = [
413+
'Requesting 25 sessions',
414+
'Creating 25 sessions',
415+
'Requested for 25 sessions returned 25',
416+
'Starting stream',
417+
'exception',
418+
'Stream broken. Not safe to retry',
419+
'Begin Transaction',
420+
'Transaction Creation Done',
421+
'Starting stream',
422+
'Starting Commit',
423+
'Commit Done',
424+
...waitingSessionsEvents,
425+
'Retrying transaction',
426+
];
427+
await verifySpansAndEvents(
428+
traceExporter,
429+
expectedSpanNames,
430+
expectedEventNames
431+
);
432+
await database.close();
433+
});
434+
330435
it('writeAtLeastOnce', done => {
331436
const blankMutations = new MutationSet();
332437
database.writeAtLeastOnce(blankMutations, async (err, response) => {

src/partial-result-stream.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {Readable, Transform} from 'stream';
2424
import * as streamEvents from 'stream-events';
2525
import {grpc, CallOptions} from 'google-gax';
2626
import {DeadlineError, isRetryableInternalError} from './transaction-runner';
27-
27+
import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument';
2828
import {codec, JSONOptions, Json, Field, Value} from './codec';
2929
import {google} from '../protos/protos';
3030
import * as stream from 'stream';
@@ -494,6 +494,7 @@ export function partialResultStream(
494494
let lastRequestStream: Readable;
495495
const startTime = Date.now();
496496
const timeout = options?.gaxOptions?.timeout ?? Infinity;
497+
const span = getActiveOrNoopSpan();
497498

498499
// mergeStream allows multiple streams to be connected into one. This is good;
499500
// if we need to retry a request and pipe more data to the user's stream.
@@ -568,6 +569,7 @@ export function partialResultStream(
568569
// checkpoint stream has queued. After that, we will destroy the
569570
// user's stream with the same error.
570571
setImmediate(() => batchAndSplitOnTokenStream.destroy(err));
572+
setSpanErrorAndException(span, err as Error);
571573
return;
572574
}
573575

src/transaction-runner.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {Session} from './session';
2323
import {Transaction} from './transaction';
2424
import {NormalCallback} from './common';
2525
import {isSessionNotFoundError} from './session-pool';
26+
import {getActiveOrNoopSpan} from './instrument';
2627
import {Database} from './database';
2728
import {google} from '../protos/protos';
2829
import IRequestOptions = google.spanner.v1.IRequestOptions;
@@ -238,6 +239,7 @@ export abstract class Runner<T> {
238239
this.session.lastError = e as grpc.ServiceError;
239240
lastError = e as grpc.ServiceError;
240241
}
242+
const span = getActiveOrNoopSpan();
241243

242244
// Note that if the error is a 'Session not found' error, it will be
243245
// thrown here. We do this to bubble this error up to the caller who is
@@ -250,7 +252,7 @@ export abstract class Runner<T> {
250252
}
251253

252254
this.attempts += 1;
253-
255+
span.addEvent('Retrying transaction');
254256
const delay = this.getNextDelay(lastError);
255257
await new Promise(resolve => setTimeout(resolve, delay));
256258
}
@@ -321,6 +323,7 @@ export class TransactionRunner extends Runner<void> {
321323
}
322324

323325
stream.unpipe(proxyStream);
326+
// proxyStream.emit('error', err);
324327
reject(err);
325328
})
326329
.pipe(proxyStream);

0 commit comments

Comments
 (0)