Skip to content

Commit a9d3781

Browse files
committed
read autoconfighint from empty poll
1 parent e2a7c2c commit a9d3781

4 files changed

+34
-14
lines changed

internal/internal_task_handlers.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,17 @@ type (
7575
// workflowTask wraps a decision task.
7676
workflowTask struct {
7777
task *s.PollForDecisionTaskResponse
78+
autoConfigHint *s.AutoConfigHint
7879
historyIterator HistoryIterator
7980
doneCh chan struct{}
8081
laResultCh chan *localActivityResult
8182
}
8283

8384
// activityTask wraps a activity task.
8485
activityTask struct {
85-
task *s.PollForActivityTaskResponse
86-
pollStartTime time.Time
86+
task *s.PollForActivityTaskResponse
87+
autoConfigHint *s.AutoConfigHint
88+
pollStartTime time.Time
8789
}
8890

8991
// resetStickinessTask wraps a ResetStickyTaskListRequest.

internal/internal_task_pollers.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,9 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
848848
if response == nil || len(response.TaskToken) == 0 {
849849
wtp.metricsScope.Counter(metrics.DecisionPollNoTaskCounter).Inc(1)
850850
wtp.updateBacklog(request.TaskList.GetKind(), 0)
851-
return &workflowTask{}, nil
851+
return &workflowTask{
852+
autoConfigHint: response.GetAutoConfigHint(),
853+
}, nil
852854
}
853855

854856
wtp.updateBacklog(request.TaskList.GetKind(), response.GetBacklogCountHint())
@@ -908,6 +910,7 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *s.PollForDecisionTaskRes
908910
task := &workflowTask{
909911
task: response,
910912
historyIterator: historyIterator,
913+
autoConfigHint: response.GetAutoConfigHint(),
911914
}
912915
return task
913916
}
@@ -1116,7 +1119,9 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
11161119
return nil, err
11171120
}
11181121
if response == nil || len(response.TaskToken) == 0 {
1119-
return &activityTask{}, nil
1122+
return &activityTask{
1123+
autoConfigHint: response.GetAutoConfigHint(),
1124+
}, nil
11201125
}
11211126

11221127
workflowType := response.WorkflowType.GetName()
@@ -1128,7 +1133,7 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
11281133
scheduledToStartLatency := time.Duration(response.GetStartedTimestamp() - response.GetScheduledTimestampOfThisAttempt())
11291134
metricsScope.Timer(metrics.ActivityScheduledToStartLatency).Record(scheduledToStartLatency)
11301135

1131-
return &activityTask{task: response, pollStartTime: startTime}, nil
1136+
return &activityTask{task: response, pollStartTime: startTime, autoConfigHint: response.GetAutoConfigHint()}, nil
11321137
}
11331138

11341139
// PollTask polls a new task

internal/internal_worker_base.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -335,10 +335,11 @@ func (bw *baseWorker) pollTask() {
335335
}
336336
}
337337

338+
if bw.concurrencyAutoScaler != nil {
339+
bw.concurrencyAutoScaler.ProcessPollerHint(getAutoConfigHint(task))
340+
}
341+
338342
if task != nil {
339-
if bw.concurrencyAutoScaler != nil {
340-
bw.concurrencyAutoScaler.ProcessPollerHint(getAutoConfigHint(task))
341-
}
342343
select {
343344
case bw.taskQueueCh <- &polledTask{task}:
344345
case <-bw.shutdownCh:
@@ -428,15 +429,15 @@ func (bw *baseWorker) Stop() {
428429
func getAutoConfigHint(task interface{}) *shared.AutoConfigHint {
429430
switch t := task.(type) {
430431
case *workflowTask:
431-
if t.task == nil {
432+
if t.autoConfigHint == nil {
432433
return nil
433434
}
434-
return t.task.AutoConfigHint
435+
return t.autoConfigHint
435436
case *activityTask:
436-
if t.task == nil {
437+
if t.autoConfigHint == nil {
437438
return nil
438439
}
439-
return t.task.AutoConfigHint
440+
return t.autoConfigHint
440441
default:
441442
return nil
442443
}

internal/internal_worker_test.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -1514,13 +1514,25 @@ func TestGetTaskAutoConfigHint(t *testing.T) {
15141514
{
15151515
"decision task",
15161516
&workflowTask{
1517-
&shared.PollForDecisionTaskResponse{AutoConfigHint: &hint}, nil, nil, nil},
1517+
&shared.PollForDecisionTaskResponse{AutoConfigHint: &hint}, &hint, nil, nil, nil},
1518+
&hint,
1519+
},
1520+
{
1521+
"empty decision task",
1522+
&workflowTask{
1523+
nil, &hint, nil, nil, nil},
15181524
&hint,
15191525
},
15201526
{
15211527
"activity task",
15221528
&activityTask{
1523-
&shared.PollForActivityTaskResponse{AutoConfigHint: &hint}, time.Now()},
1529+
&shared.PollForActivityTaskResponse{AutoConfigHint: &hint}, &hint, time.Now()},
1530+
&hint,
1531+
},
1532+
{
1533+
"empty activity task",
1534+
&activityTask{
1535+
nil, &hint, time.Now()},
15241536
&hint,
15251537
},
15261538
{

0 commit comments

Comments
 (0)