Skip to content
Open
2 changes: 1 addition & 1 deletion _examples/callback/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestCallbackWorkflow(t *testing.T) {

foreignID := "andrew"
runID, err := wf.Trigger(ctx, foreignID)
require.Nil(t, err)
require.NoError(t, err)

workflow.TriggerCallbackOn(t, wf, foreignID, runID, callback.StatusStarted, callback.EmailConfirmationResponse{
Confirmed: true,
Expand Down
2 changes: 1 addition & 1 deletion _examples/gettingstarted/gettingstarted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestWorkflow(t *testing.T) {

foreignID := "82347982374982374"
_, err := wf.Trigger(ctx, foreignID)
require.Nil(t, err)
require.NoError(t, err)

workflow.Require(t, wf, foreignID, gettingstarted.StatusReadTheDocs, gettingstarted.GettingStarted{
ReadTheDocs: "✅",
Expand Down
6 changes: 3 additions & 3 deletions _examples/schedule/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestExampleWorkflow(t *testing.T) {

go func() {
err := wf.Schedule(foreignID, "@hourly")
require.Nil(t, err)
require.NoError(t, err)
}()

// Give time for go routine to spin up
Expand All @@ -52,7 +52,7 @@ func TestExampleWorkflow(t *testing.T) {
time.Sleep(200 * time.Millisecond)

firstScheduled, err := recordStore.Latest(ctx, wf.Name(), foreignID)
require.Nil(t, err)
require.NoError(t, err)

require.Equal(t, "schedule trigger example", firstScheduled.WorkflowName)
require.Equal(t, "hourly-run", firstScheduled.ForeignID)
Expand All @@ -63,7 +63,7 @@ func TestExampleWorkflow(t *testing.T) {
time.Sleep(200 * time.Millisecond)

secondScheduled, err := recordStore.Latest(ctx, wf.Name(), foreignID)
require.Nil(t, err)
require.NoError(t, err)

require.Equal(t, "schedule trigger example", secondScheduled.WorkflowName)
require.Equal(t, "hourly-run", secondScheduled.ForeignID)
Expand Down
3 changes: 2 additions & 1 deletion _examples/timeout/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func TestTimeoutWorkflow(t *testing.T) {

foreignID := "andrew"
runID, err := wf.Trigger(ctx, foreignID)
require.Nil(t, err)
require.NoError(t, err)
require.NotEmpty(t, runID)

workflow.AwaitTimeoutInsert(t, wf, foreignID, runID, timeout.StatusStarted)

Expand Down
24 changes: 12 additions & 12 deletions adapters/adaptertest/eventstreaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ func RunEventStreamerTest(t *testing.T, factory func() workflow.EventStreamer) {
wg.Add(1)
go func() {
go func() {
err = sender.Send(ctx, "789", 5, map[workflow.Header]string{
err := sender.Send(ctx, "789", 5, map[workflow.Header]string{
workflow.HeaderTopic: topic,
})
require.Nil(t, err)
require.NoError(t, err)
}()

e, ack, err := receiver.Recv(ctx)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, "789", e.ForeignID)

err = ack()
require.Nil(t, err)
require.NoError(t, err)

Comment on lines 77 to 83
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Guard Recv with a timeout to prevent hung tests

If the send path fails, Recv(ctx) can block indefinitely. Use a short-lived context to fail fast and avoid wg.Wait() hanging.

-				e, ack, err := receiver.Recv(ctx)
+				recvCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+				defer cancel()
+				e, ack, err := receiver.Recv(recvCtx)
 				require.NoError(t, err)
 				require.Equal(t, "789", e.ForeignID)
 
 				err = ack()
 				require.NoError(t, err)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
e, ack, err := receiver.Recv(ctx)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, "789", e.ForeignID)
err = ack()
require.Nil(t, err)
require.NoError(t, err)
recvCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
e, ack, err := receiver.Recv(recvCtx)
require.NoError(t, err)
require.Equal(t, "789", e.ForeignID)
err = ack()
require.NoError(t, err)
🤖 Prompt for AI Agents
In adapters/adaptertest/eventstreaming.go around lines 77 to 83, Recv(ctx) is
called with the test's root context and can block indefinitely if the send path
fails; wrap the call in a short-lived context with a timeout (e.g.
context.WithTimeout), defer its cancel, pass the timeout context to
receiver.Recv, and assert the returned error is not a context deadline exceeded
(or fail the test explicitly) so the test fails fast instead of hanging on
wg.Wait().

wg.Done()
}()
Expand All @@ -88,24 +88,24 @@ func RunEventStreamerTest(t *testing.T, factory func() workflow.EventStreamer) {
})

err = receiver.Close()
require.Nil(t, err)
require.NoError(t, err)

t.Run("StreamFromLatest should have no affect when offset is committed", func(t *testing.T) {
t.Run("StreamFromLatest should have no effect when offset is committed", func(t *testing.T) {
err = sender.Send(ctx, "101", 5, map[workflow.Header]string{
workflow.HeaderTopic: topic,
})
require.Nil(t, err)
require.NoError(t, err)

secondReceiver, err := streamer.NewReceiver(ctx, topic, "my-receiver", workflow.StreamFromLatest())
require.Nil(t, err)
require.NoError(t, err)

// Should receive event send when receiver wasn't receiving events based on the offset being set.
e, ack, err := secondReceiver.Recv(ctx)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, "101", e.ForeignID)

err = ack()
require.Nil(t, err)
require.NoError(t, err)
})
})

Expand Down Expand Up @@ -159,14 +159,14 @@ func RunEventStreamerTest(t *testing.T, factory func() workflow.EventStreamer) {
CountryCode: "GB",
}
runId, err := wf.Trigger(ctx, foreignID, workflow.WithInitialValue[User, SyncStatus](&u))
require.Nil(t, err)
require.NoError(t, err)

workflow.AwaitTimeoutInsert(t, wf, foreignID, runId, SyncStatusEmailSet)

clock.Step(time.Hour)

record, err := wf.Await(ctx, foreignID, runId, SyncStatusCompleted)
require.Nil(t, err)
require.NoError(t, err)

require.Equal(t, "[email protected]", record.Object.Email)
require.Equal(t, SyncStatusCompleted.String(), record.Status.String())
Expand Down
Loading