Skip to content

Conversation

andrewwormald
Copy link
Collaborator

@andrewwormald andrewwormald commented Sep 9, 2025

This MR allows the disabling of the outbox consumer inside workflow. This is desired in the case where the workflow has a record store that purges it's outbox by itself. This is ideal when the record store is centralised for multiple workflows across many services. In cases where a record store is being provisioned locally (same host) or specifaclly for the workflow then the internal outbox consumer should be used for simplicity.

Summary by CodeRabbit

  • New Features
    • Optional background outbox purging to stream and clear queued events when enabled, plus a public option to configure it and a build option to disable automatic polling.
  • Bug Fixes
    • Prevented timer resource leaks by ensuring timers are stopped in several wait helpers.
  • Tests
    • Added acceptance and targeted tests for outbox-enabled, outbox-disabled and invalid outbox configurations; updated many tests to use clearer error assertions.

Copy link

coderabbitai bot commented Sep 9, 2025

Walkthrough

Adds configurable outbox processing to the in‑memory record store and a runtime toggle to disable it. Introduces memrecordstore.WithOutbox to start a background purge loop that lists outbox events, publishes them via a provided EventStreamer, and deletes them in batches. New OutboxLister and OutboxDeleter types and PurgeOutboxForever implement polling, backoff and per‑topic senders. memrecordstore.New now validates outbox prerequisites and may panic on incomplete config. Adds workflow Build option WithoutOutbox to skip starting the outbox consumer. Tests updated/added for outbox behaviour and many test assertion refinements; small timer cleanup defers added to several wait helpers.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • echarrod

Pre-merge checks (2 passed, 1 warning)

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title clearly identifies the main change—enabling the outbox to be disabled—using concise language that directly relates to the PR’s objective of adding an outbox disable option without extraneous detail.

Poem

I’m a rabbit in the code, I hop from queue to queue,
I nibble at the outbox crumbs and stitch events anew.
When WithOutbox calls, I wake and humbly purge the pile,
But flip WithoutOutbox and I’ll nap for a while. 🥕🐇

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch andreww-allowDisablingOfOutbox

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (12)
builder.go (1)

295-301: Consider documenting the use case more clearly.

The comment could better explain when to use this option, particularly the centralised record store scenario mentioned in the PR objectives.

 // WithoutOutbox disables the polling of the RecordStore outbox for pushing events to the provided EventStreamer
 // and allows for external submission of outbox messages to the EventStreamer.
