Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/quiet-analytics-readers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@workflow/world': minor
'@workflow/world-vercel': minor
---

Add metadata-only workflow analytics APIs for ClickHouse-backed observability reads.
160 changes: 160 additions & 0 deletions packages/world-vercel/src/analytics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import {
type Analytics,
AnalyticsEventSchema,
AnalyticsHookSchema,
AnalyticsRunSchema,
AnalyticsStepSchema,
AnalyticsWaitSchema,
PaginatedResponseSchema,
type PaginationOptions,
} from '@workflow/world';
import type { APIConfig } from './utils.js';
import { makeRequest } from './utils.js';

function appendPagination(
params: URLSearchParams,
pagination: PaginationOptions | undefined
): void {
if (pagination?.limit) params.set('limit', pagination.limit.toString());
if (pagination?.cursor) params.set('cursor', pagination.cursor);
if (pagination?.sortOrder) params.set('sortOrder', pagination.sortOrder);
}

function createQueryString(params: URLSearchParams): string {
const query = params.toString();
return query ? `?${query}` : '';
}

export function createAnalytics(config?: APIConfig): Analytics {
return {
runs: {
get(runId) {
return makeRequest({
endpoint: `/v2/analytics/runs/${encodeURIComponent(runId)}`,
config,
schema: AnalyticsRunSchema,
});
},
list(params = {}) {
const searchParams = new URLSearchParams();
if (params.workflowName) {
searchParams.set('workflowName', params.workflowName);
}
if (params.status) {
searchParams.set('status', params.status);
}
appendPagination(searchParams, params.pagination);

return makeRequest({
endpoint: `/v2/analytics/runs${createQueryString(searchParams)}`,
config,
schema: PaginatedResponseSchema(AnalyticsRunSchema),
});
},
},
steps: {
get(runId, stepId) {
return makeRequest({
endpoint: `/v2/analytics/runs/${encodeURIComponent(runId)}/steps/${encodeURIComponent(stepId)}`,
config,
schema: AnalyticsStepSchema,
});
},
list(params) {
const searchParams = new URLSearchParams();
appendPagination(searchParams, params.pagination);

return makeRequest({
endpoint: `/v2/analytics/runs/${encodeURIComponent(params.runId)}/steps${createQueryString(searchParams)}`,
config,
schema: PaginatedResponseSchema(AnalyticsStepSchema),
});
},
},
events: {
get(runId, eventId) {
return makeRequest({
endpoint: `/v2/analytics/runs/${encodeURIComponent(runId)}/events/${encodeURIComponent(eventId)}`,
config,
schema: AnalyticsEventSchema,
});
},
list(params) {
const searchParams = new URLSearchParams();
if (params.eventType) {
searchParams.set('eventType', params.eventType);
}
if (params.correlationId) {
searchParams.set('correlationId', params.correlationId);
}
appendPagination(searchParams, params.pagination);

return makeRequest({
endpoint: `/v2/analytics/runs/${encodeURIComponent(params.runId)}/events${createQueryString(searchParams)}`,
config,
schema: PaginatedResponseSchema(AnalyticsEventSchema),
});
},
listByCorrelationId(params) {
const searchParams = new URLSearchParams();
searchParams.set('correlationId', params.correlationId);
appendPagination(searchParams, params.pagination);

return makeRequest({
endpoint: `/v2/analytics/events${createQueryString(searchParams)}`,
config,
schema: PaginatedResponseSchema(AnalyticsEventSchema),
});
},
},
hooks: {
get(hookId, params) {
const searchParams = new URLSearchParams();
if (params?.runId) {
searchParams.set('runId', params.runId);
}

return makeRequest({
endpoint: `/v2/analytics/hooks/${encodeURIComponent(hookId)}${createQueryString(searchParams)}`,
config,
schema: AnalyticsHookSchema,
});
},
list(params = {}) {
const searchParams = new URLSearchParams();
if (params.runId) {
searchParams.set('runId', params.runId);
}
appendPagination(searchParams, params.pagination);

return makeRequest({
endpoint: `/v2/analytics/hooks${createQueryString(searchParams)}`,
config,
schema: PaginatedResponseSchema(AnalyticsHookSchema),
});
},
},
waits: {
get(runId, waitId) {
return makeRequest({
endpoint: `/v2/analytics/runs/${encodeURIComponent(runId)}/waits/${encodeURIComponent(waitId)}`,
config,
schema: AnalyticsWaitSchema,
});
},
list(params) {
const searchParams = new URLSearchParams();
if (params.status) {
searchParams.set('status', params.status);
}
appendPagination(searchParams, params.pagination);

return makeRequest({
endpoint: `/v2/analytics/runs/${encodeURIComponent(params.runId)}/waits${createQueryString(searchParams)}`,
config,
schema: PaginatedResponseSchema(AnalyticsWaitSchema),
});
},
},
};
}
3 changes: 3 additions & 0 deletions packages/world-vercel/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { World } from '@workflow/world';
import { SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT } from '@workflow/world';
import { createAnalytics } from './analytics.js';
import { createGetEncryptionKeyForRun } from './encryption.js';
import { instrumentObject } from './instrumentObject.js';
import { createQueue } from './queue.js';
Expand All @@ -8,6 +9,7 @@ import { createStorage } from './storage.js';
import { createStreamer } from './streamer.js';
import type { APIConfig } from './utils.js';

