Skip to content

Commit 54994ff

Browse files
teoknopers8
authored and committed
[core] Handle gRPC code DeadlineExceeded in DCS client
1 parent 9b967cc commit 54994ff

File tree

1 file changed

+166
-47
lines changed

1 file changed

+166
-47
lines changed

core/integration/dcs/plugin.go

Lines changed: 166 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,9 @@ import (
5353
"github.com/spf13/viper"
5454
"golang.org/x/exp/maps"
5555
"google.golang.org/grpc"
56+
"google.golang.org/grpc/codes"
5657
"google.golang.org/grpc/connectivity"
58+
"google.golang.org/grpc/status"
5759
)
5860

5961
const (
@@ -782,22 +784,59 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
782784

783785
break
784786
}
785-
if err != nil { // stream termination in case of general error
786-
logMsg := "bad DCS PFR event received, any future DCS events are ignored"
787-
log.WithError(err).
788-
WithField("partition", envId).
789-
Warn(logMsg)
787+
if err != nil { // stream termination in case of unknown or gRPC error
788+
got := status.Code(err)
790789

791-
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
792-
Name: call.GetName(),
793-
OperationName: call.Func,
794-
OperationStatus: pb.OpStatus_ONGOING,
795-
OperationStep: "perform DCS call: PrepareForRun",
796-
OperationStepStatus: pb.OpStatus_DONE_ERROR,
797-
EnvironmentId: envId,
798-
Payload: string(payloadJson[:]),
799-
Error: logMsg,
800-
})
790+
if got == codes.DeadlineExceeded {
791+
log.WithError(err).
792+
WithField("partition", envId).
793+
WithField("timeout", timeout.String()).
794+
Debug("DCS PFR timed out")
795+
err = fmt.Errorf("DCS PFR timed out after %s: %w", timeout.String(), err)
796+
797+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
798+
Name: call.GetName(),
799+
OperationName: call.Func,
800+
OperationStatus: pb.OpStatus_ONGOING,
801+
OperationStep: "perform DCS call: PrepareForRun",
802+
OperationStepStatus: pb.OpStatus_DONE_TIMEOUT,
803+
EnvironmentId: envId,
804+
Payload: string(payloadJson[:]),
805+
Error: err.Error(),
806+
})
807+
} else if got == codes.Unknown { // unknown error, likely not a gRPC code
808+
logMsg := "bad DCS PFR event received, any future DCS events are ignored"
809+
log.WithError(err).
810+
WithField("partition", envId).
811+
Warn(logMsg)
812+
813+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
814+
Name: call.GetName(),
815+
OperationName: call.Func,
816+
OperationStatus: pb.OpStatus_ONGOING,
817+
OperationStep: "perform DCS call: PrepareForRun",
818+
OperationStepStatus: pb.OpStatus_DONE_ERROR,
819+
EnvironmentId: envId,
820+
Payload: string(payloadJson[:]),
821+
Error: logMsg,
822+
})
823+
} else { // some other gRPC error code
824+
log.WithError(err).
825+
WithField("partition", envId).
826+
Error("DCS PFR call error")
827+
err = fmt.Errorf("DCS PFR call error: %w", err)
828+
829+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
830+
Name: call.GetName(),
831+
OperationName: call.Func,
832+
OperationStatus: pb.OpStatus_ONGOING,
833+
OperationStep: "perform DCS call: PrepareForRun",
834+
OperationStepStatus: pb.OpStatus_DONE_ERROR,
835+
EnvironmentId: envId,
836+
Payload: string(payloadJson[:]),
837+
Error: err.Error(),
838+
})
839+
}
801840

802841
break
803842
}
@@ -1452,23 +1491,63 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14521491

