Merged
5 changes: 5 additions & 0 deletions .changeset/compression-telemetry-attributes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/core': patch
---

Emit OpenTelemetry span attributes for payload compression on the serialize (write) and deserialize (read) paths: `workflow.serialization.{operation,compressed,uncompressed_bytes,stored_bytes,compression_ratio}`. Sizes are measured at the compression boundary (pre-encryption). Telemetry failures never affect serialization.
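
A minimal sketch of how these attributes could be assembled and recorded without letting a telemetry error escape. The helper names are hypothetical, the `operation` values are assumed, and the ratio definition (stored bytes divided by uncompressed bytes) is an assumption; the changeset does not spell out any of these. Only `span.setAttributes` is taken from the OpenTelemetry span interface.

```javascript
// Hypothetical helpers, not the @workflow/core implementation.
const PREFIX = 'workflow.serialization';

function compressionAttributes({
  operation,
  compressed,
  uncompressedBytes,
  storedBytes,
}) {
  return {
    [`${PREFIX}.operation`]: operation, // assumed values: 'serialize' | 'deserialize'
    [`${PREFIX}.compressed`]: compressed,
    [`${PREFIX}.uncompressed_bytes`]: uncompressedBytes,
    [`${PREFIX}.stored_bytes`]: storedBytes,
    // Assumed definition: stored / uncompressed (lower means better compression).
    [`${PREFIX}.compression_ratio`]:
      uncompressedBytes > 0 ? storedBytes / uncompressedBytes : 1,
  };
}

// Recording is wrapped so a telemetry failure cannot fail serialization.
function recordSafely(span, attributes) {
  try {
    span.setAttributes(attributes);
  } catch {
    // Swallowed on purpose: telemetry is best-effort.
  }
}

const attrs = compressionAttributes({
  operation: 'serialize',
  compressed: true,
  uncompressedBytes: 6600,
  storedBytes: 1650,
});

// A span whose exporter is broken must not break the caller.
const brokenSpan = {
  setAttributes() {
    throw new Error('exporter down');
  },
};
recordSafely(brokenSpan, attrs);
console.log(attrs['workflow.serialization.compression_ratio']); // 0.25
```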
5 changes: 5 additions & 0 deletions .changeset/gzip-ref-compression-core.md
@@ -0,0 +1,5 @@
---
'@workflow/core': minor
---

Compress serialized payloads (step inputs/outputs, workflow arguments/return values, errors, hook payloads) before storage using composable codec format prefixes. zstd is the preferred codec (markedly faster than gzip at an equal-or-better ratio, via `node:zlib`); gzip (`CompressionStream`) is the portable fallback when zstd is unavailable. Reads dispatch on the prefix, so both codecs are always decodable. Compression is applied before encryption, gated on run specVersion 5, and skipped for small or incompressible payloads. `WORKFLOW_DISABLE_COMPRESSION=1` disables writes; `WORKFLOW_COMPRESSION_CODEC=gzip` forces the portable codec.
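
The prefix-dispatch idea can be sketched as follows. This is an illustration only: the 4-byte tags, the size threshold, and the function names are hypothetical, and only the portable gzip codec (Web `CompressionStream`) is wired up. A zstd decoder would be one more entry in the same table.

```javascript
// Hypothetical format tags and threshold; the real values in
// @workflow/core may differ.
const TAG_RAW = 'raw0';
const TAG_GZIP = 'gzp1';
const MIN_COMPRESS_BYTES = 256;

const enc = new TextEncoder();
const dec = new TextDecoder();

// Run bytes through a Web (De)CompressionStream and collect the result.
async function pipe(bytes, transform) {
  const stream = new Blob([bytes]).stream().pipeThrough(transform);
  return new Uint8Array(await new Response(stream).arrayBuffer());
}

// Prepend the 4-byte tag so readers can pick the right decoder.
function frame(tag, body) {
  const out = new Uint8Array(4 + body.byteLength);
  out.set(enc.encode(tag), 0);
  out.set(body, 4);
  return out;
}

async function encode(bytes) {
  // Small payloads are stored as-is.
  if (bytes.byteLength < MIN_COMPRESS_BYTES) return frame(TAG_RAW, bytes);
  const gz = await pipe(bytes, new CompressionStream('gzip'));
  // Incompressible payloads are stored as-is too.
  return gz.byteLength < bytes.byteLength
    ? frame(TAG_GZIP, gz)
    : frame(TAG_RAW, bytes);
}

// Reads dispatch on the prefix, independent of the writer's codec choice.
const DECODERS = {
  [TAG_RAW]: async (body) => body,
  [TAG_GZIP]: (body) => pipe(body, new DecompressionStream('gzip')),
};

async function decode(framed) {
  const tag = dec.decode(framed.subarray(0, 4));
  const decoder = DECODERS[tag];
  if (!decoder) throw new Error(`unknown format prefix: ${tag}`);
  return decoder(framed.subarray(4));
}

// Usage: a repetitive 4 KB payload compresses and round-trips.
const text = 'x'.repeat(4096);
const roundTrip = encode(enc.encode(text)).then(async (framed) => ({
  tag: dec.decode(framed.subarray(0, 4)),
  same: dec.decode(await decode(framed)) === text,
}));
roundTrip.then((r) => console.log(r.tag, r.same));
```

Because the tag travels with the payload, the writer can change its preferred codec without affecting readers, as long as every tag that was ever written keeps a decoder.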
5 changes: 5 additions & 0 deletions .changeset/gzip-ref-compression-world-vercel.md
@@ -0,0 +1,5 @@
---
'@workflow/world-vercel': minor
---

Advertise specVersion 5 so new Vercel runs are eligible for payload compression (zstd or gzip). The workflow-server declared spec-5 support in vercel/workflow-server#520; payloads remain opaque to the server (compression is client-side). Spec 5 is a superset of spec 4, so initial run attributes still work.
5 changes: 5 additions & 0 deletions .changeset/gzip-ref-compression-world.md
@@ -0,0 +1,5 @@
---
'@workflow/world': minor
---

Bump `SPEC_VERSION_CURRENT` to 5 (`SPEC_VERSION_SUPPORTS_COMPRESSION`): runs at spec 5+ may contain compressed payloads (zstd or gzip), and older SDKs reject such runs via `requiresNewerWorld()` instead of failing on individual payloads.
5 changes: 5 additions & 0 deletions .changeset/zstd-web-decoder.md
@@ -0,0 +1,5 @@
---
'@workflow/web-shared': minor
---

Decode zstd-compressed workflow payloads in the observability UI. Since the Web `DecompressionStream` has no zstd support, the web o11y registers a WASM-backed zstd decoder (`@tootallnate/zstd-wasm`) with `@workflow/core` before hydrating payloads; the WASM is compiled lazily on first use.
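
The "register now, compile on first use" pattern might look like the sketch below. The registry and function names are hypothetical (the actual registration API of `@workflow/core` is not shown in this diff), and a counter stands in for the WASM compile step.

```javascript
// Hypothetical registry, for illustration only.
const decoders = new Map();

function registerLazyDecoder(prefix, loadDecoder) {
  let ready; // memoized promise, created on the first decode call
  decoders.set(prefix, async (bytes) => {
    ready ??= Promise.resolve().then(loadDecoder);
    const decode = await ready;
    return decode(bytes);
  });
}

// Stand-in for compiling a WASM module: counts how often it runs.
let loads = 0;
registerLazyDecoder('zstd', async () => {
  loads++;
  return (bytes) => bytes; // placeholder "decoder" that returns its input
});

// Registration alone does not trigger the load; two decodes share one load.
const decodeZstd = decoders.get('zstd');
const demo = Promise.all([
  decodeZstd(new Uint8Array([1])),
  decodeZstd(new Uint8Array([2])),
]).then(() => loads);
demo.then((n) => console.log(n)); // 1
```

One consequence of memoizing the promise is that a failed load is remembered as well; a real implementation may want to clear it and retry.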
5 changes: 1 addition & 4 deletions docs/content/docs/v5/observability/meta.json
@@ -1,7 +1,4 @@
 {
   "title": "Observability",
-  "pages": [
-    "tracing",
-    "attributes"
-  ]
+  "pages": ["tracing", "attributes"]
 }
88 changes: 88 additions & 0 deletions packages/core/scripts/README.md
@@ -0,0 +1,88 @@
# Compression benchmarks

Reproducible benchmarks for the payload compression feature
(specVersion 5; the PR that adds compression codec format prefixes to the
serialization layer). Two dimensions are measured: **storage size**
(bytes saved) and **CPU cost** (time added to serialize/deserialize). All
workloads are shared and deterministic; see `lib/workloads.mjs`.

Build `@workflow/core` first so the scripts can import the compiled
serialization layer:

```bash
pnpm --filter @workflow/core build
cd packages/core
```

## 1. Storage size

```bash
node scripts/benchmark-compression-size.mjs
```

Prints the exact bytes the serialization layer hands to the World storage
backends (S3/DynamoDB refs for vercel, `bytea` columns for postgres, JSON
files for local), compression off vs on, per workload, plus a simulated
10-step AI-agent event-log total. Backends that base64-encode binary
(DynamoDB inline refs, world-local JSON) see ~33% larger absolute savings
than the raw numbers.
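
The ~33% figure follows from base64 emitting 4 characters for every 3 input bytes, so both the "off" and "on" sizes, and therefore their difference, scale by about 4/3. A quick check with illustrative sizes (not benchmark output):

```javascript
// Base64 encodes every 3 bytes as 4 characters (with padding).
const base64Len = (n) => 4 * Math.ceil(n / 3);

const rawOff = 6600; // illustrative uncompressed payload size, bytes
const rawOn = 1800; // illustrative compressed size, bytes

const rawSaved = rawOff - rawOn; // 4800
const b64Saved = base64Len(rawOff) - base64Len(rawOn); // 8800 - 2400 = 6400

console.log(rawSaved, b64Saved, (b64Saved / rawSaved).toFixed(2)); // 4800 6400 1.33
```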

## 2. CPU cost

```bash
node scripts/benchmark-compression-cpu.mjs
```

Three sections:

1. **Per-payload serialize + deserialize cost** through the real shipping
   path (`step.serialize` / `step.deserialize`, which use the SDK's
   preferred codec: zstd when `node:zlib` provides it, otherwise gzip),
   off vs on, with throughput. Set `WORKFLOW_COMPRESSION_CODEC=gzip` to
   force gzip.
2. **Stress**: total serialization CPU to write + replay-read thousands
   of event payloads, modelling a long workflow.
3. **Algorithm comparison** (`node:zlib` sync): gzip levels 1/6/9, zstd
   levels 3/9/19 (when available), brotli, deflate-raw. Informational
   only; it isolates raw codec speed and is not the shipping path.

Compression is a **world-independent CPU cost** added to the
serialize/deserialize path. The world only changes the *baseline* you
compare against: local (filesystem) is the fastest baseline so the
relative impact is largest there; Vercel (network + AES encryption + S3)
has the slowest baseline so the relative impact is smallest. The absolute
microbenchmark numbers hold for every backend.
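
To make the absolute-versus-relative distinction concrete, here is the arithmetic with made-up numbers. The added CPU cost and both baselines are purely illustrative, not measurements from these scripts.

```javascript
// Illustrative inputs only: not measurements from this benchmark.
const addedCpuMs = 0.05; // extra serialize + deserialize CPU per payload
const baselinesMs = { local: 1, vercel: 50 }; // per-payload cost without compression

// Same absolute cost, expressed as a percentage of each baseline.
const relativeImpact = (addedMs, baselineMs) => (addedMs / baselineMs) * 100;

for (const [world, baseMs] of Object.entries(baselinesMs)) {
  const rel = relativeImpact(addedCpuMs, baseMs).toFixed(1);
  console.log(`${world}: +${addedCpuMs}ms absolute, +${rel}% relative`);
}
// local: +0.05ms absolute, +5.0% relative
// vercel: +0.05ms absolute, +0.1% relative
```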

## 3. End-to-end runtime (local + vercel)

The end-to-end harness already in the repo drives the stress workflows in
`workbench/example/workflows/97_bench.ts` through a real World and records
per-run `executionTimeMs` (`completedAt − createdAt`) to
`bench-timings-<app>-<backend>.json`:

```bash
# Local world (nextjs-turbopack dev server on :3000)
cd workbench/nextjs-turbopack && WORKFLOW_PUBLIC_MANIFEST=1 pnpm dev &
pnpm bench:local # from repo root

# Full suite incl. 1000-step / 1000-concurrent / 500×10KB cases
BENCHMARK_FULL_SUITE=true pnpm bench:local
```

To measure the compression delta, run the harness twice and diff the
output JSON: once normally (compression on, specVersion 5) and once with
`WORKFLOW_DISABLE_COMPRESSION=1` set on **both** the dev server and the
bench runner (compression off, everything else identical):

```bash
# compression OFF baseline
WORKFLOW_DISABLE_COMPRESSION=1 pnpm dev & # in the workbench
WORKFLOW_DISABLE_COMPRESSION=1 pnpm bench:local # from repo root
mv bench-timings-nextjs-turbopack-local.json bench-timings-...-off.json
```
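
Diffing the two output files could be sketched as below. The layout of `bench-timings-*.json` is not shown in this README, so the shape used here (benchmark name mapped to an array of `executionTimeMs` samples) and the benchmark name are assumptions; adapt the accessors to the real files.

```javascript
// Assumed shape: { [benchmarkName]: number[] } of executionTimeMs samples.
// The real bench-timings-*.json layout may differ.
function median(samples) {
  const sorted = [...samples].sort((a, b) => a - b);
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
}

function compressionDelta(on, off) {
  const rows = [];
  for (const name of Object.keys(on)) {
    if (!(name in off)) continue; // only compare benchmarks present in both runs
    const onMs = median(on[name]);
    const offMs = median(off[name]);
    rows.push({
      name,
      onMs,
      offMs,
      deltaMs: onMs - offMs,
      deltaPct: ((onMs - offMs) / offMs) * 100,
    });
  }
  return rows;
}

// Inline stand-ins for the two parsed JSON files (hypothetical values).
const timingsOn = { 'stress-100-steps': [1040, 1010, 1025] };
const timingsOff = { 'stress-100-steps': [1000, 990, 1010] };
console.log(compressionDelta(timingsOn, timingsOff));
```

Medians are used rather than means so that a single slow run (a cold start, for example) does not dominate the comparison.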

For **Vercel**, the same harness targets a deployment when the Vercel env
vars from `CLAUDE.md` are set (`WORKFLOW_VERCEL_ENV`, `VERCEL_DEPLOYMENT_ID`,
`WORKFLOW_VERCEL_AUTH_TOKEN`, `WORKFLOW_VERCEL_PROJECT`, `VERCEL_OIDC_TOKEN`,
etc.); it then writes `bench-timings-<app>-vercel.json`. For the off
baseline, the `WORKFLOW_DISABLE_COMPRESSION=1` kill switch must be set on
the deployment itself (an env var on the Vercel project): on Vercel,
compression runs inside the deployed step/workflow handlers, not in the
bench runner.
222 changes: 222 additions & 0 deletions packages/core/scripts/benchmark-compression-cpu.mjs
@@ -0,0 +1,222 @@
// Benchmark: CPU cost of payload compression in the serialization layer.
//
// Compression is a pure client-side CPU cost added to the serialize
// (write) and deserialize (read) paths — it is WORLD-INDEPENDENT. The
// world (local / vercel / postgres) only changes the *baseline* you
// compare against (filesystem vs network+encryption+S3), so the absolute
// numbers here hold regardless of backend, and the *relative* impact is
// largest on the local world (fast baseline) and smallest on Vercel
// (network + encryption dominate). See scripts/README.md.
//
// Usage (from packages/core, after `pnpm build`):
// node scripts/benchmark-compression-cpu.mjs
//
// Three sections:
// 1. Per-payload serialize + deserialize cost via the real shipping
// path (the SDK's preferred codec — zstd when node:zlib has it,
// else gzip), off vs on. Force gzip with WORKFLOW_COMPRESSION_CODEC=gzip.
// 2. Stress: total CPU to (de)serialize thousands of event payloads,
// modelling a long workflow + replay.
// 3. Algorithm comparison (node:zlib sync APIs) — informational, to
// compare gzip levels / zstd levels / brotli / deflate.

import zlib from 'node:zlib';
import * as step from '../dist/serialization/step.js';
import { ecommerceOrder, WORKLOADS } from './lib/workloads.mjs';

const encoder = new TextEncoder();

function rawBytes(value) {
if (value instanceof Uint8Array) return value.byteLength;
return encoder.encode(JSON.stringify(value)).byteLength;
}

/** Time an async fn: warm up, then run until both minIters and minMs met. */
async function timeAsync(
fn,
{ warmup = 30, minIters = 50, minMs = 1500 } = {}
) {
for (let i = 0; i < warmup; i++) await fn();
let iters = 0;
const start = performance.now();
let elapsed = 0;
do {
await fn();
iters++;
elapsed = performance.now() - start;
} while (iters < minIters || elapsed < minMs);
return { usPerOp: (elapsed * 1000) / iters, iters };
}

/** Time a sync fn the same way. */
function timeSync(fn, { warmup = 30, minIters = 50, minMs = 1000 } = {}) {
for (let i = 0; i < warmup; i++) fn();
let iters = 0;
const start = performance.now();
let elapsed = 0;
do {
fn();
iters++;
elapsed = performance.now() - start;
} while (iters < minIters || elapsed < minMs);
return { usPerOp: (elapsed * 1000) / iters, iters };
}

const mbPerSec = (bytes, usPerOp) => bytes / (usPerOp / 1e6) / (1024 * 1024);
const pct = (a, b) => `${(((a - b) / b) * 100).toFixed(1)}%`;

// ---------------------------------------------------------------------------
// 1. Per-payload serialize + deserialize cost (real shipping path)
// ---------------------------------------------------------------------------

const writeCodec = process.env.WORKFLOW_COMPRESSION_CODEC || 'zstd (default)';
console.log(
`## Serialize + deserialize CPU cost (real shipping path, codec: ${writeCodec})`
);
console.log('');
console.log(
'| Workload | ser off | ser on | ser Δ | deser off | deser on | deser Δ | compress MB/s |'
);
console.log('| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: |');

for (const [name, value] of WORKLOADS) {
const bytes = rawBytes(value);
const serOff = await timeAsync(() => step.serialize(value, undefined, {}));
const serOn = await timeAsync(() =>
step.serialize(value, undefined, { compression: true })
);

const uncompressed = await step.serialize(value, undefined, {});
const compressed = await step.serialize(value, undefined, {
compression: true,
});
const deserOff = await timeAsync(() =>
step.deserialize(uncompressed, undefined, {})
);
const deserOn = await timeAsync(() =>
step.deserialize(compressed, undefined, {})
);

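  // The serialize delta approximates the compression cost. It can be zero
  // or negative (compression skipped for small payloads, or timing noise),
  // so sign the deltas explicitly and skip a meaningless throughput figure.
  const signed = (a, b) => (a >= b ? '+' : '') + pct(a, b);
  const compressUs = serOn.usPerOp - serOff.usPerOp;
  const compressMBps =
    compressUs > 0 ? mbPerSec(bytes, compressUs).toFixed(0) : 'n/a';
  console.log(
    `| ${name} | ${serOff.usPerOp.toFixed(1)}µs | ${serOn.usPerOp.toFixed(1)}µs | ${signed(serOn.usPerOp, serOff.usPerOp)} | ${deserOff.usPerOp.toFixed(1)}µs | ${deserOn.usPerOp.toFixed(1)}µs | ${signed(deserOn.usPerOp, deserOff.usPerOp)} | ${compressMBps} |`
  );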
}

// ---------------------------------------------------------------------------
// 2. Stress: thousands of event payloads (long workflow + replay)
// ---------------------------------------------------------------------------
//
// A long workflow writes each step payload once and re-reads (replays)
// every prior payload on each cold start. We model the serialization CPU
// for N events: N serializes + N deserializes, off vs on. The "added"
// number is the total extra CPU compression spends across the whole run.

console.log('');
console.log('## Stress: total serialization CPU for N event payloads');
console.log('');
console.log(
'(e-commerce order payload, ~6.6 KB each — representative step output)'
);
console.log('');
console.log(
'| Events | off (ser+deser) | on (ser+deser) | added CPU | per event |'
);
console.log('| ---: | ---: | ---: | ---: | ---: |');

const stressValue = ecommerceOrder();
const stressUncompressed = await step.serialize(stressValue, undefined, {});
const stressCompressed = await step.serialize(stressValue, undefined, {
compression: true,
});

for (const n of [1000, 5000, 10000]) {
// Time one ser+deser cycle for each mode, then multiply by N. Timing
// each cycle (rather than looping N inline) keeps GC pressure realistic
// and the per-op cost stable.
const offCycle = await timeAsync(async () => {
const s = await step.serialize(stressValue, undefined, {});
await step.deserialize(s, undefined, {});
});
const onCycle = await timeAsync(async () => {
const s = await step.serialize(stressValue, undefined, {
compression: true,
});
await step.deserialize(s, undefined, {});
});
const offTotalMs = (offCycle.usPerOp * n) / 1000;
const onTotalMs = (onCycle.usPerOp * n) / 1000;
console.log(
`| ${n.toLocaleString()} | ${offTotalMs.toFixed(0)}ms | ${onTotalMs.toFixed(0)}ms | +${(onTotalMs - offTotalMs).toFixed(0)}ms | +${(onCycle.usPerOp - offCycle.usPerOp).toFixed(1)}µs |`
);
}
void stressUncompressed;
void stressCompressed;

// ---------------------------------------------------------------------------
// 3. Algorithm comparison (node:zlib sync) — informational
// ---------------------------------------------------------------------------
//
// Production uses the SDK's preferred codec (zstd via node:zlib when
// available, else Web CompressionStream('gzip') ≈ zlib gzip level 6). These
// sync numbers compare codecs and levels in isolation from the shipping
// path. Measured on the devalue-serialized payload bytes (what the layer
// actually compresses).

console.log('');
console.log('## Algorithm comparison (node:zlib sync, informational)');
console.log('');

// zstd entries are gated on availability (node:zlib has zstd on
// Node.js >= 22.15). The production gzip path ships via the Web
// CompressionStream (≈ gzip -6); the node:zlib gzip rows here isolate pure
// codec speed from that stream overhead, and are the apples-to-apples
// comparison against zstd.
const hasZstd = typeof zlib.zstdCompressSync === 'function';
const zstdAt = (level) => (b) =>
zlib.zstdCompressSync(b, {
params: { [zlib.constants.ZSTD_c_compressionLevel]: level },
});

const ALGOS = [
['gzip -1', (b) => zlib.gzipSync(b, { level: 1 }), zlib.gunzipSync],
['gzip -6 (default)', (b) => zlib.gzipSync(b, { level: 6 }), zlib.gunzipSync],
['gzip -9', (b) => zlib.gzipSync(b, { level: 9 }), zlib.gunzipSync],
...(hasZstd
? [
['zstd -3 (default)', zstdAt(3), zlib.zstdDecompressSync],
['zstd -9', zstdAt(9), zlib.zstdDecompressSync],
['zstd -19', zstdAt(19), zlib.zstdDecompressSync],
]
: []),
[
'brotli -q5',
(b) =>
zlib.brotliCompressSync(b, {
params: { [zlib.constants.BROTLI_PARAM_QUALITY]: 5 },
}),
zlib.brotliDecompressSync,
],
['deflate-raw', (b) => zlib.deflateRawSync(b), zlib.inflateRawSync],
];

// Use the larger text/structured payloads where codec choice matters.
const ALGO_WORKLOADS = WORKLOADS.filter(([name]) =>
/chat|API|document|Time series/.test(name)
);

for (const [name, value] of ALGO_WORKLOADS) {
const input = await step.serialize(value, undefined, {}); // devl + bytes
const inputBytes = input.byteLength;
console.log(`### ${name} (${(inputBytes / 1024).toFixed(1)} KB serialized)`);
console.log('');
console.log('| Algorithm | ratio | compress | decompress | compress MB/s |');
console.log('| --- | ---: | ---: | ---: | ---: |');
for (const [algo, compressFn, decompressFn] of ALGOS) {
const out = compressFn(input);
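// "ratio" is the percentage of bytes saved (higher is better), not
// compressed size divided by original size.
const ratio = ((1 - out.length / inputBytes) * 100).toFixed(1);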
const c = timeSync(() => compressFn(input));
const d = timeSync(() => decompressFn(out));
console.log(
`| ${algo} | ${ratio}% | ${c.usPerOp.toFixed(1)}µs | ${d.usPerOp.toFixed(1)}µs | ${mbPerSec(inputBytes, c.usPerOp).toFixed(0)} |`
);
}
console.log('');
}