Skip to content

Commit cfa92fb

Browse files
fix: Otlp Trace Exporters to Event Exporter (#70)
Move Traces to LD buffering Proof of testing: <img width="733" height="468" alt="image" src="https://github.com/user-attachments/assets/7e53fdde-f074-4534-beba-d6c9cc130901" /> <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Route trace exporting through the LD event queue with a new OTLP trace event exporter and processor, removing legacy sampling/stdout exporters. > > - **Traces**: > - Switch span exporting to LD event-queue pipeline: add `EventSpanProcessor`, `OtlpTraceEventExporter`, and `SpanItem`. > - Update `TracerDecorator` to use sampler + `EventQueue` (removes `BatchSpanProcessor`/direct `SpanExporter` usage) and provide start-with-time helper via `Tracer` extension. > - Register trace event exporter in `ObservabilityClientFactory` via `BatchWorker`. > - Remove legacy exporters: `SamplingTraceExporter.swift`, `SamplingLogExporter.swift`, `LDStdoutExporter.swift`. > - **Metrics**: > - Make `OtlpMetricEventExporter` and `OtlpMetricScheduleExporter` internal and encapsulate properties (no functional change). > - **TestApp**: > - Enable `.urlSession` in `autoInstrumentation` config. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 14ec46a. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent f186b29 commit cfa92fb

12 files changed

+139
-170
lines changed

Sources/Observability/Client/ObservabilityClientFactory.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ public struct ObservabilityClientFactory {
7474
}
7575

7676
if options.traces == .enabled {
77-
let tracesExporter = SamplingTraceExporterDecorator(
78-
exporter: OtlpHttpTraceExporter(
79-
endpoint: url,
80-
config: .init(headers: options.customHeaders.map({ ($0.key, $0.value) }))
81-
),
82-
sampler: sampler
77+
let traceEventExporter = OtlpTraceEventExporter(
78+
endpoint: url,
79+
config: .init(headers: options.customHeaders.map({ ($0.key, $0.value) }))
8380
)
84-
let decorator = TracerDecorator(options: options, sessionManager: sessionManager, exporter: tracesExporter)
81+
Task {
82+
await batchWorker.addExporter(traceEventExporter)
83+
}
84+
let decorator = TracerDecorator(options: options, sessionManager: sessionManager, sampler: sampler, eventQueue: eventQueue)
8585
/// tracer is enabled
8686
if options.autoInstrumentation.contains(.urlSession) {
8787
autoInstrumentation.append(NetworkInstrumentationManager(options: options, tracer: decorator, session: sessionManager))

Sources/Observability/Exporters/LDStdoutExporter.swift

Lines changed: 0 additions & 42 deletions
This file was deleted.

Sources/Observability/Exporters/SamplingLogExporter.swift

Lines changed: 0 additions & 39 deletions
This file was deleted.

Sources/Observability/Exporters/SamplingTraceExporter.swift

Lines changed: 0 additions & 36 deletions
This file was deleted.

Sources/Observability/Metrics/OtlpMetricEventExporter.swift

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,33 @@ import Common
33
import Foundation
44
import OpenTelemetryProtocolExporterCommon
55

6-
public final class OtlpMetricEventExporter: EventExporting {
7-
let otlpHttpClient: OtlpHttpClient
6+
final class OtlpMetricEventExporter: EventExporting {
7+
private let otlpHttpClient: OtlpHttpClient
88

9-
public init(endpoint: URL,
10-
config: OtlpConfiguration = OtlpConfiguration(),
11-
useSession: URLSession? = nil,
12-
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
9+
init(endpoint: URL,
10+
config: OtlpConfiguration = OtlpConfiguration(),
11+
useSession: URLSession? = nil,
12+
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
1313
self.otlpHttpClient = OtlpHttpClient(endpoint: endpoint,
1414
config: config,
1515
useSession: useSession,
1616
envVarHeaders: envVarHeaders)
1717
}
18-
19-
public func export(items: [EventQueueItem]) async throws {
18+
19+
func export(items: [EventQueueItem]) async throws {
2020
let metricDatas: [OpenTelemetrySdk.MetricData] = items.compactMap { item in
2121
(item.payload as? MetricItem)?.metricData
2222
}
2323
guard metricDatas.isNotEmpty else {
2424
return
2525
}
26+
2627
try await export(metricDatas: metricDatas)
2728
}
2829

2930
private func export(metricDatas: [MetricData],
3031
explicitTimeout: TimeInterval? = nil) async throws {
31-
let body =
32-
Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with { request in
32+
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with { request in
3333
request.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(
3434
metricData: metricDatas)
3535
}

Sources/Observability/Metrics/OtlpMetricScheduleExporter.swift

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ import OpenTelemetryApi
33
import OpenTelemetrySdk
44

55
final class OtlpMetricScheduleExporter: MetricExporter {
6-
let eventQueue: EventQueue
7-
let aggregationTemporalitySelector: AggregationTemporalitySelector
8-
let defaultAggregationSelector: DefaultAggregationSelector
6+
private let eventQueue: EventQueue
7+
private let aggregationTemporalitySelector: AggregationTemporalitySelector
8+
private let defaultAggregationSelector: DefaultAggregationSelector
99

1010
init(eventQueue: EventQueue,
1111
aggregationTemporalitySelector: AggregationTemporalitySelector = AggregationTemporality.alwaysCumulative(),
@@ -33,17 +33,17 @@ final class OtlpMetricScheduleExporter: MetricExporter {
3333
}
3434

3535
public func getAggregationTemporality(
36-
for instrument: OpenTelemetrySdk.InstrumentType
36+
for instrument: OpenTelemetrySdk.InstrumentType
3737
) -> OpenTelemetrySdk.AggregationTemporality {
38-
return aggregationTemporalitySelector.getAggregationTemporality(
39-
for: instrument)
38+
return aggregationTemporalitySelector.getAggregationTemporality(
39+
for: instrument)
4040
}
41-
41+
4242
// MARK: - DefaultAggregationSelector
43-
43+
4444
public func getDefaultAggregation(
45-
for instrument: OpenTelemetrySdk.InstrumentType
45+
for instrument: OpenTelemetrySdk.InstrumentType
4646
) -> OpenTelemetrySdk.Aggregation {
47-
return defaultAggregationSelector.getDefaultAggregation(for: instrument)
47+
return defaultAggregationSelector.getDefaultAggregation(for: instrument)
4848
}
4949
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import Foundation
2+
import OpenTelemetryApi
3+
import OpenTelemetrySdk
4+
5+
final class EventSpanProcessor: SpanProcessor {
6+
private let eventQueue: EventQueue
7+
private let sampler: ExportSampler
8+
let isStartRequired = false
9+
let isEndRequired = true
10+
11+
init(eventQueue: EventQueue, sampler: ExportSampler) {
12+
self.eventQueue = eventQueue
13+
self.sampler = sampler
14+
}
15+
16+
func onStart(parentContext: OpenTelemetryApi.SpanContext?, span: any OpenTelemetrySdk.ReadableSpan) {
17+
// No-op
18+
}
19+
20+
func onEnd(span: any OpenTelemetrySdk.ReadableSpan) {
21+
if !span.context.traceFlags.sampled {
22+
return
23+
}
24+
25+
let spanData = span.toSpanData()
26+
let sampledItems = sampler.sampleSpans(items: [spanData])
27+
guard !sampledItems.isEmpty else {
28+
return
29+
}
30+
31+
Task {
32+
await eventQueue.send(SpanItem(spanData: spanData))
33+
}
34+
}
35+
36+
func shutdown(explicitTimeout: TimeInterval?) {
37+
// No-op
38+
}
39+
40+
func forceFlush(timeout: TimeInterval?) {
41+
// No-op
42+
}
43+
44+
45+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import OpenTelemetrySdk
2+
import Common
3+
import Foundation
4+
import OpenTelemetryProtocolExporterCommon
5+
6+
final class OtlpTraceEventExporter: EventExporting {
7+
private let otlpHttpClient: OtlpHttpClient
8+
9+
init(endpoint: URL,
10+
config: OtlpConfiguration = OtlpConfiguration(),
11+
useSession: URLSession? = nil,
12+
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
13+
self.otlpHttpClient = OtlpHttpClient(endpoint: endpoint,
14+
config: config,
15+
useSession: useSession,
16+
envVarHeaders: envVarHeaders)
17+
}
18+
19+
func export(items: [EventQueueItem]) async throws {
20+
let spanDatas: [OpenTelemetrySdk.SpanData] = items.compactMap { item in
21+
(item.payload as? SpanItem)?.spanData
22+
}
23+
guard spanDatas.isNotEmpty else {
24+
return
25+
}
26+
try await export(spanDatas: spanDatas)
27+
}
28+
29+
private func export(spanDatas: [OpenTelemetrySdk.SpanData],
30+
explicitTimeout: TimeInterval? = nil) async throws {
31+
let body =
32+
Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
33+
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(
34+
spanDataList: spanDatas)
35+
}
36+
37+
try await otlpHttpClient.send(body: body, explicitTimeout: explicitTimeout)
38+
}
39+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import Foundation
2+
import OpenTelemetrySdk
3+
4+
struct SpanItem: EventQueueItemPayload {
5+
var exporterClass: AnyClass {
6+
Observability.OtlpTraceEventExporter.self
7+
}
8+
9+
let spanData: SpanData
10+
var timestamp: TimeInterval
11+
12+
init(spanData: SpanData) {
13+
self.spanData = spanData
14+
self.timestamp = spanData.endTime.timeIntervalSince1970
15+
}
16+
17+
func cost() -> Int {
18+
300 + spanData.events.count * 100 + spanData.attributes.count * 100
19+
}
20+
}

0 commit comments

Comments
 (0)