14531492
break
14541493
}
1455-
if err != nil { // stream termination in case of general error
1456-
logMsg := "bad DCS SOR event received, any future DCS events are ignored"
1457-
log.WithError(err).
1458-
WithField("partition", envId).
1459-
WithField("run", runNumber64).
1460-
Warn(logMsg)
1494+
if err != nil { // stream termination in case of unknown or gRPC error
1495+
got := status.Code(err)
14611496

1462-
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
1463-
Name: call.GetName(),
1464-
OperationName: call.Func,
1465-
OperationStatus: pb.OpStatus_ONGOING,
1466-
OperationStep: "perform DCS call: StartOfRun",
1467-
OperationStepStatus: pb.OpStatus_DONE_ERROR,
1468-
EnvironmentId: envId,
1469-
Payload: string(payloadJson[:]),
1470-
Error: logMsg,
1471-
})
1497+
if got == codes.DeadlineExceeded {
1498+
log.WithError(err).
1499+
WithField("partition", envId).
1500+
WithField("run", runNumber64).
1501+
WithField("timeout", timeout.String()).
1502+
Debug("DCS SOR timed out")
1503+
err = fmt.Errorf("DCS SOR timed out after %s: %w", timeout.String(), err)
1504+
1505+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
1506+
Name: call.GetName(),
1507+
OperationName: call.Func,
1508+
OperationStatus: pb.OpStatus_ONGOING,
1509+
OperationStep: "perform DCS call: StartOfRun",
1510+
OperationStepStatus: pb.OpStatus_DONE_TIMEOUT,
1511+
EnvironmentId: envId,
1512+
Payload: string(payloadJson[:]),
1513+
Error: err.Error(),
1514+
})
1515+
1516+
} else if got == codes.Unknown { // unknown error, likely not a gRPC code
1517+
logMsg := "bad DCS SOR event received, any future DCS events are ignored"
1518+
log.WithError(err).
1519+
WithField("partition", envId).
1520+
WithField("run", runNumber64).
1521+
Warn(logMsg)
1522+
1523+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
1524+
Name: call.GetName(),
1525+
OperationName: call.Func,
1526+
OperationStatus: pb.OpStatus_ONGOING,
1527+
OperationStep: "perform DCS call: StartOfRun",
1528+
OperationStepStatus: pb.OpStatus_DONE_ERROR,
1529+
EnvironmentId: envId,
1530+
Payload: string(payloadJson[:]),
1531+
Error: logMsg,
1532+
})
1533+
} else { // some other gRPC error code
1534+
log.WithError(err).
1535+
WithField("partition", envId).
1536+
WithField("run", runNumber64).
1537+
Debug("DCS SOR call error")
1538+
err = fmt.Errorf("DCS SOR call error: %w", err)
1539+
1540+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
1541+
Name: call.GetName(),
1542+
OperationName: call.Func,
1543+
OperationStatus: pb.OpStatus_ONGOING,
1544+
OperationStep: "perform DCS call: StartOfRun",
1545+
OperationStepStatus: pb.OpStatus_DONE_ERROR,
1546+
EnvironmentId: envId,
1547+
Payload: string(payloadJson[:]),
1548+
Error: err.Error(),
1549+
})
1550+
}
14721551

14731552
break
14741553
}
@@ -2001,23 +2080,63 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
20012080

20022081
break
20032082
}
2004-
if err != nil { // stream termination in case of general error
2005-
logMsg := "bad DCS EOR event received, any future DCS events are ignored"
2006-
log.WithError(err).
2007-
WithField("partition", envId).
2008-
WithField("run", runNumber64).
2009-
Warn(logMsg)
2083+
if err != nil { // stream termination in case of unknown or gRPC error
2084+
got := status.Code(err)
20102085

2011-
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
2012-
Name: call.GetName(),
2013-
OperationName: call.Func,
2014-
OperationStatus: pb.OpStatus_ONGOING,
2015-
OperationStep: "perform DCS call: EndOfRun",
2016-
OperationStepStatus: pb.OpStatus_DONE_ERROR,
2017-
EnvironmentId: envId,
2018-
Payload: string(payloadJson[:]),
2019-
Error: logMsg,
2020-
})
2086+
if got == codes.DeadlineExceeded {
2087+
log.WithError(err).
2088+
WithField("partition", envId).
2089+
WithField("run", runNumber64).
2090+
WithField("timeout", timeout.String()).
2091+
Debug("DCS EOR timed out")
2092+
err = fmt.Errorf("DCS EOR timed out after %s: %w", timeout.String(), err)
2093+
2094+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
2095+
Name: call.GetName(),
2096+
OperationName: call.Func,
2097+
OperationStatus: pb.OpStatus_ONGOING,
2098+
OperationStep: "perform DCS call: EndOfRun",
2099+
OperationStepStatus: pb.OpStatus_DONE_TIMEOUT,
2100+
EnvironmentId: envId,
2101+
Payload: string(payloadJson[:]),
2102+
Error: err.Error(),
2103+
})
2104+
2105+
} else if got == codes.Unknown { // unknown error, likely not a gRPC code
2106+
logMsg := "bad DCS EOR event received, any future DCS events are ignored"
2107+
log.WithError(err).
2108+
WithField("partition", envId).
2109+
WithField("run", runNumber64).
2110+
Warn(logMsg)
2111+
2112+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
2113+
Name: call.GetName(),
2114+
OperationName: call.Func,
2115+
OperationStatus: pb.OpStatus_ONGOING,
2116+
OperationStep: "perform DCS call: EndOfRun",
2117+
OperationStepStatus: pb.OpStatus_DONE_ERROR,
2118+
EnvironmentId: envId,
2119+
Payload: string(payloadJson[:]),
2120+
Error: logMsg,
2121+
})
2122+
} else { // some other gRPC error code
2123+
log.WithError(err).
2124+
WithField("partition", envId).
2125+
WithField("run", runNumber64).
2126+
Debug("DCS EOR call error")
2127+
err = fmt.Errorf("DCS EOR call error: %w", err)
2128+
2129+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
2130+
Name: call.GetName(),
2131+
OperationName: call.Func,
2132+
OperationStatus: pb.OpStatus_ONGOING,
2133+
OperationStep: "perform DCS call: EndOfRun",
2134+
OperationStepStatus: pb.OpStatus_DONE_ERROR,
2135+
EnvironmentId: envId,
2136+
Payload: string(payloadJson[:]),
2137+
Error: err.Error(),
2138+
})
2139+
}
20212140

20222141
break
20232142
}

0 commit comments

Comments
 (0)