export { createAnalytics } from './analytics.js';
export {
createGetEncryptionKeyForRun,
deriveRunKey,
Expand All @@ -34,6 +36,7 @@ export function createVercelWorld(config?: APIConfig): World {
processExitTriggersQueueRedelivery: true,
...createQueue(config),
...createStorage(config),
analytics: createAnalytics(config),
...instrumentObject('world.streams', createStreamer(config)),
getEncryptionKeyForRun: createGetEncryptionKeyForRun(
projectId,
Expand Down
165 changes: 165 additions & 0 deletions packages/world/src/analytics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { z } from 'zod';
import { EventTypeSchema } from './events.js';
import { WorkflowRunStatusSchema } from './runs.js';
import type { PaginatedResponse, PaginationOptions } from './shared.js';
import { StepStatusSchema } from './steps.js';
import { WaitStatusSchema } from './waits.js';

const NullableDateSchema = z.coerce.date().nullable().optional();
const NullableStringSchema = z.string().nullable().optional();
const NullableBooleanSchema = z.boolean().nullable().optional();

export const AnalyticsRunSchema = z.object({
runId: z.string(),
status: WorkflowRunStatusSchema,
deploymentId: z.string(),
workflowName: z.string(),
specVersion: z.coerce.number().optional(),
attributes: z.record(z.string(), z.string()).default({}),
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
startedAt: NullableDateSchema,
completedAt: NullableDateSchema,
errorCode: NullableStringSchema,
workflowCoreVersion: NullableStringSchema,
workflowEncryptionEnabled: NullableBooleanSchema,
});

export const AnalyticsStepSchema = z.object({
runId: z.string(),
stepId: z.string(),
stepName: NullableStringSchema,
status: StepStatusSchema,
attempt: z.number().optional(),
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
startedAt: NullableDateSchema,
completedAt: NullableDateSchema,
retryAfter: NullableDateSchema,
errorCode: NullableStringSchema,
workflowCoreVersion: NullableStringSchema,
workflowEncryptionEnabled: NullableBooleanSchema,
});

export const AnalyticsEventSchema = z.object({
runId: z.string(),
eventId: z.string(),
eventType: EventTypeSchema,
correlationId: NullableStringSchema,
entityId: NullableStringSchema,
stepName: NullableStringSchema,
workflowName: z.string(),
deploymentId: z.string(),
specVersion: z.coerce.number().optional(),
runCreatedAt: z.coerce.date(),
createdAt: z.coerce.date(),
region: NullableStringSchema,
vercelId: NullableStringSchema,
requestId: NullableStringSchema,
resumeAt: NullableDateSchema,
retryAfter: NullableDateSchema,
errorCode: NullableStringSchema,
workflowCoreVersion: NullableStringSchema,
isWebhook: NullableBooleanSchema,
isSystem: NullableBooleanSchema,
workflowEncryptionEnabled: NullableBooleanSchema,
});

export const AnalyticsHookSchema = z.object({
runId: z.string(),
hookId: z.string(),
status: z.enum(['created', 'received', 'disposed', 'conflict']),
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
receivedAt: NullableDateSchema,
disposedAt: NullableDateSchema,
isWebhook: NullableBooleanSchema,
isSystem: NullableBooleanSchema,
workflowCoreVersion: NullableStringSchema,
workflowEncryptionEnabled: NullableBooleanSchema,
});

export const AnalyticsWaitSchema = z.object({
runId: z.string(),
waitId: z.string(),
status: WaitStatusSchema,
resumeAt: NullableDateSchema,
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
completedAt: NullableDateSchema,
workflowCoreVersion: NullableStringSchema,
workflowEncryptionEnabled: NullableBooleanSchema,
});

export type AnalyticsRun = z.infer<typeof AnalyticsRunSchema>;
export type AnalyticsStep = z.infer<typeof AnalyticsStepSchema>;
export type AnalyticsEvent = z.infer<typeof AnalyticsEventSchema>;
export type AnalyticsHook = z.infer<typeof AnalyticsHookSchema>;
export type AnalyticsWait = z.infer<typeof AnalyticsWaitSchema>;

export interface AnalyticsListRunsParams {
workflowName?: string;
status?: AnalyticsRun['status'];
pagination?: PaginationOptions;
}

export interface AnalyticsListRunScopedParams {
runId: string;
pagination?: PaginationOptions;
}

export interface AnalyticsListEventsParams
extends AnalyticsListRunScopedParams {
eventType?: AnalyticsEvent['eventType'];
correlationId?: string;
}

export interface AnalyticsListEventsByCorrelationIdParams {
correlationId: string;
pagination?: PaginationOptions;
}

export interface AnalyticsListHooksParams {
runId?: string;
pagination?: PaginationOptions;
}

export interface AnalyticsListWaitsParams extends AnalyticsListRunScopedParams {
status?: AnalyticsWait['status'];
}

export interface Analytics {
runs: {
get(runId: string): Promise<AnalyticsRun>;
list(
params?: AnalyticsListRunsParams
): Promise<PaginatedResponse<AnalyticsRun>>;
};
steps: {
get(runId: string, stepId: string): Promise<AnalyticsStep>;
list(
params: AnalyticsListRunScopedParams
): Promise<PaginatedResponse<AnalyticsStep>>;
};
events: {
get(runId: string, eventId: string): Promise<AnalyticsEvent>;
list(
params: AnalyticsListEventsParams
): Promise<PaginatedResponse<AnalyticsEvent>>;
listByCorrelationId(
params: AnalyticsListEventsByCorrelationIdParams
): Promise<PaginatedResponse<AnalyticsEvent>>;
};
hooks: {
get(hookId: string, params?: { runId?: string }): Promise<AnalyticsHook>;
list(
params?: AnalyticsListHooksParams
): Promise<PaginatedResponse<AnalyticsHook>>;
};
waits: {
get(runId: string, waitId: string): Promise<AnalyticsWait>;
list(
params: AnalyticsListWaitsParams
): Promise<PaginatedResponse<AnalyticsWait>>;
};
}
10 changes: 9 additions & 1 deletion packages/world/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
export type * from './analytics.js';
export {
AnalyticsEventSchema,
AnalyticsHookSchema,
AnalyticsRunSchema,
AnalyticsStepSchema,
AnalyticsWaitSchema,
} from './analytics.js';
export type * from './attributes.js';
export {
applyAttributeChanges,
ATTRIBUTE_KEY_MAX_LENGTH,
ATTRIBUTE_MAX_PER_RUN,
ATTRIBUTE_VALUE_MAX_BYTES,
AttributeChangeSchema,
AttributeChangesSchema,
AttributeValidationError,
applyAttributeChanges,
RESERVED_ATTRIBUTE_KEY_PREFIX,
validateAttributeChanges,
validateAttributeKey,
Expand Down
Loading
Loading