Skip to content

Commit 02e5d7f

Browse files
knopers8justonedev1
authored andcommitted
[core] ODC: Send SOR/EOR timestamps as FMQ properties
OCTRL-987
1 parent bf0e386 commit 02e5d7f

File tree

3 files changed

+65
-22
lines changed

3 files changed

+65
-22
lines changed

core/integration/odc/handlers.go

+37-12
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,10 @@ func handleGetState(ctx context.Context, odcClient *RpcClient, envId string) (st
9797
return odcutils.StateForOdcState(newState), err
9898
}
9999

100-
func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string]string, paddingTimeout time.Duration, envId string, runNumber uint64, call *callable.Call) error {
101-
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId))
100+
func setProperties(ctx context.Context, odcClient *RpcClient, arguments map[string]string, paddingTimeout time.Duration, envId string, runNumber uint64, call *callable.Call) error {
102101

103102
var err error = nil
104-
var rep *odcpb.StateReply
105-
106-
if envId == "" {
107-
return errors.New("cannot proceed with empty environment id")
108-
}
109103

110-
// SetProperties before START
111104
setPropertiesRequest := &odcpb.SetPropertiesRequest{
112105
Partitionid: envId,
113106
Path: "",
@@ -252,6 +245,25 @@ func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string
252245
}).
253246
Debug("call to ODC complete: odc.SetProperties")
254247

248+
return nil
249+
}
250+
251+
func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string]string, paddingTimeout time.Duration, envId string, runNumber uint64, call *callable.Call) error {
252+
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId))
253+
254+
var err error = nil
255+
var rep *odcpb.StateReply
256+
257+
if envId == "" {
258+
return errors.New("cannot proceed with empty environment id")
259+
}
260+
261+
// SetProperties before START
262+
err = setProperties(ctx, odcClient, arguments, paddingTimeout, envId, runNumber, call)
263+
if err != nil {
264+
return err
265+
}
266+
255267
// The actual START operation starts here
256268
req := &odcpb.StartRequest{
257269
Request: &odcpb.StateRequest{
@@ -262,15 +274,15 @@ func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string
262274
},
263275
}
264276
// We ask this ODC call to complete within our own DEADLINE, minus 1 second
265-
ctxDeadline, ok = ctx.Deadline()
277+
ctxDeadline, ok := ctx.Deadline()
266278
if ok {
267279
req.Request.Timeout = uint32((time.Until(ctxDeadline) - paddingTimeout).Seconds())
268280
}
269281

270-
payload = map[string]interface{}{
282+
payload := map[string]interface{}{
271283
"odcRequest": &req,
272284
}
273-
payloadJson, _ = json.Marshal(payload)
285+
payloadJson, _ := json.Marshal(payload)
274286
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
275287
Name: call.GetName(),
276288
OperationName: call.Func,
@@ -382,6 +394,20 @@ func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string
382394

383395
func handleStop(ctx context.Context, odcClient *RpcClient, arguments map[string]string, paddingTimeout time.Duration, envId string, runNumber uint64, call *callable.Call) error {
384396
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId))
397+
var err error = nil
398+
399+
// SetProperties before STOP
400+
if len(arguments) > 0 {
401+
err = setProperties(ctx, odcClient, arguments, paddingTimeout, envId, runNumber, call)
402+
if err != nil {
403+
log.WithField("partition", envId).
404+
WithField("level", infologger.IL_Support).
405+
WithError(err).
406+
Warn("setProperties call to ODC failed. will continue with odc.Stop")
407+
}
408+
}
409+
410+
// The actual STOP operation starts here
385411
req := &odcpb.StopRequest{
386412
Request: &odcpb.StateRequest{
387413
Partitionid: envId,
@@ -396,7 +422,6 @@ func handleStop(ctx context.Context, odcClient *RpcClient, arguments map[string]
396422
req.Request.Timeout = uint32((time.Until(ctxDeadline) - paddingTimeout).Seconds())
397423
}
398424

399-
var err error = nil
400425
var rep *odcpb.StateReply
401426

402427
if envId == "" {

core/integration/odc/plugin.go

+24-9
Original file line numberDiff line numberDiff line change
@@ -1363,6 +1363,12 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13631363
WithField("call", "Start").
13641364
Warn("cannot acquire FairMQ devices cleanup count for ODC")
13651365
}
1366+
runStartTimeMs, ok := varStack["run_start_time_ms"]
1367+
if !ok {
1368+
log.WithField("partition", envId).
1369+
WithField("call", "Start").
1370+
Warn("cannot acquire run_start_time_ms")
1371+
}
13661372

13671373
var (
13681374
runNumberu64 uint64
@@ -1391,6 +1397,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13911397
arguments := make(map[string]string)
13921398
arguments["run_number"] = rn
13931399
arguments["runNumber"] = rn
1400+
arguments["run_start_time_ms"] = runStartTimeMs
13941401
arguments["cleanup"] = strconv.Itoa(cleanupCount)
13951402

13961403
ctx, cancel := context.WithTimeout(context.Background(), timeout)
@@ -1409,32 +1416,40 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14091416
}
14101417
stack["Stop"] = func() (out string) {
14111418
// ODC Stop
1419+
callFailedStr := "EPN Stop call failed"
1420+
var (
1421+
runNumberu64 uint64
1422+
err error
1423+
)
14121424

14131425
rn, ok := varStack["run_number"]
14141426
if !ok {
14151427
log.WithField("partition", envId).
14161428
WithField("call", "Start").
1417-
Warn("cannot acquire run number for ODC")
1429+
Warn("cannot acquire run number for ODC Stop")
14181430
}
1419-
var (
1420-
runNumberu64 uint64
1421-
err error
1422-
)
1423-
callFailedStr := "EPN Stop call failed"
1424-
14251431
runNumberu64, err = strconv.ParseUint(rn, 10, 32)
14261432
if err != nil {
14271433
log.WithField("partition", envId).
14281434
WithError(err).
1429-
Error("cannot acquire run number for DCS SOR")
1435+
Error("cannot acquire run number for ODC EOR")
14301436
runNumberu64 = 0
14311437
}
1438+
runEndTimeMs, ok := varStack["run_end_time_ms"]
1439+
if !ok {
1440+
log.WithField("partition", envId).
1441+
WithField("call", "Start").
1442+
Warn("cannot acquire run_end_time_ms")
1443+
}
1444+
1445+
arguments := make(map[string]string)
1446+
arguments["run_end_time_ms"] = runEndTimeMs
14321447

14331448
timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "Stop", envId)
14341449

14351450
ctx, cancel := context.WithTimeout(context.Background(), timeout)
14361451
defer cancel()
1437-
err = handleStop(ctx, p.odcClient, nil, paddingTimeout, envId, runNumberu64, call)
1452+
err = handleStop(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call)
14381453
if err != nil {
14391454
log.WithError(err).
14401455
WithField("level", infologger.IL_Support).

docs/handbook/configuration.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,10 @@ In addition to the above, which varies depending on the configuration of the env
455455
* `pdp_beam_type`
456456
* `pdp_override_run_start_time`
457457

458-
FairMQ task implementors should expect that these values are written to the FairMQ properties map right before the `RUN` transition via `SetProperty` calls.
458+
The following values are pushed by AliECS during `STOP_ACTIVITY`:
459+
* `run_end_time_ms`
460+
461+
FairMQ task implementors should expect that these values are written to the FairMQ properties map right before the `RUN` and `STOP` transitions via `SetProperty` calls.
459462

460463
## Resource wants and limits
461464

0 commit comments

Comments
 (0)