Skip to content

Commit 0b4e7f8

Browse files
committed
Add support for Ephemeral TaskLists
Update Sessions to use Ephemeral TaskLists behind a feature flag. This ensures that the per-host TaskList is automatically removed once it is no longer used. This should only be enabled once the server fully supports Ephemeral TaskLists as it will otherwise return errors for the unknown TaskListKind. Signed-off-by: Nathanael Mortensen <[email protected]>
1 parent 2737bd0 commit 0b4e7f8

27 files changed

+316
-109
lines changed

internal/activity_task_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func newActivityTaskHandlerWithCustomProvider(
8080
}
8181
return &activityTaskHandlerImpl{
8282
clock: clock,
83-
taskListName: params.TaskList,
83+
taskListName: params.TaskList.GetName(),
8484
identity: params.Identity,
8585
service: service,
8686
logger: params.Logger,

internal/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ func getFeatureFlags(options *ClientOptions) FeatureFlags {
656656
return FeatureFlags{
657657
WorkflowExecutionAlreadyCompletedErrorEnabled: options.FeatureFlags.WorkflowExecutionAlreadyCompletedErrorEnabled,
658658
PollerAutoScalerEnabled: options.FeatureFlags.PollerAutoScalerEnabled,
659+
EphemeralTaskListsEnabled: options.FeatureFlags.EphemeralTaskListsEnabled,
659660
}
660661
}
661662
return FeatureFlags{}

internal/compatibility/enum_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ func TestTaskListKind(t *testing.T) {
266266
apiv1.TaskListKind_TASK_LIST_KIND_INVALID,
267267
apiv1.TaskListKind_TASK_LIST_KIND_NORMAL,
268268
apiv1.TaskListKind_TASK_LIST_KIND_STICKY,
269+
apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL,
269270
} {
270271
assert.Equal(t, item, proto.TaskListKind(thrift.TaskListKind(item)))
271272
}

internal/compatibility/proto/enum.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ func TaskListKind(t *shared.TaskListKind) apiv1.TaskListKind {
3535
return apiv1.TaskListKind_TASK_LIST_KIND_NORMAL
3636
case shared.TaskListKindSticky:
3737
return apiv1.TaskListKind_TASK_LIST_KIND_STICKY
38+
case shared.TaskListKindEphemeral:
39+
return apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL
3840
}
3941
panic("unexpected enum value")
4042
}

internal/compatibility/thrift/enum.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ func TaskListKind(t apiv1.TaskListKind) *shared.TaskListKind {
3434
return shared.TaskListKindNormal.Ptr()
3535
case apiv1.TaskListKind_TASK_LIST_KIND_STICKY:
3636
return shared.TaskListKindSticky.Ptr()
37+
case apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL:
38+
return shared.TaskListKindEphemeral.Ptr()
3739
}
3840
panic("unexpected enum value")
3941
}

internal/internal_activity.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type (
5959
activityOptions struct {
6060
ActivityID *string // Users can choose IDs but our framework makes it optional to decrease the crust.
6161
TaskListName string
62+
TaskListKind shared.TaskListKind
6263
ScheduleToCloseTimeoutSeconds int32
6364
ScheduleToStartTimeoutSeconds int32
6465
StartToCloseTimeoutSeconds int32

internal/internal_event_handlers.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type (
121121
contextPropagators []ContextPropagator
122122
tracer opentracing.Tracer
123123
workflowInterceptorFactories []WorkflowInterceptorFactory
124+
featureFlags FeatureFlags
124125
}
125126

126127
localActivityTask struct {
@@ -205,6 +206,7 @@ func newWorkflowExecutionEventHandler(
205206
contextPropagators []ContextPropagator,
206207
tracer opentracing.Tracer,
207208
workflowInterceptorFactories []WorkflowInterceptorFactory,
209+
featureFlags FeatureFlags,
208210
) workflowExecutionEventHandler {
209211
context := &workflowEnvironmentImpl{
210212
workflowInfo: workflowInfo,
@@ -222,6 +224,7 @@ func newWorkflowExecutionEventHandler(
222224
contextPropagators: contextPropagators,
223225
tracer: tracer,
224226
workflowInterceptorFactories: workflowInterceptorFactories,
227+
featureFlags: featureFlags,
225228
}
226229
context.logger = logger.With(
227230
zapcore.Field{Key: tagWorkflowType, Type: zapcore.StringType, String: workflowInfo.WorkflowType.Name},
@@ -472,7 +475,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters executeActivityPar
472475
}
473476
activityID := scheduleTaskAttr.GetActivityId()
474477
scheduleTaskAttr.ActivityType = activityTypePtr(parameters.ActivityType)
475-
scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName)})
478+
scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName), Kind: parameters.TaskListKind.Ptr()})
476479
scheduleTaskAttr.Input = parameters.Input
477480
scheduleTaskAttr.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(parameters.ScheduleToCloseTimeoutSeconds)
478481
scheduleTaskAttr.StartToCloseTimeoutSeconds = common.Int32Ptr(parameters.StartToCloseTimeoutSeconds)
@@ -804,6 +807,10 @@ func (wc *workflowEnvironmentImpl) GetWorkflowInterceptors() []WorkflowIntercept
804807
return wc.workflowInterceptorFactories
805808
}
806809

810+
func (wc *workflowEnvironmentImpl) GetFeatureFlags() FeatureFlags {
811+
return wc.featureFlags
812+
}
813+
807814
func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
808815
event *m.HistoryEvent,
809816
isReplay bool,

internal/internal_event_handlers_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,6 +1065,7 @@ func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workfl
10651065
nil,
10661066
opentracing.NoopTracer{},
10671067
nil,
1068+
FeatureFlags{},
10681069
).(*workflowExecutionEventHandlerImpl)
10691070
}
10701071

internal/internal_task_handlers.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ type (
145145
tracer opentracing.Tracer
146146
workflowInterceptorFactories []WorkflowInterceptorFactory
147147
disableStrictNonDeterminism bool
148+
featureFlags FeatureFlags
148149
}
149150

150151
activityProvider func(name string) activity
@@ -420,6 +421,7 @@ func newWorkflowTaskHandler(
420421
tracer: params.Tracer,
421422
workflowInterceptorFactories: params.WorkflowInterceptorChainFactories,
422423
disableStrictNonDeterminism: params.WorkerBugPorts.DisableStrictNonDeterminismCheck,
424+
featureFlags: params.FeatureFlags,
423425
}
424426

425427
traceLog(func() {
@@ -623,6 +625,7 @@ func (w *workflowExecutionContextImpl) createEventHandler() {
623625
w.wth.contextPropagators,
624626
w.wth.tracer,
625627
w.wth.workflowInterceptorFactories,
628+
w.wth.featureFlags,
626629
)
627630
w.eventHandler.Store(eventHandler)
628631
}

0 commit comments

Comments
 (0)