Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit fbdb162

Browse files
authored
Handle batched TaskExecutionEvent reasons (#615)
* Handle batched TaskExecutionEvent reasons Signed-off-by: Andrew Dye <[email protected]> * Add tests Signed-off-by: Andrew Dye <[email protected]> * Update flyteidl version Signed-off-by: Andrew Dye <[email protected]> * Update to EventReason Signed-off-by: Andrew Dye <[email protected]> --------- Signed-off-by: Andrew Dye <[email protected]>
1 parent af81751 commit fbdb162

File tree

5 files changed

+322
-15
lines changed

5 files changed

+322
-15
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/cloudevents/sdk-go/v2 v2.8.0
1414
github.com/coreos/go-oidc v2.2.1+incompatible
1515
github.com/evanphx/json-patch v4.12.0+incompatible
16-
github.com/flyteorg/flyteidl v1.5.14
16+
github.com/flyteorg/flyteidl v1.5.21
1717
github.com/flyteorg/flyteplugins v1.0.67
1818
github.com/flyteorg/flytepropeller v1.1.98
1919
github.com/flyteorg/flytestdlib v1.0.22

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
293293
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
294294
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
295295
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
296-
github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI=
297-
github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
296+
github.com/flyteorg/flyteidl v1.5.21 h1:zP1byUlNFqstTe7Io1DiiNgNf+mZAVmGZM04oIUA5kU=
297+
github.com/flyteorg/flyteidl v1.5.21/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
298298
github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE=
299299
github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA=
300300
github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA=

pkg/manager/impl/task_execution_manager.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@ import (
55
"fmt"
66
"strconv"
77

8+
"github.com/golang/protobuf/proto"
9+
"github.com/prometheus/client_golang/prometheus"
10+
"google.golang.org/grpc/codes"
11+
812
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
913
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
1014
"github.com/flyteorg/flytestdlib/contextutils"
1115
"github.com/flyteorg/flytestdlib/logger"
1216
"github.com/flyteorg/flytestdlib/promutils"
1317
"github.com/flyteorg/flytestdlib/promutils/labeled"
1418
"github.com/flyteorg/flytestdlib/storage"
15-
"github.com/golang/protobuf/proto"
16-
"github.com/prometheus/client_golang/prometheus"
17-
"google.golang.org/grpc/codes"
1819

1920
cloudeventInterfaces "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces"
2021
notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
@@ -189,7 +190,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
189190
return nil, err
190191
}
191192

192-
if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 {
193+
if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 { // TODO: need to be careful about missing inc/decs
193194
m.metrics.ActiveTaskExecutions.Inc()
194195
} else if common.IsTaskExecutionTerminal(request.Event.Phase) && request.Event.PhaseVersion == 0 {
195196
m.metrics.ActiveTaskExecutions.Dec()

pkg/repositories/transformers/task_execution.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,19 +153,27 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode
153153
CreatedAt: input.Request.Event.OccurredAt,
154154
Logs: input.Request.Event.Logs,
155155
CustomInfo: input.Request.Event.CustomInfo,
156-
Reason: input.Request.Event.Reason,
157156
TaskType: input.Request.Event.TaskType,
158157
Metadata: metadata,
159158
EventVersion: input.Request.Event.EventVersion,
160159
}
161160

162-
if len(input.Request.Event.Reason) > 0 {
161+
if len(input.Request.Event.Reasons) > 0 {
162+
for _, reason := range input.Request.Event.Reasons {
163+
closure.Reasons = append(closure.Reasons, &admin.Reason{
164+
OccurredAt: reason.OccurredAt,
165+
Message: reason.Reason,
166+
})
167+
}
168+
closure.Reason = input.Request.Event.Reasons[len(input.Request.Event.Reasons)-1].Reason
169+
} else if len(input.Request.Event.Reason) > 0 {
163170
closure.Reasons = []*admin.Reason{
164-
&admin.Reason{
171+
{
165172
OccurredAt: input.Request.Event.OccurredAt,
166173
Message: input.Request.Event.Reason,
167174
},
168175
}
176+
closure.Reason = input.Request.Event.Reason
169177
}
170178

171179
eventPhase := input.Request.Event.Phase
@@ -386,7 +394,17 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
386394
}
387395
taskExecutionClosure.UpdatedAt = reportedAt
388396
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
389-
if len(request.Event.Reason) > 0 {
397+
if len(request.Event.Reasons) > 0 {
398+
for _, reason := range request.Event.Reasons {
399+
taskExecutionClosure.Reasons = append(
400+
taskExecutionClosure.Reasons,
401+
&admin.Reason{
402+
OccurredAt: reason.OccurredAt,
403+
Message: reason.Reason,
404+
})
405+
}
406+
taskExecutionClosure.Reason = request.Event.Reasons[len(request.Event.Reasons)-1].Reason
407+
} else if len(request.Event.Reason) > 0 {
390408
if taskExecutionClosure.Reason != request.Event.Reason {
391409
// by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where
392410
// a task reports a large number of unique reasons. if this size increase becomes problematic we this logic

0 commit comments

Comments
 (0)