Skip to content

Commit b14ce6b

Browse files
Michal Tichákteo
Michal Tichák
authored andcommitted
[core] ddsched plugin gRPC calls have timeout by default
1 parent c6df054 commit b14ce6b

File tree

3 files changed

+76
-15
lines changed

3 files changed

+76
-15
lines changed

core/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func setDefaults() error {
102102
viper.SetDefault("ccdbEndpoint", "http://ccdb-test.cern.ch:8080")
103103
viper.SetDefault("dcsServiceEndpoint", "//127.0.0.1:50051")
104104
viper.SetDefault("dcsServiceUseSystemProxy", false)
105+
viper.SetDefault("ddSchedulergRPCTimeout", "5s")
105106
viper.SetDefault("ddSchedulerEndpoint", "//127.0.0.1:50052")
106107
viper.SetDefault("ddSchedulerUseSystemProxy", false)
107108
viper.SetDefault("trgServiceEndpoint", "//127.0.0.1:50060")
@@ -170,6 +171,7 @@ func setFlags() error {
170171
pflag.String("dcsServiceEndpoint", viper.GetString("dcsServiceEndpoint"), "Endpoint of the DCS gRPC service (`host:port`)")
171172
pflag.Bool("dcsServiceUseSystemProxy", viper.GetBool("dcsServiceUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
172173
pflag.String("ddSchedulerEndpoint", viper.GetString("ddSchedulerEndpoint"), "Endpoint of the DD scheduler gRPC service (`host:port`)")
174+
pflag.Duration("ddSchedulergRPCTimeout", viper.GetDuration("ddSchedulergRPCTimeout"), "Timeout for gRPC calls in ddshed plugin")
173175
pflag.Bool("ddSchedulerUseSystemProxy", viper.GetBool("ddSchedulerUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
174176
pflag.String("trgServiceEndpoint", viper.GetString("trgServiceEndpoint"), "Endpoint of the TRG gRPC service (`host:port`)")
175177
pflag.String("trgPollingInterval", viper.GetString("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)")

core/integration/ddsched/client.go

+16-15
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"time"
3030

3131
"github.com/AliceO2Group/Control/common/logger"
32+
"github.com/AliceO2Group/Control/core/integration"
3233
ddpb "github.com/AliceO2Group/Control/core/integration/ddsched/protos"
3334
"github.com/sirupsen/logrus"
3435
"github.com/spf13/viper"
@@ -38,12 +39,11 @@ import (
3839
"google.golang.org/grpc/keepalive"
3940
)
4041

41-
var log = logger.New(logrus.StandardLogger(),"ddschedclient")
42-
42+
var log = logger.New(logrus.StandardLogger(), "ddschedclient")
4343

4444
type RpcClient struct {
4545
ddpb.DataDistributionControlClient
46-
conn *grpc.ClientConn
46+
conn *grpc.ClientConn
4747
cancel context.CancelFunc
4848
}
4949

@@ -52,10 +52,10 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
5252
"endpoint": endpoint,
5353
}).Debug("dialing DD scheduler endpoint")
5454

55-
dialOptions := []grpc.DialOption {
55+
dialOptions := []grpc.DialOption{
5656
grpc.WithInsecure(),
5757
grpc.WithConnectParams(grpc.ConnectParams{
58-
Backoff: backoff.Config{
58+
Backoff: backoff.Config{
5959
BaseDelay: backoff.DefaultConfig.BaseDelay,
6060
Multiplier: backoff.DefaultConfig.Multiplier,
6161
Jitter: backoff.DefaultConfig.Jitter,
@@ -68,14 +68,15 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
6868
Timeout: time.Second,
6969
PermitWithoutStream: true,
7070
}),
71+
grpc.WithUnaryInterceptor(integration.UnaryTimeoutInterceptor(viper.GetDuration("ddSchedulergRPCTimeout"), "ddsched gRPC call failed")),
7172
}
7273
if !viper.GetBool("ddSchedulerUseSystemProxy") {
7374
dialOptions = append(dialOptions, grpc.WithNoProxy())
7475
}
7576
conn, err := grpc.DialContext(cxt,
76-
endpoint,
77-
dialOptions...,
78-
)
77+
endpoint,
78+
dialOptions...,
79+
)
7980
if err != nil {
8081
log.WithField("error", err.Error()).
8182
WithField("endpoint", endpoint).
@@ -95,27 +96,27 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
9596

9697
for {
9798
select {
98-
case ok := <- stateChangedNotify:
99+
case ok := <-stateChangedNotify:
99100
if !ok {
100101
return
101102
}
102103
connState = conn.GetState()
103104
log.Debugf("DD scheduler client %s", connState.String())
104105
go notifyFunc(connState)
105-
case <- time.After(2 * time.Minute):
106+
case <-time.After(2 * time.Minute):
106107
if conn.GetState() != connectivity.Ready {
107108
conn.ResetConnectBackoff()
108109
}
109-
case <- cxt.Done():
110+
case <-cxt.Done():
110111
return
111112
}
112113
}
113114
}()
114115

115-
client := &RpcClient {
116+
client := &RpcClient{
116117
DataDistributionControlClient: ddpb.NewDataDistributionControlClient(conn),
117-
conn: conn,
118-
cancel: cancel,
118+
conn: conn,
119+
cancel: cancel,
119120
}
120121

121122
return client
@@ -131,4 +132,4 @@ func (m *RpcClient) GetConnState() connectivity.State {
131132
func (m *RpcClient) Close() error {
132133
m.cancel()
133134
return m.conn.Close()
134-
}
135+
}
+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2021 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <[email protected]>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package integration
26+
27+
import (
28+
"context"
29+
"errors"
30+
"time"
31+
32+
"google.golang.org/grpc"
33+
"google.golang.org/grpc/status"
34+
)
35+
36+
func UnaryTimeoutInterceptor(timeout time.Duration, cause string) grpc.UnaryClientInterceptor {
37+
return func(
38+
ctx context.Context,
39+
method string,
40+
req, reply interface{},
41+
connection *grpc.ClientConn,
42+
invoker grpc.UnaryInvoker,
43+
opts ...grpc.CallOption,
44+
) error {
45+
// Create a context with a timeout
46+
ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New(cause))
47+
defer cancel() // Ensure the context is canceled after the call
48+
49+
// Invoke the RPC call with the new context
50+
err := invoker(ctx, method, req, reply, connection, opts...)
51+
if err != nil {
52+
st, _ := status.FromError(err)
53+
// Handle error, maybe logging or processing the error status
54+
return st.Err()
55+
}
56+
return nil
57+
}
58+
}

0 commit comments

Comments
 (0)