+// This is useful when the workflow uses a record store that performs its own outbox purging,
+// typically when the record store is centralised and shared across multiple workflows and services.
 func WithoutOutbox() BuildOption {
adapters/memrecordstore/memrecordstore.go (2)

36-47: Consider error handling for the goroutine.

The goroutine launches PurgeOutboxForever but discards the error. Whilst the function internally logs errors, consider whether critical errors should be handled differently.

You might want to add a mechanism to surface critical errors from the purge loop, perhaps through a callback or by storing the goroutine reference for monitoring.


198-214: Add documentation comment to listOutbox
Document that this private helper returns up to limit events from the global outbox (regardless of workflow), unlike ListOutboxEvents which filters by workflowName.

adapters/memrecordstore/memrecordstore_test.go (1)

46-77: Test name doesn't match its purpose.

The test is named TestOutboxDisabled but it actually tests the scenario where the outbox is enabled in the record store but disabled in the workflow. Consider renaming for clarity.

-func TestOutboxDisabled(t *testing.T) {
+func TestWorkflowWithoutOutboxButRecordStoreWithOutbox(t *testing.T) {
workflow_test.go (1)

171-203: Test could be more comprehensive.

The test validates that the workflow runs successfully with the outbox disabled, but doesn't verify that the outbox consumer actually isn't running.

Consider adding assertions to verify that the outbox consumer process is not present in the workflow's internal state:

// After wf.Run(ctx)
states := wf.States()
for processName := range states {
    require.NotContains(t, processName, "outbox-consumer", "outbox consumer should not be running when disabled")
}
adapters/memrecordstore/outbox.go (7)

22-23: Avoid shadowing the predeclared delete identifier

Rename the parameter and its uses to reduce confusion and improve readability.

 func PurgeOutboxForever(
   ctx context.Context,
   list OutboxLister,
-  delete OutboxDeleter,
+  deleter OutboxDeleter,
   stream workflow.EventStreamer,
   logger workflow.Logger,
   pollingFrequency time.Duration,
   lookupLimit int64,
 ) error {
   for ctx.Err() == nil {
     err := purgeOutbox(
       ctx,
       list,
-      delete,
+      deleter,
       stream,
       pollingFrequency,
       lookupLimit,
     )
 func purgeOutbox(
   ctx context.Context,
   list OutboxLister,
-  delete OutboxDeleter,
+  deleter OutboxDeleter,
   stream workflow.EventStreamer,
   pollingFrequency time.Duration,
   lookupLimit int64,
 ) error {
   events, err := list(lookupLimit)
-    err = delete(ctx, e.ID)
+    err = deleter(ctx, e.ID)
     if err != nil {
       return err
     }

Also applies to: 52-53, 93-95


28-47: Respect context during error backoff and unify cancellation semantics

Use the existing wait helper so backoff cancels promptly; also return nil consistently once the loop exits due to cancellation.

-    } else if err != nil {
-      logger.Error(ctx, err)
-      time.Sleep(time.Second)
-      continue
-    }
+    } else if err != nil {
+      logger.Error(ctx, err)
+      if err := wait(ctx, time.Second); errors.Is(err, context.Canceled) {
+        return nil
+      }
+      continue
+    }
   }
-
-  return ctx.Err()
+  return nil

82-86: Fail fast on missing topic header with a clear error

Currently an empty topic will flow into NewSender and fail opaquely. Emit a precise error instead.

+    if topic == "" {
+      return fmt.Errorf("outbox event %s missing %q header", e.ID, workflow.HeaderTopic)
+    }
     producer, err := stream.NewSender(ctx, topic)

Also add the import:

 import (
   "context"
   "errors"
+  "fmt"
   "time"

102-114: Stop the timer on early return to avoid leaks

Ensure the timer is stopped when the context path fires.

 func wait(ctx context.Context, d time.Duration) error {
   if d == 0 {
     return nil
   }
 
   t := time.NewTimer(d)
+  defer t.Stop()
   select {
   case <-ctx.Done():
     return ctx.Err()
   case <-t.C:
     return nil
   }
 }

66-99: Consider sender reuse per topic to reduce per-event churn

Creating a new sender for every event may be expensive depending on the EventStreamer. Caching senders per topic within a purge batch can reduce overhead. Confirm whether EventSender requires closing; if so, close cached senders after the loop.

Would you like me to propose a small in-batch cache with optional Close handling once we confirm EventSender’s lifecycle?


15-17: Pass context into lister for cancellation and timeouts

OutboxLister lacks context, which can make long list operations uninterruptible. Consider OutboxLister func(ctx context.Context, limit int64) (...).

If changing the type is too invasive for this PR, we can keep it and add a follow-up issue.


68-73: Poison-pill handling to avoid wedging the outbox

A single corrupt event (unmarshal failure) will repeatedly fail the whole batch. Consider a strategy: log + move to a dead-letter topic/table, or mark the event failed after N attempts.

Do we have an existing dead-letter mechanism in the mem store or higher layers you want this to integrate with?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fbf31cc and 0374257.

📒 Files selected for processing (7)
  • adapters/memrecordstore/memrecordstore.go (3 hunks)
  • adapters/memrecordstore/memrecordstore_test.go (1 hunks)
  • adapters/memrecordstore/outbox.go (1 hunks)
  • builder.go (1 hunks)
  • outbox.go (1 hunks)
  • workflow.go (1 hunks)
  • workflow_test.go (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
adapters/memrecordstore/outbox.go (3)
event.go (1)
  • OutboxEvent (47-60)
eventstreamer.go (3)
  • EventStreamer (10-13)
  • Header (34-34)
  • HeaderTopic (39-39)
logger.go (1)
  • Logger (14-19)
workflow_test.go (4)
adapters/memrecordstore/memrecordstore.go (2)
  • New (16-50)
  • WithOutbox (63-70)
adapters/memstreamer/memstreamer.go (1)
  • New (12-33)
internal/logger/logger.go (1)
  • New (21-30)
builder.go (1)
  • WithoutOutbox (297-301)
adapters/memrecordstore/memrecordstore.go (4)
adapters/memrecordstore/outbox.go (1)
  • PurgeOutboxForever (19-47)
eventstreamer.go (1)
  • EventStreamer (10-13)
logger.go (1)
  • Logger (14-19)
event.go (1)
  • OutboxEvent (47-60)
adapters/memrecordstore/memrecordstore_test.go (5)
adapters/memrecordstore/memrecordstore.go (2)
  • New (16-50)
  • WithOutbox (63-70)
adapters/memstreamer/memstreamer.go (1)
  • New (12-33)
adapters/memrolescheduler/memrolescheduler.go (1)
  • New (44-48)
workflow_test.go (1)
  • TestOutboxDisabled (171-203)
builder.go (2)
  • NewBuilder (25-43)
  • WithoutOutbox (297-301)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: core (1)
🔇 Additional comments (2)
workflow.go (1)

117-122: LGTM! Clean conditional outbox startup.

The implementation correctly guards the outbox consumer startup based on the configuration flag, maintaining backwards compatibility whilst allowing users to disable it when needed.

outbox.go (1)

62-62: LGTM! Simple boolean flag addition.

The addition of the disabled field to the outboxConfig struct is clean and follows the existing pattern.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
adapters/memrecordstore/memrecordstore.go (1)

44-46: Make poll cadence and batch size configurable; use named defaults

Hard-coding 10ms and 1000 reduces flexibility and can waste CPU in low-traffic tests/services. Prefer named defaults and allow overrides.

Apply these diffs:

Introduce defaults:

@@
-const defaultListLimit = 25
+const defaultListLimit = 25
+
+const (
+    defaultOutboxPollInterval = 100 * time.Millisecond
+    defaultOutboxLookupLimit  = int64(1000)
+)

Use them here:

-                10*time.Millisecond,
-                1000,
+                defaultOutboxPollInterval,
+                defaultOutboxLookupLimit,
🧹 Nitpick comments (5)
adapters/memrecordstore/memrecordstore.go (5)

47-49: Avoid allocating an empty map for Debug meta

Passing nil is fine for most loggers and avoids a tiny alloc. Also clarify the stop reason.

Apply this diff:

-                opt.logger.Debug(opt.ctx, "outbox processor stopped", map[string]string{})
+                opt.logger.Debug(opt.ctx, "outbox processor stopped (context cancelled)", nil)

50-55: Fix grammar in error log message

Minor text tweak.

Apply this diff:

-                err = errors.Join(
-                    errors.New("outbox processor stopped with unexpectedly"),
-                    err,
-                )
+                err = errors.Join(errors.New("outbox processor stopped unexpectedly"), err)

62-69: Bundle outbox-related fields into an OutboxConfig inside options

Reduces parameter sprawl and makes future extensions (poll interval, limits) simpler. Example:

  • options.outbox struct { enabled bool; ctx context.Context; es workflow.EventStreamer; logger workflow.Logger }

73-80: Document inputs and semantics of WithOutbox; consider a clearer name

This enables the in-process outbox consumer. A name like WithOutboxConsumer or WithInternalOutbox could reduce ambiguity. Also document that ctx must be cancellable and es/logger must be non-nil.

Apply this minimal doc addition:

+// WithOutbox enables the in-process outbox consumer for the memrecordstore.
+// ctx must be cancellable; es and logger must be non-nil.
 func WithOutbox(ctx context.Context, es workflow.EventStreamer, logger workflow.Logger) Option {

208-224: Pre-allocate and tighten listOutbox for fewer allocations

Saves a small amount of work on hot paths.

Apply this diff:

-    var filtered []workflow.OutboxEvent
+    n := int(limit)
+    if n > len(s.outbox) {
+        n = len(s.outbox)
+    }
+    filtered := make([]workflow.OutboxEvent, 0, n)
     for _, outboxEvent := range s.outbox {
-
         filtered = append(filtered, outboxEvent)
         if len(filtered) >= int(limit) {
             break
         }
     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 85efb0d and 1a5527b.

📒 Files selected for processing (2)
  • adapters/memrecordstore/memrecordstore.go (3 hunks)
  • builder.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • builder.go
🧰 Additional context used
🧬 Code graph analysis (1)
adapters/memrecordstore/memrecordstore.go (4)
adapters/memrecordstore/outbox.go (1)
  • PurgeOutboxForever (19-47)
eventstreamer.go (1)
  • EventStreamer (10-13)
logger.go (1)
  • Logger (14-19)
event.go (1)
  • OutboxEvent (47-60)
🔇 Additional comments (2)
adapters/memrecordstore/memrecordstore.go (2)

208-224: Confirm that cross-workflow listing is intended

Unlike ListOutboxEvents, this helper ignores workflowName. If PurgeOutboxForever expects global scanning across all workflows, this is fine; otherwise, consider aligning semantics or adding a comment.


50-54: errors.Join support confirmed — no changes required
Go module specifies version 1.24.2, which includes errors.Join (Go 1.20+).

Comment on lines 36 to 46
if opt.writeToOutbox {
go func() {
err := PurgeOutboxForever(
opt.ctx,
s.listOutbox,
s.DeleteOutboxEvent,
opt.eventStreamer,
opt.logger,
10*time.Millisecond,
1000,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard against nil ctx/eventStreamer/logger before starting the outbox processor

If any of these are nil, the goroutine will panic when PurgeOutboxForever uses them. Start only when all are non-nil.

Apply this diff:

 if opt.writeToOutbox {
-    go func() {
+    if opt.ctx == nil || opt.eventStreamer == nil || opt.logger == nil {
+        // Misconfigured outbox: missing ctx/streamer/logger; skip starting to avoid panic.
+    } else {
+    go func() {
         err := PurgeOutboxForever(
             opt.ctx,
             s.listOutbox,
             s.DeleteOutboxEvent,
             opt.eventStreamer,
             opt.logger,
-            10*time.Millisecond,
-            1000,
+            10*time.Millisecond,
+            1000,
         )
         if errors.Is(err, context.Canceled) {
             opt.logger.Debug(opt.ctx, "outbox processor stopped", map[string]string{})
         } else if err != nil {
-            err = errors.Join(
-                errors.New("outbox processor stopped with unexpectedly"),
-                err,
-            )
+            err = errors.Join(
+                errors.New("outbox processor stopped with unexpectedly"),
+                err,
+            )
             opt.logger.Error(opt.ctx, err)
         }
     }()
+    }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if opt.writeToOutbox {
go func() {
err := PurgeOutboxForever(
opt.ctx,
s.listOutbox,
s.DeleteOutboxEvent,
opt.eventStreamer,
opt.logger,
10*time.Millisecond,
1000,
)
if opt.writeToOutbox {
if opt.ctx == nil || opt.eventStreamer == nil || opt.logger == nil {
// Misconfigured outbox: missing ctx/streamer/logger; skip starting to avoid panic.
} else {
go func() {
err := PurgeOutboxForever(
opt.ctx,
s.listOutbox,
s.DeleteOutboxEvent,
opt.eventStreamer,
opt.logger,
10*time.Millisecond,
1000,
)
if errors.Is(err, context.Canceled) {
opt.logger.Debug(opt.ctx, "outbox processor stopped", map[string]string{})
} else if err != nil {
err = errors.Join(
errors.New("outbox processor stopped with unexpectedly"),
err,
)
opt.logger.Error(opt.ctx, err)
}
}()
}
}
🤖 Prompt for AI Agents
In adapters/memrecordstore/memrecordstore.go around lines 36 to 46, the
goroutine starting PurgeOutboxForever can panic if opt.ctx, opt.eventStreamer,
or opt.logger are nil; before launching the goroutine check that
opt.writeToOutbox is true AND opt.ctx != nil AND opt.eventStreamer != nil AND
opt.logger != nil (or otherwise handle missing dependencies), and only then
start the goroutine; if any required value is nil, log a warning via the
available logger or skip starting the outbox processor to avoid a runtime panic.

@coderabbitai coderabbitai bot requested a review from echarrod September 9, 2025 16:43
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (7)
schedule.go (1)

99-103: Micro-optimisation: skip timer when duration is non-positive

Avoid allocating a timer if the deadline has already passed.

Apply this diff:

 timeDiffAsDuration := until.Sub(clock.Now())

-t := clock.NewTimer(timeDiffAsDuration)
-defer t.Stop()
+if timeDiffAsDuration <= 0 {
+	return nil
+}
+t := clock.NewTimer(timeDiffAsDuration)
+defer t.Stop()
adapters/kafkastreamer/kafka.go (1)

254-256: Handle negative durations too

Guard for d <= 0, not just == 0. Behaviour stays the same for positives.

-if d == 0 {
+if d <= 0 {
 	return nil
 }
step.go (2)

211-214: Also guard for negative durations

Mirror the kafka wait change to avoid creating a timer for d < 0.

-if d == 0 {
+if d <= 0 {
 	return nil
 }

211-225: Reduce duplication of wait helpers

There are now three very similar wait functions (here, schedule.go, kafkastreamer). Consider a small shared helper to keep behaviour uniform and simplify future fixes.

workflow_test.go (3)

106-117: Nice: exercise external outbox + disabled internal consumer

Wiring memrecordstore.WithOutbox with a shared event streamer while disabling the internal outbox aligns with the PR’s goal.

To keep CI logs quieter, consider discarding logs:

-recordStore := memrecordstore.New(memrecordstore.WithOutbox(ctx, eventStreamer, logger.New(os.Stdout)))
+recordStore := memrecordstore.New(memrecordstore.WithOutbox(ctx, eventStreamer, logger.New(io.Discard)))

171-203: Add an explicit assertion that the outbox consumer is disabled

This test proves forward progress; also assert the process set excludes the outbox consumer.

 wf.Run(ctx)
 t.Cleanup(wf.Stop)

+if _, ok := wf.States()["outbox-consumer"]; ok {
+	t.Fatalf("outbox-consumer should be disabled")
+}
 
 fid := strconv.FormatInt(expectedUserID, 10)

695-722: Assert absence of outbox process

You verify expected processes; also assert “outbox-consumer” isn’t present to fully cover the toggle.

 w.Run(context.Background())
 t.Cleanup(w.Stop)

+require.NotContains(t, w.States(), "outbox-consumer")
+
 for process := range w.States() {
 	require.Truef(t, expected[process], "process '%s' is missing expected value", process)
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a5527b and 31ecbd7.

📒 Files selected for processing (6)
  • adapters/kafkastreamer/kafka.go (1 hunks)
  • adapters/memrecordstore/outbox.go (1 hunks)
  • builder.go (1 hunks)
  • schedule.go (1 hunks)
  • step.go (1 hunks)
  • workflow_test.go (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • builder.go
  • adapters/memrecordstore/outbox.go
🧰 Additional context used
🧬 Code graph analysis (1)
workflow_test.go (4)
adapters/memrecordstore/memrecordstore.go (3)
  • New (17-60)
  • WithOutbox (73-80)
  • WithClock (82-86)
adapters/memstreamer/memstreamer.go (2)
  • New (12-33)
  • WithClock (41-45)
internal/logger/logger.go (1)
  • New (21-30)
builder.go (5)
  • WithTimeoutStore (289-293)
  • WithClock (307-311)
  • WithDebugMode (315-319)
  • WithoutOutbox (299-303)
  • NewBuilder (25-43)
🔇 Additional comments (4)
schedule.go (1)

102-103: Good: stop the timer to avoid leaks

Deferring Stop prevents a stray timer when the context path returns early. Looks solid.

adapters/kafkastreamer/kafka.go (1)

259-260: Good: add timer Stop for context-cancel path

This prevents a lingering timer if ctx is cancelled.

step.go (1)

217-218: Good: stop the timer to avoid leaks

Matches the pattern elsewhere; safe and consistent.

workflow_test.go (1)

8-9: LGTM: imports for outbox wiring and logging

New imports are appropriate for the outbox-enabled memrecordstore wiring.

Also applies to: 21-21

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (7)
adapters/memrecordstore/outbox.go (3)

15-18: Add GoDoc comments for exported types

OutboxLister and OutboxDeleter are exported; add short comments to satisfy linting and improve discoverability.

Apply this diff:

 type (
-	OutboxLister  func(limit int64) ([]workflow.OutboxEvent, error)
-	OutboxDeleter func(ctx context.Context, id string) error
+	// OutboxLister lists up to limit outbox events in creation order.
+	OutboxLister func(limit int64) ([]workflow.OutboxEvent, error)
+	// OutboxDeleter deletes a single outbox event by ID.
+	OutboxDeleter func(ctx context.Context, id string) error
 )

61-65: Consider passing context into the lister

OutboxLister doesn’t accept a context, so a slow or blocking list call can’t be cancelled. Consider OutboxLister(ctx context.Context, limit int64) for consistency with Deleter.

Would updating the memrecordstore lister to accept context be feasible in this PR, or do other call sites make this change noisy?


110-124: Utility duplication

wait is now implemented in multiple packages with the same semantics. If convenient, consider centralising to reduce duplication.

workflow_test.go (4)

106-117: Silence outbox logs to keep tests clean.

Switch the outbox logger writer to io.Discard to avoid noisy stdout during test runs.

-	recordStore := memrecordstore.New(memrecordstore.WithOutbox(ctx, eventStreamer, logger.New(os.Stdout)))
+	recordStore := memrecordstore.New(memrecordstore.WithOutbox(ctx, eventStreamer, logger.New(io.Discard)))

8-8: Drop os import if logs are silenced.

If you adopt io.Discard in the previous suggestion, remove the os import.

Also applies to: 21-21


171-203: Decouple from global acceptance constants.

This unit test doesn’t need expectedUserID; use a local foreign ID to reduce coupling.

-	fid := strconv.FormatInt(expectedUserID, 10)
+	fid := "1"

695-722: Assert exact process set, not just absence of unexpected entries.

Strengthen the assertion to check both directions (all expected present and no extras).

-	for process := range w.States() {
-		require.Truef(t, expected[process], "process '%s' is missing expected value", process)
-	}
+	actual := map[string]bool{}
+	for process := range w.States() {
+		actual[process] = true
+	}
+	require.Equal(t, len(expected), len(actual), "process set size mismatch")
+	for process := range expected {
+		require.Truef(t, actual[process], "process '%s' not present", process)
+	}
+	for process := range actual {
+		require.Truef(t, expected[process], "unexpected process '%s'", process)
+	}

Consider mirroring this stronger check in TestExpectedProcesses as well.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a5527b and 31ecbd7.

📒 Files selected for processing (6)
  • adapters/kafkastreamer/kafka.go (1 hunks)
  • adapters/memrecordstore/outbox.go (1 hunks)
  • builder.go (1 hunks)
  • schedule.go (1 hunks)
  • step.go (1 hunks)
  • workflow_test.go (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • builder.go
🧰 Additional context used
🧬 Code graph analysis (2)
workflow_test.go (4)
adapters/memrecordstore/memrecordstore.go (2)
  • New (17-60)
  • WithOutbox (73-80)
adapters/memstreamer/memstreamer.go (1)
  • New (12-33)
internal/logger/logger.go (1)
  • New (21-30)
builder.go (2)
  • WithoutOutbox (299-303)
  • NewBuilder (25-43)
adapters/memrecordstore/outbox.go (3)
event.go (1)
  • OutboxEvent (47-60)
eventstreamer.go (3)
  • EventStreamer (10-13)
  • Header (34-34)
  • HeaderTopic (39-39)
logger.go (1)
  • Logger (14-19)
🔇 Additional comments (4)
step.go (1)

217-218: Good addition: stop the timer

Deferring t.Stop() avoids leaking timer resources on early returns.

schedule.go (1)

102-103: Good addition: stop the timer

Deferring t.Stop() prevents timer/goroutine leaks when the context path wins.

adapters/kafkastreamer/kafka.go (1)

259-260: Good addition: stop the timer

Deferring t.Stop() is correct here and aligns with the pattern used elsewhere.

workflow_test.go (1)

106-117: Outbox wiring matches intent (store purger + disabled internal).

Using memrecordstore.WithOutbox alongside workflow.WithoutOutbox is the correct composition for centralised purging.

Comment on lines +66 to +68
if len(events) == 0 {
return wait(ctx, pollingFrequency)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid tight spin when pollingFrequency is zero

If pollingFrequency == 0, this returns immediately and PurgeOutboxForever busy-loops at 100% CPU. Clamp to a sensible minimum.

Apply this diff:

-	if len(events) == 0 {
-		return wait(ctx, pollingFrequency)
-	}
+	if len(events) == 0 {
+		d := pollingFrequency
+		if d <= 0 {
+			d = 250 * time.Millisecond
+		}
+		return wait(ctx, d)
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if len(events) == 0 {
return wait(ctx, pollingFrequency)
}
if len(events) == 0 {
d := pollingFrequency
if d <= 0 {
d = 250 * time.Millisecond
}
return wait(ctx, d)
}
🤖 Prompt for AI Agents
In adapters/memrecordstore/outbox.go around lines 66-68, the early return when
pollingFrequency == 0 causes a busy spin; clamp pollingFrequency to a sensible
minimum before waiting. Change the logic so that if pollingFrequency <= 0 you
set pollingFrequency to a small safe default (e.g. 10ms or 100ms) and then call
wait(ctx, clampedPollingFrequency) instead of returning immediately; ensure the
clamped value is used consistently so PurgeOutboxForever cannot busy-loop.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
workflow_test.go (4)

8-8: Keep tests quiet: avoid stdout logging

Use io.Discard for the test logger and drop the os import to prevent noisy CI logs.

-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"io"
-	"os"
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
 	"strconv"
 	"testing"
 	"time"
-	recordStore := memrecordstore.New(memrecordstore.WithOutbox(ctx, eventStreamer, logger.New(os.Stdout)))
+	recordStore := memrecordstore.New(memrecordstore.WithOutbox(ctx, eventStreamer, logger.New(io.Discard)))

Also applies to: 21-21


106-117: Intent check: outbox disabled in acceptance test while memrecordstore outbox is enabled

This flips the acceptance path to rely on the store’s purge loop instead of the workflow’s internal outbox consumer. Ensure we still have at least one end-to-end test exercising the internal outbox enabled path (i.e. no workflow.WithoutOutbox and no memrecordstore.WithOutbox), not just a states() assertion.


171-203: Minor assertions/style: prefer require.NoError

Use require.NoError for error checks.

-	runID, err := wf.Trigger(ctx, fid)
-	require.Nil(t, err)
+	runID, err := wf.Trigger(ctx, fid)
+	require.NoError(t, err)

-	_, err = wf.Await(ctx, fid, runID, StatusCompleted)
-	require.Nil(t, err)
+	_, err = wf.Await(ctx, fid, runID, StatusCompleted)
+	require.NoError(t, err)

695-722: Strengthen process-set assertions to avoid false positives

Current check won’t fail if an expected process is missing. Compare both directions and counts.

-	for process := range w.States() {
-		require.Truef(t, expected[process], "process '%s' is missing expected value", process)
-	}
+	states := w.States()
+	require.Equal(t, len(expected), len(states), "unexpected process count")
+	for process := range states {
+		require.Truef(t, expected[process], "unexpected process '%s'", process)
+	}
+	for process := range expected {
+		require.Contains(t, states, process, "missing expected process '%s'", process)
+	}

Consider applying the same pattern to TestExpectedProcesses for symmetry.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 31ecbd7 and 3d33e41.

📒 Files selected for processing (3)
  • adapters/memrecordstore/memrecordstore.go (3 hunks)
  • adapters/memrecordstore/memrecordstore_test.go (1 hunks)
  • workflow_test.go (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • adapters/memrecordstore/memrecordstore_test.go
  • adapters/memrecordstore/memrecordstore.go
🧰 Additional context used
🧬 Code graph analysis (1)
workflow_test.go (5)
adapters/memrecordstore/memrecordstore.go (3)
  • New (17-64)
  • WithOutbox (77-84)
  • WithClock (86-90)
adapters/memstreamer/memstreamer.go (2)
  • New (12-33)
  • WithClock (41-45)
internal/logger/logger.go (1)
  • New (21-30)
builder.go (5)
  • WithTimeoutStore (289-293)
  • WithClock (307-311)
  • WithDebugMode (315-319)
  • WithoutOutbox (299-303)
  • NewBuilder (25-43)
adapters/memrecordstore/memrecordstore_test.go (1)
  • TestOutboxDisabled (47-78)

Copy link

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
adapters/adaptertest/eventstreaming.go (1)

69-85: Always call wg.Done even on assertion failures.

If an assertion inside the goroutine fails, the test can hang on wg.Wait(). Defer Done at the top; remove the explicit call at the bottom.

-           wg.Add(1)
-           go func() {
+           wg.Add(1)
+           go func() {
+                   defer wg.Done()
                    go func() {
                            if err := sender.Send(ctx, "789", 5, map[workflow.Header]string{
                                    workflow.HeaderTopic: topic,
                            }); err != nil {
                                    t.Errorf("Send failed: %v", err)
                            }
                    }()
                    e, ack, err := receiver.Recv(ctx)
                    require.NoError(t, err)
                    require.Equal(t, "789", e.ForeignID)

                    err = ack()
                    require.NoError(t, err)

-                   wg.Done()
            }()
metrics_test.go (1)

200-213: Normalise spacing in expected metrics text Remove the extra space after the comma before workflow_name so all labels read {process_name="…",workflow_name="…"}. Fix metrics_test.go at lines 205–206 (and similarly in 220–233).

step_internal_test.go (1)

233-246: Bug: stale err checked; stepConsumer doesn’t return an error on construction.

require.NoError(t, err) at Line 245 references an outer-scoped err unrelated to this subtest and is meaningless here.

Apply:

 		consume := stepConsumer(
 			w.Name(),
 			"",
 			consumer,
 			testStatus(current.Status),
 			lookup,
 			store,
 			w.logger,
 			updater,
 			3,
 			w.errorCounter,
 		)
-		require.NoError(t, err)
♻️ Duplicate comments (1)
adapters/memrecordstore/outbox.go (1)

66-68: Avoid CPU spin when pollingFrequency is zero.

With d==0, wait returns immediately and the loop busy-spins. Clamp to a sane minimum.

Apply either fix:

Option A (local clamp):

- if len(events) == 0 {
-   return wait(ctx, pollingFrequency)
- }
+ if len(events) == 0 {
+   d := pollingFrequency
+   if d <= 0 {
+     d = 250 * time.Millisecond
+   }
+   return wait(ctx, d)
+ }

Option B (central clamp in wait):

-func wait(ctx context.Context, d time.Duration) error {
-  if d == 0 {
-    return nil
-  }
+func wait(ctx context.Context, d time.Duration) error {
+  if d <= 0 {
+    d = 250 * time.Millisecond
+  }

Also applies to: 121-124

🧹 Nitpick comments (17)
_examples/timeout/timeout_test.go (1)

35-35: Tiny improvement: assert runID is set

Optional, but asserting non-empty runID can catch unexpected regressions.

 runID, err := wf.Trigger(ctx, foreignID)
 require.NoError(t, err)
+require.NotEmpty(t, runID)
adapters/adaptertest/eventstreaming.go (1)

93-103: Nit: “affect” → “effect” in test name.

Tiny grammar fix in the subtest description.

-       t.Run("StreamFromLatest should have no affect when offset is committed", func(t *testing.T) {
+       t.Run("StreamFromLatest should have no effect when offset is committed", func(t *testing.T) {
delete_internal_test.go (1)

101-111: Prefer require.ErrorIs over errors.Is wrapped in require.True.

Improves readability and error diffs.

-           require.True(t, errors.Is(err, tc.expectedErr))
+           require.ErrorIs(t, err, tc.expectedErr)
metrics_test.go (1)

80-91: Reduce flakiness: avoid fixed sleeps before metric assertions.

Use Eventually/polling, then perform the strict compare.

Example pattern:

require.Eventually(t, func() bool {
    return testutil.CollectAndCount(metrics.ConsumerLag) >= 2
}, time.Second, 25*time.Millisecond)

require.NoError(t, testutil.CollectAndCompare(metrics.ConsumerLag, strings.NewReader(expected)))
testing.go (2)

56-64: Prevent CPU spin in AwaitTimeoutInsert loop.

Add a short sleep/backoff when not found to avoid a tight loop.

   for !found {
       if w.ctx.Err() != nil {
           return
       }

       ls, err := w.timeoutStore.List(w.ctx, w.Name())
       require.NoError(t, err)
+      // Avoid busy loop when the timeout hasn’t been inserted yet.
+      if len(ls) == 0 {
+          time.Sleep(10 * time.Millisecond)
+          continue
+      }

Add time to imports.

import (
    // ...
    "time"
)

164-174: Avoid busy-waiting in waitFor.

Sleep briefly when Latest/Snapshots are not ready to reduce test flakiness and CPU usage.

   for runID == "" {
       latest, err := w.recordStore.Latest(context.Background(), w.Name(), foreignID)
       if errors.Is(err, ErrRecordNotFound) {
-          continue
+          time.Sleep(10 * time.Millisecond)
+          continue
       } else {
           require.NoError(t, err)
       }
       runID = latest.RunID
   }

   var wr Record
   for wr.RunID == "" {
       snapshots := testingStore.Snapshots(w.Name(), foreignID, runID)
+      if len(snapshots) == 0 {
+          time.Sleep(10 * time.Millisecond)
+          continue
+      }
       for _, r := range snapshots {
           ok, err := fn(r)
           require.NoError(t, err)
           if ok {
               wr = *r
           }
       }
   }

Also applies to: 179-190

_examples/schedule/schedule_test.go (1)

37-41: Avoid asserting inside a detached goroutine.

Capture the error and assert in the main goroutine to prevent missed failures.

-   go func() {
-       err := wf.Schedule(foreignID, "@hourly")
-       require.NoError(t, err)
-   }()
+   errCh := make(chan error, 1)
+   go func() { errCh <- wf.Schedule(foreignID, "@hourly") }()
+   require.NoError(t, <-errCh)
schedule_test.go (2)

50-56: Stabilise tests: avoid racy WaitGroup + sleeps.

The goroutine calls wg.Done() before Schedule returns, then relies on Sleep. Prefer synchronous calls or Eventually/asserting on a condition (e.g. Latest succeeds) to avoid flakes.

Example refactor (first test):

- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
-   wg.Done()
-   err := wf.Schedule("andrew", "@monthly")
-   require.NoError(t, err)
- }()
- wg.Wait()
- // Allow scheduling to take place
- time.Sleep(10 * time.Millisecond)
+ require.NoError(t, wf.Schedule("andrew", "@monthly"))
+ require.Eventually(t, func() bool {
+   _, err := recordStore.Latest(ctx, workflowName, "andrew")
+   return err == nil
+ }, time.Second, 5*time.Millisecond)

Also applies to: 57-61, 110-116, 118-121, 179-186, 192-207


172-176: Naming/comment mismatch in filter test.

Variable name “shouldSkip” and comment “Disable the filter to enable scheduling” conflict with setting it to true. Consider renaming to “allowSchedule” or correcting the comment for clarity.

- skipVal := false
- shouldSkip := &skipVal
- filter := func(ctx context.Context) (bool, error) {
-   return *shouldSkip, nil
- }
+ allow := false
+ filter := func(ctx context.Context) (bool, error) {
+   return allow, nil
+ }
...
- *shouldSkip = true
+ allow = true

Also applies to: 199-203

runstate_test.go (1)

74-81: Reduce reliance on sleeps.

Use Await/Eventually against store state instead of fixed sleeps to speed up and deflake.

adapters/adaptertest/recordstore.go (2)

137-160: Minor clean-up in DeleteOutboxEvent test.

latest, err := store.Lookup(...) then latest.Status = ... is unused; consider removing to avoid confusion.

- latest, err := store.Lookup(ctx, expected.RunID)
- require.NoError(t, err)
-
- latest.Status = int(statusMiddle)

441-453: Export field or add JSON tag to reflect intent.

Unexported field name won’t be marshalled. If you intended to persist it, export or tag it.

- type example struct {
-   name string
- }
+ type example struct {
+   Name string `json:"name"`
+ }
- e := example{name: foreignID}
+ e := example{Name: foreignID}
adapters/memrecordstore/outbox.go (3)

15-18: Pass context into OutboxLister to avoid blocking on cancel.

Without ctx, lister cannot be interrupted. Consider making it func(ctx context.Context, limit int64) (...).

- OutboxLister  func(limit int64) ([]workflow.OutboxEvent, error)
+ OutboxLister  func(ctx context.Context, limit int64) ([]workflow.OutboxEvent, error)

And update call sites:

- events, err := lister(lookupLimit)
+ events, err := lister(ctx, lookupLimit)

I can update callers if you want.


29-51: Error handling/backoff: add jitter and cap logs.

On persistent failures this will log every second forever. Add jitter and exponential backoff to reduce thundering-herd and log noise.

- if err := wait(ctx, time.Second); ...
+ // simple jittered backoff
+ backoff := time.Second + time.Duration(rand.Intn(500))*time.Millisecond
+ if err := wait(ctx, backoff); ...

79-116: Poison-pill handling and at-least-once semantics.

A permanently bad event blocks subsequent ones in the same batch. Consider per-event retry counters with DLQ after N attempts, and document at-least-once delivery (or include a stable event ID header for consumer idempotency).

workflow_test.go (2)

170-202: Tighten TestOutboxDisabled to focus on “internal outbox off”

The test purpose doesn’t need the record store’s outbox goroutine. Dropping memrecordstore.WithOutbox keeps the test minimal and avoids an extra background worker.

Apply this diff:

-	eventStreamer := memstreamer.New()
-	recordStore := memrecordstore.New(memrecordstore.WithOutbox(ctx, eventStreamer, logger.New(io.Discard)))
+	eventStreamer := memstreamer.New()
+	recordStore := memrecordstore.New()

704-731: Also assert completeness of expected processes when outbox is disabled

Currently this only checks that no unexpected processes exist; it doesn’t fail if an expected process is missing. Mirror the checks from TestExpectedProcesses for full set equality.

Apply this diff:

-	for process := range w.States() {
-		require.Truef(t, expected[process], "process '%s' is missing expected value", process)
-	}
+	states := w.States()
+	require.Equal(t, len(expected), len(states), "unexpected process count")
+	for process := range states {
+		require.Truef(t, expected[process], "unexpected process '%s'", process)
+	}
+	for process := range expected {
+		require.Contains(t, states, process, "missing expected process '%s'", process)
+	}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3d33e41 and 1b3a1a8.

📒 Files selected for processing (31)
  • _examples/callback/callback_test.go (1 hunks)
  • _examples/gettingstarted/gettingstarted_test.go (1 hunks)
  • _examples/schedule/schedule_test.go (3 hunks)
  • _examples/timeout/timeout_test.go (1 hunks)
  • adapters/adaptertest/eventstreaming.go (3 hunks)
  • adapters/adaptertest/recordstore.go (23 hunks)
  • adapters/adaptertest/rolescheduler.go (3 hunks)
  • adapters/adaptertest/timeoutstore.go (3 hunks)
  • adapters/memrecordstore/memrecordstore_test.go (1 hunks)
  • adapters/memrecordstore/outbox.go (1 hunks)
  • adapters/reflexstreamer/streamfunc_test.go (2 hunks)
  • adapters/reflexstreamer/util_test.go (1 hunks)
  • await_test.go (1 hunks)
  • callback_internal_test.go (5 hunks)
  • delete_internal_test.go (3 hunks)
  • eventfilter_test.go (1 hunks)
  • hook_internal_test.go (2 hunks)
  • hook_test.go (3 hunks)
  • metrics_test.go (11 hunks)
  • run_test.go (1 hunks)
  • runstate_internal_test.go (1 hunks)
  • runstate_test.go (3 hunks)
  • schedule_test.go (6 hunks)
  • step_internal_test.go (6 hunks)
  • testing.go (6 hunks)
  • testing_test.go (2 hunks)
  • timeout_internal_test.go (2 hunks)
  • visualiser_test.go (1 hunks)
  • workflow_internal_test.go (4 hunks)
  • workflow_test.go (10 hunks)
  • workflowpb/util_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • adapters/memrecordstore/memrecordstore_test.go
🧰 Additional context used
🧬 Code graph analysis (9)
testing.go (2)
marshal.go (1)
  • Marshal (8-10)
unmarshal.go (1)
  • Unmarshal (6-8)
run_test.go (1)
status.go (1)
  • SkipTypeRunStateUpdate (27-27)
runstate_test.go (2)
runstate.go (6)
  • RunStateInitiated (13-13)
  • RunState (9-9)
  • NewRunStateController (90-95)
  • RunStatePaused (15-15)
  • RunStateRunning (14-14)
  • RunStateCancelled (16-16)
record.go (1)
  • Meta (51-77)
adapters/reflexstreamer/util_test.go (1)
adapters/reflexstreamer/util.go (1)
  • DefaultConnectorTranslator (31-44)
workflowpb/util_test.go (1)
workflowpb/util.go (1)
  • UnmarshalRecord (32-49)
metrics_test.go (1)
marshal.go (1)
  • Marshal (8-10)
adapters/adaptertest/eventstreaming.go (2)
eventstreamer.go (3)
  • Header (34-34)
  • HeaderTopic (39-39)
  • StreamFromLatest (62-66)
testing.go (1)
  • AwaitTimeoutInsert (41-82)
workflow_test.go (5)
adapters/memrecordstore/memrecordstore.go (3)
  • New (17-64)
  • WithOutbox (77-84)
  • WithClock (86-90)
adapters/memstreamer/memstreamer.go (2)
  • New (12-33)
  • WithClock (41-45)
internal/logger/logger.go (1)
  • New (21-30)
builder.go (5)
  • WithTimeoutStore (289-293)
  • WithClock (307-311)
  • WithDebugMode (315-319)
  • WithoutOutbox (299-303)
  • NewBuilder (25-43)
adapters/memrecordstore/memrecordstore_test.go (1)
  • TestOutboxDisabled (47-78)
adapters/memrecordstore/outbox.go (3)
event.go (1)
  • OutboxEvent (47-60)
eventstreamer.go (4)
  • EventStreamer (10-13)
  • EventSender (17-20)
  • Header (34-34)
  • HeaderTopic (39-39)
logger.go (1)
  • Logger (14-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: core (1.24)
🔇 Additional comments (37)
_examples/gettingstarted/gettingstarted_test.go (1)

30-30: LGTM: clearer assertion

Switching to require.NoError improves failure messages and intent.

adapters/reflexstreamer/streamfunc_test.go (1)

28-28: LGTM: use require.NoError after Trigger

Good consistency and clearer diagnostics on failure.

Also applies to: 70-70

timeout_internal_test.go (1)

30-30: LGTM: explicit NoError assertions

Reads better than comparing to nil and yields better error output.

Also applies to: 200-200

runstate_internal_test.go (1)

15-15: LGTM: assertion style modernised

No behaviour change; improves readability across tests.

Also applies to: 19-19, 22-22, 25-25

hook_test.go (1)

33-33: LGTM: NoError after Trigger

Consistent with the project-wide shift and improves test clarity.

Also applies to: 55-55, 77-77

adapters/adaptertest/rolescheduler.go (1)

32-32: LGTM: clearer error assertions

Good consistency across adapter tests; improves failure output.

Also applies to: 50-50, 83-83

eventfilter_test.go (1)

61-61: LGTM: require.NoError for UUID creation

Better intent signalling and diagnostics.

_examples/callback/callback_test.go (1)

28-28: LGTM: Prefer require.NoError for clearer failure messages.
Consistent with repo-wide assertion style; no semantic change.

hook_internal_test.go (2)

42-42: LGTM: require.NoError here reads better than Nil.
Matches intent to treat unmarshal failure as non-fatal for the hook executor.


53-53: LGTM: Assert no marshal error explicitly.
Improves diagnostics without changing behaviour.

workflow_internal_test.go (4)

80-80: LGTM: Switch to NoError.
Clearer assertion; unchanged semantics.


108-108: LGTM: NoError improves readability.
Keeps the retry expectation explicit.


151-151: LGTM: Use NoError over Nil.
Aligns with project style.


176-176: LGTM: NoError for state update test.
No functional change.

await_test.go (2)

37-37: LGTM: Prefer NoError for trigger assertions.
Clearer failure output.


40-40: LGTM: Prefer NoError for await assertions.
Consistent with suite-wide changes.

testing_test.go (2)

114-114: LGTM: NoError after Trigger.
Improves error visibility on failure.


185-185: LGTM: NoError after Trigger.
Consistent assertion style.

visualiser_test.go (1)

26-26: LGTM: NoError for diagram creation.
Better diagnostics; no behaviour change.

run_test.go (2)

16-16: LGTM: NoError after Pause.
Clearer than Nil.


20-20: LGTM: NoError after Cancel.
Consistent with preceding assertion.

workflowpb/util_test.go (2)

27-27: LGTM: Assert NoError on proto marshal.
Improves failure messaging.


30-30: LGTM: Assert NoError on unmarshal.
Consistent with test intent.

adapters/reflexstreamer/util_test.go (1)

65-69: Use of require.NoError is idiomatic.

Good swap from Nil to NoError; clearer failure messages.

callback_internal_test.go (1)

29-31: NoError assertions look good.

Concise and consistent across subtests.

Also applies to: 79-81, 119-121, 157-159, 200-202

adapters/adaptertest/timeoutstore.go (1)

33-40: LGTM on NoError swaps.

More descriptive failures without altering behaviour.

Also applies to: 56-57, 83-84

schedule_test.go (2)

55-56: LGTM: Switched to require.NoError for clearer intent.

The updated assertions improve failure messages and match common testify usage.

Also applies to: 74-75, 77-78, 86-87, 115-116, 184-185, 209-213


122-129: Confirm assumption: outbox enabled by default.

This test asserts “outbox-consumer” running/shutdown. Please verify the new “disable outbox” option keeps the default as enabled; otherwise this test will fail.

Also applies to: 132-139

step_internal_test.go (2)

33-34: LGTM: require.NoError assertions.

Solid switch; improves diagnostics.

Also applies to: 94-95, 145-146, 194-195


253-255: LGTM: pause threshold behaviour verified.

Third call returns nil and triggers store as expected.

runstate_test.go (1)

72-73: LGTM: require.NoError migration across run state tests.

Consistent, clearer assertions.

Also applies to: 124-125, 139-140, 143-145, 149-150, 154-155, 156-158, 162-164, 165-167, 170-172, 173-175, 179-181, 182-184

adapters/adaptertest/recordstore.go (1)

41-45: LGTM: assertion modernisation.

The NoError updates look good throughout.

Also applies to: 51-55, 66-70, 81-86, 91-97, 101-103, 115-119, 127-128, 151-158, 178-179, 182-187, 193-194, 201-206, 215-216, 219-220, 229-230, 247-248, 259-260, 270-271, 281-282, 302-303, 316-317, 333-334, 368-369, 381-382, 400-401, 414-415, 425-426, 435-436, 444-445, 452-453

adapters/memrecordstore/outbox.go (1)

70-77: Good: sender reuse and cleanup.

Reusing per-topic senders and closing them prevents connection churn and leaks.

workflow_test.go (4)

20-20: OK to import internal logger here

Using the internal logger in tests is fine since they live within the module path.


105-116: Outbox wiring matches the PR intent

Explicit event streamer + memrecordstore.WithOutbox paired with workflow.WithoutOutbox cleanly disables the internal consumer while keeping store-level purge active.


128-128: Good swap to require.NoError

Clearer failure messages and consistent style across tests.

Also applies to: 150-150, 153-153, 159-159, 315-315, 323-323, 473-473, 526-526, 640-640, 646-646


693-702: Nice strengthening of process-set assertions

Asserting both count and membership prevents silent omissions and unexpected additions.

Comment on lines 71 to 75
err = sender.Send(ctx, "789", 5, map[workflow.Header]string{
workflow.HeaderTopic: topic,
})
require.Nil(t, err)
require.NoError(t, err)
}()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fix data race on shared variable ‘err’ inside goroutine.

The inner goroutine writes to the outer-scoped err, while the outer goroutine also assigns to err. Shadow locally to avoid a race.

-               go func() {
-                       err = sender.Send(ctx, "789", 5, map[workflow.Header]string{
-                               workflow.HeaderTopic: topic,
-                       })
-                       require.NoError(t, err)
-               }()
+               go func() {
+                       if err := sender.Send(ctx, "789", 5, map[workflow.Header]string{
+                               workflow.HeaderTopic: topic,
+                       }); err != nil {
+                               t.Errorf("Send failed: %v", err)
+                       }
+               }()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
err = sender.Send(ctx, "789", 5, map[workflow.Header]string{
workflow.HeaderTopic: topic,
})
require.Nil(t, err)
require.NoError(t, err)
}()
go func() {
if err := sender.Send(ctx, "789", 5, map[workflow.Header]string{
workflow.HeaderTopic: topic,
}); err != nil {
t.Errorf("Send failed: %v", err)
}
}()
🤖 Prompt for AI Agents
In adapters/adaptertest/eventstreaming.go around lines 71 to 75, the goroutine
writes to the outer-scoped variable `err`, causing a data race; change the
assignment inside the goroutine from `err = sender.Send(...)` to a short
declaration `err := sender.Send(...)` so `err` is shadowed locally within the
goroutine (or alternatively use a dedicated local name like `gErr := ...`), and
keep the existing `require.NoError(t, err)` to assert the local error — this
prevents concurrent writes to the outer `err`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant