Skip to content

Commit 82ba7bc

Browse files
authored
Merge pull request #1086 from klihub/fixes/relay-container-events
release-0.9: pkg/cri: implement container event relaying.
2 parents db28d55 + 5db9a45 commit 82ba7bc

File tree

3 files changed

+113
-14
lines changed

3 files changed

+113
-14
lines changed

pkg/cri/relay/relay.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"os"
2020
"sync"
2121

22+
criv1 "k8s.io/cri-api/pkg/apis/runtime/v1"
23+
2224
"github.com/intel/cri-resource-manager/pkg/cri/client"
2325
"github.com/intel/cri-resource-manager/pkg/cri/server"
2426
logger "github.com/intel/cri-resource-manager/pkg/log"
@@ -60,19 +62,23 @@ type Relay interface {
6062
// relay is the implementation of Relay.
6163
type relay struct {
6264
logger.Logger
63-
sync.Mutex // hmm... do *we* need to be lockable, or the upper layer(s) ?
64-
options Options // relay options
65-
client client.Client // relay CRI client
66-
server server.Server // relay CRI server
65+
sync.Mutex
66+
options Options // relay options
67+
client client.Client // relay CRI client
68+
server server.Server // relay CRI server
69+
70+
evtClient criv1.RuntimeService_GetContainerEventsClient
71+
evtChans map[*criv1.GetEventsRequest]chan *criv1.ContainerEventResponse
6772
}
6873

6974
// NewRelay creates a new relay instance.
7075
func NewRelay(options Options) (Relay, error) {
7176
var err error
7277

7378
r := &relay{
74-
Logger: logger.NewLogger("cri/relay"),
75-
options: options,
79+
Logger: logger.NewLogger("cri/relay"),
80+
options: options,
81+
evtChans: map[*criv1.GetEventsRequest]chan *criv1.ContainerEventResponse{},
7682
}
7783

7884
imageSocket := r.options.ImageSocket

pkg/cri/relay/runtime-service.go

+96-4
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ package relay
1616

1717
import (
1818
"context"
19+
"fmt"
20+
"time"
1921

20-
"google.golang.org/grpc/codes"
21-
"google.golang.org/grpc/status"
2222
criv1 "k8s.io/cri-api/pkg/apis/runtime/v1"
2323

2424
"github.com/intel/cri-resource-manager/pkg/dump"
@@ -188,8 +188,23 @@ func (r *relay) CheckpointContainer(ctx context.Context, req *criv1.CheckpointCo
188188
return r.client.CheckpointContainer(ctx, req)
189189
}
190190

191-
func (r *relay) GetContainerEvents(_ *criv1.GetEventsRequest, _ criv1.RuntimeService_GetContainerEventsServer) error {
192-
return status.Errorf(codes.Unimplemented, "method GetContainerEvents not implemented")
191+
func (r *relay) GetContainerEvents(req *criv1.GetEventsRequest, srv criv1.RuntimeService_GetContainerEventsServer) error {
192+
evtC := r.addEventServer(req)
193+
194+
if err := r.startEventRelay(req); err != nil {
195+
r.delEventServer(req)
196+
return err
197+
}
198+
199+
for evt := range evtC {
200+
if err := srv.Send(evt); err != nil {
201+
r.Errorf("failed to relay/send container event: %v", err)
202+
r.delEventServer(req)
203+
return err
204+
}
205+
}
206+
207+
return nil
193208
}
194209

195210
func (r *relay) ListMetricDescriptors(ctx context.Context, req *criv1.ListMetricDescriptorsRequest) (*criv1.ListMetricDescriptorsResponse, error) {
@@ -206,3 +221,80 @@ func (r *relay) RuntimeConfig(ctx context.Context, req *criv1.RuntimeConfigReque
206221
r.dump("RuntimeConfig", req)
207222
return r.client.RuntimeConfig(ctx, req)
208223
}
224+
225+
const (
226+
eventRelayTimeout = 1 * time.Second
227+
)
228+
229+
func (r *relay) addEventServer(req *criv1.GetEventsRequest) chan *criv1.ContainerEventResponse {
230+
r.Lock()
231+
defer r.Unlock()
232+
233+
evtC := make(chan *criv1.ContainerEventResponse, 128)
234+
r.evtChans[req] = evtC
235+
236+
return evtC
237+
}
238+
239+
func (r *relay) delEventServer(req *criv1.GetEventsRequest) chan *criv1.ContainerEventResponse {
240+
r.Lock()
241+
defer r.Unlock()
242+
243+
evtC := r.evtChans[req]
244+
delete(r.evtChans, req)
245+
246+
return evtC
247+
}
248+
249+
func (r *relay) startEventRelay(req *criv1.GetEventsRequest) error {
250+
r.Lock()
251+
defer r.Unlock()
252+
253+
if r.evtClient != nil {
254+
return nil
255+
}
256+
257+
c, err := r.client.GetContainerEvents(context.Background(), req)
258+
if err != nil {
259+
return fmt.Errorf("failed to create container event client: %w", err)
260+
}
261+
262+
r.evtClient = c
263+
go r.relayEvents()
264+
265+
return nil
266+
}
267+
268+
func (r *relay) relayEvents() {
269+
for {
270+
evt, err := r.evtClient.Recv()
271+
if err != nil {
272+
r.Errorf("failed to relay/receive container event: %v", err)
273+
}
274+
275+
r.Lock()
276+
277+
if err != nil {
278+
for req, evtC := range r.evtChans {
279+
delete(r.evtChans, req)
280+
close(evtC)
281+
}
282+
r.evtClient = nil
283+
} else {
284+
for req, evtC := range r.evtChans {
285+
select {
286+
case evtC <- evt:
287+
case _ = <-time.After(eventRelayTimeout):
288+
delete(r.evtChans, req)
289+
close(evtC)
290+
}
291+
}
292+
}
293+
294+
r.Unlock()
295+
296+
if err != nil {
297+
return
298+
}
299+
}
300+
}

pkg/cri/server/services.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ import (
1919

2020
"go.opencensus.io/trace"
2121
"google.golang.org/grpc"
22-
grpccodes "google.golang.org/grpc/codes"
23-
grpcstatus "google.golang.org/grpc/status"
2422

2523
criv1 "k8s.io/cri-api/pkg/apis/runtime/v1"
2624
)
@@ -61,6 +59,7 @@ const (
6159
updateRuntimeConfig = "UpdateRuntimeConfig"
6260
status = "Status"
6361
checkpointContainer = "CheckpointContainer"
62+
getContainerEvents = "GetContainerEvents"
6463
listMetricDescriptors = "ListMetricDescriptors"
6564
listPodSandboxMetrics = "ListPodSandboxMetrics"
6665
runtimeConfig = "RuntimeConfig"
@@ -501,8 +500,10 @@ func (s *server) CheckpointContainer(ctx context.Context, req *criv1.CheckpointC
501500
return rsp.(*criv1.CheckpointContainerResponse), err
502501
}
503502

504-
func (s *server) GetContainerEvents(_ *criv1.GetEventsRequest, _ criv1.RuntimeService_GetContainerEventsServer) error {
505-
return grpcstatus.Errorf(grpccodes.Unimplemented, "GetContainerEvents not implemented")
503+
func (s *server) GetContainerEvents(req *criv1.GetEventsRequest, srv criv1.RuntimeService_GetContainerEventsServer) error {
504+
// TODO(klihub): interceptRequest is a unary interceptor. It can't handle streaming
505+
// requests so for now we short-circuit the call to the server here.
506+
return (*s.runtime).GetContainerEvents(req, srv)
506507
}
507508

508509
func (s *server) ListMetricDescriptors(ctx context.Context, req *criv1.ListMetricDescriptorsRequest) (*criv1.ListMetricDescriptorsResponse, error) {

0 commit comments

Comments
 (0)