feat(workers): first-class Cloudflare Queue consumer helper (and other non-HTTP entrypoints) #294

@HugoRCD

Summary

evlog/workers only ships HTTP-oriented helpers today (initWorkersLogger, createWorkersLogger(request)). Both assume a Request object to extract cf-ray, request.cf, method, and path. Cloudflare Queue consumers (export default { async queue(batch, env) {} }) and other non-HTTP entrypoints like Cron Triggers, Durable Object alarms, and Email Workers have no first-class story.

The building blocks already exist (createLogger in evlog, createLoggerStorage in evlog/toolkit), but every user has to wire them together themselves. We should ship the helper and the docs.

Context

Reported by @nick_radford on X

evlog folks — is there an adapter for cloudflare queue consumers, or a pattern you'd recommend? I'd like to define a logger per queue process and have it easily referenced with useLogger in nested calls.

Proposed API

import { initWorkersLogger, withQueueLogger, useQueueLogger } from 'evlog/workers'

initWorkersLogger({ env: { service: 'my-worker' } })

export default {
  async queue(batch, env) {
    await withQueueLogger(batch, async (msg) => {
      await processOrder(msg.body)
    })
  },
}

async function processOrder(order) {
  const log = useQueueLogger()
  log.set({ order: { id: order.id, total: order.total } })
}

What withQueueLogger does:

  1. Creates one wide event per message via createLogger({ queue, messageId, attempts, requestId }). It uses createLogger, not createRequestLogger, because queues aren't HTTP requests. The createLogger JSDoc explicitly lists "workflows, jobs, scripts, queues" as use cases.
  2. Runs the handler inside storage.run(log, fn) so useQueueLogger() resolves the per-message logger anywhere in the call stack — same DX as useLogger() in evlog/express, evlog/fastify, evlog/elysia.
  3. Auto-emits the wide event on success.
  4. On throw: calls log.error(err), emits, then rethrows so Cloudflare's retry / DLQ logic still applies.

Open questions

  • Unified or scoped accessor? Should useLogger() be unified across evlog/workers (one ALS for both fetch and queue) or scoped (useQueueLogger)? Unified is friendlier; scoped avoids cross-talk if a fetch handler synchronously enqueues work.
  • Sequential vs parallel processing. The naive loop processes messages sequentially. Parallel processing is possible (Promise.all with one storage.run per message) but interacts with Cloudflare's consumer concurrency model. Which should be the default?
  • Status field convention. Wide events from HTTP handlers carry a status (HTTP code). For queues, should we emit a synthetic status: 200/500, or leave it absent and rely on level: 'error'? Leaning toward absent — queues aren't HTTP.
  • Other entrypoints. Should this issue cover Cron Triggers / DO alarms / Email Workers in one go, or land queues first and generalize?
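To make the parallel option concrete, here is a minimal sketch of "Promise.all with one storage.run per message". It is not evlog code: the logger, the batch types, and the names withParallelQueueLogger / useSketchLogger are hypothetical stand-ins built directly on Node's AsyncLocalStorage so the block runs on its own. It uses Promise.allSettled rather than Promise.all so every message emits its event before the first failure is rethrown.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks'

// Hypothetical stand-ins so the sketch runs without evlog or Cloudflare types.
interface SketchLogger {
  set(fields: Record<string, unknown>): void
  error(err: Error): void
  emit(): void
}
interface SketchMessage<Body> { id: string, attempts: number, body: Body }
interface SketchBatch<Body> { queue: string, messages: SketchMessage<Body>[] }

// Collected wide events; a real logger would send these to a drain instead.
const emitted: Record<string, unknown>[] = []

function createSketchLogger(initial: Record<string, unknown>): SketchLogger {
  const event: Record<string, unknown> = { ...initial }
  return {
    set: (fields) => { Object.assign(event, fields) },
    error: (err) => { Object.assign(event, { level: 'error', error: err.message }) },
    emit: () => { emitted.push({ ...event }) },
  }
}

const storage = new AsyncLocalStorage<SketchLogger>()

function useSketchLogger(): SketchLogger {
  const log = storage.getStore()
  if (!log)
    throw new Error('No logger in context. Wrap your handler in withParallelQueueLogger().')
  return log
}

async function withParallelQueueLogger<Body>(
  batch: SketchBatch<Body>,
  handler: (msg: SketchMessage<Body>) => Promise<void>,
): Promise<void> {
  // One storage.run per message: each handler sees only its own logger,
  // even though all handlers are in flight at the same time.
  const results = await Promise.allSettled(
    batch.messages.map((msg) => {
      const log = createSketchLogger({
        queue: batch.queue,
        messageId: msg.id,
        attempts: msg.attempts,
      })
      return storage.run(log, async () => {
        try {
          await handler(msg)
        }
        catch (err) {
          log.error(err as Error)
          throw err
        }
        finally {
          log.emit() // exactly one wide event per message, success or failure
        }
      })
    }),
  )

  // Rethrow the first failure so the platform's batch-level retry still applies.
  const failed = results.find((r): r is PromiseRejectedResult => r.status === 'rejected')
  if (failed)
    throw failed.reason
}
```

In this sketch a single failure still fails the whole batch, the same as the sequential loop. A shipped helper could instead call msg.ack() / msg.retry() per message, which is part of what the default needs to decide.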

Workaround (works today, ~20 lines)

Until shipped, users can roll their own with the public toolkit. Requires nodejs_compat (or nodejs_als) compatibility flag in wrangler.toml — already standard for most Worker projects.

// src/queue-logger.ts
import type { Message, MessageBatch } from '@cloudflare/workers-types'
import { createLogger } from 'evlog'
import { createLoggerStorage } from 'evlog/toolkit'

const { storage, useLogger } = createLoggerStorage(
  'queue handler. Wrap your handler in withQueueLogger().',
)

export { useLogger }

export async function withQueueLogger<Body>(
  batch: MessageBatch<Body>,
  handler: (msg: Message<Body>) => Promise<void>,
): Promise<void> {
  for (const msg of batch.messages) {
    const log = createLogger({
      queue: batch.queue,
      messageId: msg.id,
      attempts: msg.attempts,
      requestId: msg.id,
    })

    await storage.run(log, async () => {
      try {
        await handler(msg)
        log.emit()
      }
      catch (err) {
        log.error(err as Error)
        log.emit()
        throw err
      }
    })
  }
}

// src/worker.ts
import { initWorkersLogger } from 'evlog/workers'
import { useLogger, withQueueLogger } from './queue-logger'

initWorkersLogger({ env: { service: 'my-worker' } })

export default {
  async queue(batch, env) {
    await withQueueLogger(batch, async (msg) => {
      await processOrder(msg.body)
    })
  },
}

async function processOrder(order: { id: string, total: number }) {
  const log = useLogger()
  log.set({ order: { id: order.id, total: order.total } })
}

The same initWorkersLogger config (drain, enrichers, sampling, redaction) flows through to the queue logger automatically, so there's nothing to re-wire.

The other non-HTTP entrypoints (Cron Triggers, Durable Object alarms, Email Workers) would all share the same ALS + createLogger pattern.
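As a rough illustration of how that shared pattern might generalize, the sketch below wraps a Cron Trigger handler. Everything here is hypothetical: withEntrypointLogger, useEntrypointLogger, and the stand-in logger are not evlog APIs, and AsyncLocalStorage is used directly in place of createLoggerStorage so the block runs standalone. The only Cloudflare detail assumed is that the scheduled handler's controller exposes cron and scheduledTime.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks'

type Fields = Record<string, unknown>

// Hypothetical stand-in for an evlog logger.
interface SketchLogger {
  set(fields: Fields): void
  error(err: Error): void
  emit(): void
}

// Collected wide events; a real logger would send these to a drain instead.
const events: Fields[] = []

function createSketchLogger(initial: Fields): SketchLogger {
  const event: Fields = { ...initial }
  return {
    set: (fields) => { Object.assign(event, fields) },
    error: (err) => { Object.assign(event, { level: 'error', error: err.message }) },
    emit: () => { events.push({ ...event }) },
  }
}

const storage = new AsyncLocalStorage<SketchLogger>()

function useEntrypointLogger(): SketchLogger {
  const log = storage.getStore()
  if (!log)
    throw new Error('No logger in context. Wrap your handler in withEntrypointLogger().')
  return log
}

// Generic wrapper: the caller supplies the entrypoint-specific context fields,
// the wrapper handles ALS scoping, error capture, emit, and rethrow.
async function withEntrypointLogger<T>(context: Fields, fn: () => Promise<T>): Promise<T> {
  const log = createSketchLogger(context)
  return storage.run(log, async () => {
    try {
      return await fn()
    }
    catch (err) {
      log.error(err as Error)
      throw err
    }
    finally {
      log.emit()
    }
  })
}

// Cron Trigger usage; a queue, alarm, or email handler would differ only in the context fields.
const worker = {
  async scheduled(controller: { cron: string, scheduledTime: number }) {
    await withEntrypointLogger(
      { entrypoint: 'scheduled', cron: controller.cron, scheduledTime: controller.scheduledTime },
      async () => {
        await rotateKeys()
      },
    )
  },
}

async function rotateKeys() {
  // Nested code reaches the per-invocation logger without it being passed down.
  useEntrypointLogger().set({ rotated: 3 })
}
```

Under this shape, withQueueLogger would reduce to a loop that calls the generic wrapper once per message with queue-specific fields, which is one argument for landing queues first and generalizing afterwards.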

References

  • Current adapter: `packages/evlog/src/workers/index.ts`
  • Toolkit primitive: `packages/evlog/src/shared/storage.ts` (`createLoggerStorage`)
  • Core primitive: `packages/evlog/src/logger.ts` (`createLogger` — line 557)
  • Reference implementations of the same pattern: `packages/evlog/src/express/index.ts`, `packages/evlog/src/elysia/index.ts`
  • Cloudflare Queues consumer docs: https://developers.cloudflare.com/queues/configuration/javascript-apis/#consumer
