Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,8 @@
],
"dependencies": {
"@abraham/reflection": "^0.13.0",
"@opentelemetry/api": "^1.9.1",
"@opentelemetry/api-logs": "^0.218.0",
"@peculiar/x509": "^2.0.0",
"@repo/shared": "workspace:*",
"axios": "^1.16.1",
Expand Down
27 changes: 24 additions & 3 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 82 additions & 0 deletions src/telemetry/export/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type { TelemetryEvent } from "../event";

/** One measurement, classified for export. */
export interface MetricMeasurement {
readonly name: string;
readonly value: number;
readonly kind: "gauge" | "counter";
/** OTel/UCUM unit (e.g. "ms", "Mbit/s", "{request}", "1"). */
readonly unit: string;
}

/**
* Typed view of a metric event. `windowSeconds` is set on windowed events
* (`http.requests`) and absent on point-in-time samples; exporters use it to
* stamp gauge start times and anchor cumulative counters.
*/
export interface MetricDescriptor {
readonly windowSeconds?: number;
readonly measurements: readonly MetricMeasurement[];
}

// Single source of truth for which event names are metric series.
const METRIC_EVENT_NAMES: ReadonlySet<string> = new Set([
"http.requests",
"ssh.network.info",
"ssh.network.sampled",
]);

export function isMetricEvent(event: TelemetryEvent): boolean {
return METRIC_EVENT_NAMES.has(event.eventName);
}

/** Typed layout for a metric event, or `undefined` if it isn't a metric. */
export function describeMetricEvent(
event: TelemetryEvent,
): MetricDescriptor | undefined {
if (!isMetricEvent(event)) {
return undefined;
}
if (event.eventName === "http.requests") {
return describeHttpRequests(event);
}
return {
measurements: Object.entries(event.measurements).map(([name, value]) => ({
name,
value,
kind: "gauge",
unit: measurementUnit(name),
})),
};
}

// `window_seconds` is metadata, `count_*` are cumulative counters, the rest gauges.
function describeHttpRequests(event: TelemetryEvent): MetricDescriptor {
let windowSeconds = 0;
const measurements: MetricMeasurement[] = [];
for (const [name, value] of Object.entries(event.measurements)) {
if (name === "window_seconds") {
windowSeconds = value;
} else if (name.startsWith("count_")) {
measurements.push({ name, value, kind: "counter", unit: "{request}" });
} else {
measurements.push({
name,
value,
kind: "gauge",
unit: measurementUnit(name),
});
}
}
return { windowSeconds, measurements };
}

function measurementUnit(name: string): string {
if (name.endsWith("_ms") || name.endsWith("Ms")) {
return "ms";
}
if (name.endsWith("Mbits")) {
return "Mbit/s";
}
return "1";
}
97 changes: 97 additions & 0 deletions src/telemetry/export/writers/otlp/envelope.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { createWriteStream } from "node:fs";

import { toError } from "../../../../error/errorUtils";

/** Append-only writer for one OTLP/JSON envelope file. `append` is not re-entrant. */
export interface EnvelopeFile {
append(value: unknown): Promise<void>;
close(): Promise<void>;
}

/** Streams `<prefix>v1,v2,...<suffix>` JSON into `filePath`. */
export async function openEnvelopeFile(
filePath: string,
prefix: string,
suffix: string,
): Promise<EnvelopeFile> {
const stream = createWriteStream(filePath, { encoding: "utf8" });
// Open failures (ENOENT/EACCES) surface as 'error' events, not write
// callbacks; capture them so pending writes reject instead of hanging.
let asyncError: Error | undefined;
stream.once("error", (err) => {
asyncError ??= err;
});

await write(stream, prefix, filePath, () => asyncError);
let written = 0;
let closed = false;
return {
async append(value) {
await write(
stream,
(written === 0 ? "" : ",") + JSON.stringify(value),
filePath,
() => asyncError,
);
written += 1;
},
async close() {
if (closed) {
return;
}
closed = true;
try {
await write(stream, suffix, filePath, () => asyncError);
} catch (err) {
// Re-label suffix-write failures as a close failure.
const inner = (err as { cause?: unknown }).cause;
const msg =
inner instanceof Error ? inner.message : toError(err).message;
throw new Error(`Failed to close ${filePath}: ${msg}`, { cause: err });
}
await new Promise<void>((resolve, reject) => {
stream.end((err?: Error | null) => {
const failure = err ?? asyncError;
if (failure) {
reject(
new Error(`Failed to close ${filePath}: ${failure.message}`, {
cause: failure,
}),
);
} else {
resolve();
}
});
});
},
};
}

function write(
stream: NodeJS.WritableStream,
chunk: string,
filePath: string,
asyncError: () => Error | undefined,
): Promise<void> {
return new Promise((resolve, reject) => {
const reject_ = (err: unknown) =>
reject(
new Error(`Failed to write ${filePath}: ${toError(err).message}`, {
cause: err,
}),
);
const existing = asyncError();
if (existing) {
reject_(existing);
return;
}
stream.write(chunk, "utf8", (err) => {
const failure = err ?? asyncError();
if (failure) {
reject_(failure);
} else {
resolve();
}
});
});
}
Loading
Loading