Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sdk-trace-base): always wait on pending export in SimpleSpanProcessor #5303

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ For semantic convention package changes, see the [semconv CHANGELOG](packages/se
* refactor(sdk-trace-base)!: remove `new Span` constructor in favor of `Tracer.startSpan` API [#5048](https://github.com/open-telemetry/opentelemetry-js/pull/5048) @david-luna
* refactor(sdk-trace-base)!: remove `BasicTracerProvider.addSpanProcessor` API in favor of constructor options. [#5134](https://github.com/open-telemetry/opentelemetry-js/pull/5134) @david-luna
* refactor(sdk-trace-base)!: make `resource` property private in `BasicTracerProvider` and remove `getActiveSpanProcessor` API. [#5192](https://github.com/open-telemetry/opentelemetry-js/pull/5192) @david-luna
* fix(sdk-trace-base): always wait on pending export in SimpleSpanProcessor. [#5303](https://github.com/open-telemetry/opentelemetry-js/pull/5303) @anuraaga
* feat(sdk-metrics)!: extract `IMetricReader` interface and use it over abstract class [#5311](https://github.com/open-telemetry/opentelemetry-js/pull/5311)
* (user-facing): `MeterProviderOptions` now provides the more general `IMetricReader` type over `MetricReader`
* If you accept `MetricReader` in your public interface, consider accepting the more general `IMetricReader` instead to avoid unintentional breaking changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
ExportResultCode,
globalErrorHandler,
BindOnceFuture,
ExportResult,
} from '@opentelemetry/core';
import { Span } from '../Span';
import { SpanProcessor } from '../SpanProcessor';
Expand All @@ -38,16 +37,16 @@
*/
export class SimpleSpanProcessor implements SpanProcessor {
private _shutdownOnce: BindOnceFuture<void>;
private _unresolvedExports: Set<Promise<void>>;
private _pendingExports: Set<Promise<unknown>>;

constructor(private readonly _exporter: SpanExporter) {
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);
this._unresolvedExports = new Set<Promise<void>>();
this._pendingExports = new Set<Promise<void>>();
}

async forceFlush(): Promise<void> {
// await unresolved resources before resolving
await Promise.all(Array.from(this._unresolvedExports));
// await pending exports
await Promise.all(Array.from(this._pendingExports));
if (this._exporter.forceFlush) {
await this._exporter.forceFlush();
}
Expand All @@ -64,43 +63,26 @@
return;
}

const doExport = () =>
internal
._export(this._exporter, [span])
.then((result: ExportResult) => {
if (result.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(
result.error ??
new Error(
`SimpleSpanProcessor: span export failed (status ${result})`
)
);
}
})
.catch(error => {
globalErrorHandler(error);
});
const pendingExport = this.doExport(span).catch(err =>
globalErrorHandler(err)
);
// Enqueue this export to the pending list so it can be flushed by the user.
this._pendingExports.add(pendingExport);
pendingExport.finally(() => this._pendingExports.delete(pendingExport));
}

// Avoid scheduling a promise to make the behavior more predictable and easier to test
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this since it seemed to be misleading - a promise is scheduled by doExport anyways so there's no way to avoid scheduling one

private async doExport(span: ReadableSpan): Promise<void> {
if (span.resource.asyncAttributesPending) {
const exportPromise = (span.resource as Resource)
.waitForAsyncAttributes?.()
.then(
() => {
if (exportPromise != null) {
this._unresolvedExports.delete(exportPromise);
}
return doExport();
},
err => globalErrorHandler(err)
);
// Ensure resource is fully resolved before exporting.
await (span.resource as Resource).waitForAsyncAttributes?.();
}

// store the unresolved exports
if (exportPromise != null) {
this._unresolvedExports.add(exportPromise);
}
} else {
void doExport();
const result = await internal._export(this._exporter, [span]);
if (result.code !== ExportResultCode.SUCCESS) {
throw (
result.error ??
new Error(`SimpleSpanProcessor: span export failed (status ${result})`)

Check warning on line 84 in packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts

View check run for this annotation

Codecov / codecov/patch

packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts#L84

Added line #L84 was not covered by tests
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,40 @@ describe('SimpleSpanProcessor', () => {
);
});

it('should await doExport() and delete from _unresolvedExports', async () => {
it('should await doExport() and delete from _pendingExports', async () => {
const testExporterWithDelay = new TestExporterWithDelay();
const processor = new SimpleSpanProcessor(testExporterWithDelay);
const spanContext: SpanContext = {
traceId: 'a3cda95b652f4a1592b449d5929fda1b',
spanId: '5e0c63257de34c92',
traceFlags: TraceFlags.SAMPLED,
};
const tracer = provider.getTracer('default');
const span = new SpanImpl({
scope: tracer.instrumentationScope,
resource: tracer['_resource'],
context: ROOT_CONTEXT,
spanContext,
name: 'span-name',
kind: SpanKind.CLIENT,
spanLimits: tracer.getSpanLimits(),
spanProcessor: tracer['_spanProcessor'],
});
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.strictEqual(processor['_pendingExports'].size, 1);

await processor.forceFlush();

assert.strictEqual(processor['_pendingExports'].size, 0);

const exportedSpans = testExporterWithDelay.getFinishedSpans();

assert.strictEqual(exportedSpans.length, 1);
});

it('should await doExport() and delete from _pendingExports with async resource', async () => {
const testExporterWithDelay = new TestExporterWithDelay();
const processor = new SimpleSpanProcessor(testExporterWithDelay);

Expand Down Expand Up @@ -247,11 +280,11 @@ describe('SimpleSpanProcessor', () => {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.strictEqual(processor['_unresolvedExports'].size, 1);
assert.strictEqual(processor['_pendingExports'].size, 1);

await processor.forceFlush();

assert.strictEqual(processor['_unresolvedExports'].size, 0);
assert.strictEqual(processor['_pendingExports'].size, 0);

const exportedSpans = testExporterWithDelay.getFinishedSpans();

Expand Down
Loading