Skip to content

Commit 04da96b

Browse files
authored
Merge pull request #144 from Icinga/service-pods
Sync pods that belong to a service
2 parents 9994bd3 + 78f7255 commit 04da96b

File tree

4 files changed

+156
-5
lines changed

4 files changed

+156
-5
lines changed

cmd/icinga-kubernetes/main.go

+114-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ import (
3030
"github.com/spf13/pflag"
3131
"golang.org/x/sync/errgroup"
3232
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/labels"
3334
"k8s.io/apimachinery/pkg/util/runtime"
3435
"k8s.io/client-go/informers"
36+
v2 "k8s.io/client-go/informers/core/v1"
3537
"k8s.io/client-go/kubernetes"
3638
kclientcmd "k8s.io/client-go/tools/clientcmd"
3739
"k8s.io/klog/v2"
@@ -293,6 +295,10 @@ func main() {
293295
})
294296
}
295297

298+
g.Go(func() error {
299+
return SyncServicePods(ctx, db, factory.Core().V1().Services(), factory.Core().V1().Pods())
300+
})
301+
296302
if cfg.Prometheus.Url != "" {
297303
promClient, err := promapi.NewClient(promapi.Config{Address: cfg.Prometheus.Url})
298304
if err != nil {
@@ -436,9 +442,13 @@ func main() {
436442
})
437443

438444
g.Go(func() error {
439-
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService)
445+
f := schemav1.NewServiceFactory(clientset)
446+
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), f.NewService)
440447

441-
return s.Run(ctx)
448+
return s.Run(
449+
ctx,
450+
syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Services().UpsertEvents().In())),
451+
)
442452
})
443453

444454
g.Go(func() error {
@@ -561,3 +571,105 @@ func dbHasSchema(db *database.Database, dbName string) (bool, error) {
561571

562572
return rows.Next(), rows.Err()
563573
}
574+
575+
func SyncServicePods(ctx context.Context, db *database.Database, serviceList v2.ServiceInformer, podList v2.PodInformer) error {
576+
servicePods := make(chan any)
577+
578+
g, ctx := errgroup.WithContext(ctx)
579+
g.Go(func() error {
580+
return db.UpsertStreamed(ctx, servicePods)
581+
})
582+
583+
g.Go(func() error {
584+
ch := cachev1.Multiplexers().Pods().UpsertEvents().Out()
585+
for {
586+
select {
587+
case pod, more := <-ch:
588+
if !more {
589+
return nil
590+
}
591+
592+
services, err := serviceList.Lister().List(labels.NewSelector())
593+
if err != nil {
594+
return err
595+
}
596+
597+
podLabels := make(labels.Set)
598+
for _, label := range pod.(*schemav1.Pod).Labels {
599+
podLabels[label.Name] = label.Value
600+
}
601+
602+
for _, service := range services {
603+
if len(service.Spec.Selector) == 0 {
604+
continue
605+
}
606+
607+
labelSelector := &v1.LabelSelector{MatchLabels: service.Spec.Selector}
608+
selector, err := v1.LabelSelectorAsSelector(labelSelector)
609+
if err != nil {
610+
return err
611+
}
612+
613+
if selector.Matches(podLabels) {
614+
select {
615+
case servicePods <- schemav1.ServicePod{
616+
ServiceUuid: schemav1.EnsureUUID(service.UID),
617+
PodUuid: pod.(*schemav1.Pod).Uuid,
618+
}:
619+
case <-ctx.Done():
620+
return ctx.Err()
621+
}
622+
}
623+
}
624+
case <-ctx.Done():
625+
return ctx.Err()
626+
}
627+
}
628+
})
629+
630+
g.Go(func() error {
631+
ch := cachev1.Multiplexers().Services().UpsertEvents().Out()
632+
for {
633+
select {
634+
case service, more := <-ch:
635+
if !more {
636+
return nil
637+
}
638+
639+
if len(service.(*schemav1.Service).Selectors) == 0 {
640+
continue
641+
}
642+
643+
labelSelector := &v1.LabelSelector{MatchLabels: map[string]string{}}
644+
for _, selector := range service.(*schemav1.Service).Selectors {
645+
labelSelector.MatchLabels[selector.Name] = selector.Value
646+
}
647+
648+
selector, err := v1.LabelSelectorAsSelector(labelSelector)
649+
if err != nil {
650+
return err
651+
}
652+
653+
pods, err := podList.Lister().List(selector)
654+
if err != nil {
655+
return err
656+
}
657+
658+
for _, pod := range pods {
659+
select {
660+
case servicePods <- schemav1.ServicePod{
661+
ServiceUuid: service.(*schemav1.Service).Uuid,
662+
PodUuid: schemav1.EnsureUUID(pod.UID),
663+
}:
664+
case <-ctx.Done():
665+
return ctx.Err()
666+
}
667+
}
668+
case <-ctx.Done():
669+
return ctx.Err()
670+
}
671+
}
672+
})
673+
674+
return g.Wait()
675+
}

internal/cache/v1/multiplexers.go

+14
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type EventsMultiplexers interface {
1818
Nodes() EventsMultiplexer
1919
Pods() EventsMultiplexer
2020
ReplicaSets() EventsMultiplexer
21+
Services() EventsMultiplexer
2122
StatefulSets() EventsMultiplexer
2223
Run(context.Context) error
2324
}
@@ -59,6 +60,7 @@ type multiplexers struct {
5960
nodes events
6061
pods events
6162
replicaSets events
63+
services events
6264
statefulSets events
6365
}
6466

@@ -82,6 +84,10 @@ func (m multiplexers) ReplicaSets() EventsMultiplexer {
8284
return m.replicaSets
8385
}
8486

87+
func (m multiplexers) Services() EventsMultiplexer {
88+
return m.services
89+
}
90+
8591
func (m multiplexers) StatefulSets() EventsMultiplexer {
8692
return m.statefulSets
8793
}
@@ -109,6 +115,10 @@ func (m multiplexers) Run(ctx context.Context) error {
109115
return m.replicaSets.Run(ctx)
110116
})
111117

118+
g.Go(func() error {
119+
return m.services.Run(ctx)
120+
})
121+
112122
g.Go(func() error {
113123
return m.statefulSets.Run(ctx)
114124
})
@@ -140,6 +150,10 @@ func init() {
140150
upsertEvents: internal.NewChannelMux[any](),
141151
deleteEvents: internal.NewChannelMux[any](),
142152
},
153+
services: events{
154+
upsertEvents: internal.NewChannelMux[any](),
155+
deleteEvents: internal.NewChannelMux[any](),
156+
},
143157
statefulSets: events{
144158
upsertEvents: internal.NewChannelMux[any](),
145159
deleteEvents: internal.NewChannelMux[any](),

pkg/schema/v1/service.go

+22-3
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,14 @@ import (
1010
kruntime "k8s.io/apimachinery/pkg/runtime"
1111
kserializer "k8s.io/apimachinery/pkg/runtime/serializer"
1212
kjson "k8s.io/apimachinery/pkg/runtime/serializer/json"
13+
"k8s.io/client-go/kubernetes"
1314
"strings"
1415
)
1516

17+
type ServiceFactory struct {
18+
clientset *kubernetes.Clientset
19+
}
20+
1621
type Service struct {
1722
Meta
1823
ClusterIP string
@@ -40,6 +45,8 @@ type Service struct {
4045
Annotations []Annotation `db:"-"`
4146
ServiceAnnotations []ServiceAnnotation `db:"-"`
4247
ResourceAnnotations []ResourceAnnotation `db:"-"`
48+
ServicePods []ServicePod `db:"-"`
49+
factory *ServiceFactory
4350
}
4451

4552
type ServiceSelector struct {
@@ -77,8 +84,19 @@ type ServiceAnnotation struct {
7784
AnnotationUuid types.UUID
7885
}
7986

80-
func NewService() Resource {
81-
return &Service{}
87+
type ServicePod struct {
88+
ServiceUuid types.UUID
89+
PodUuid types.UUID
90+
}
91+
92+
func NewServiceFactory(clientset *kubernetes.Clientset) *ServiceFactory {
93+
return &ServiceFactory{
94+
clientset: clientset,
95+
}
96+
}
97+
98+
func (f *ServiceFactory) NewService() Resource {
99+
return &Service{factory: f}
82100
}
83101

84102
func (s *Service) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
@@ -216,7 +234,6 @@ func (s *Service) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
216234
internalTrafficPolicy = string(*service.Spec.InternalTrafficPolicy)
217235
}
218236
s.InternalTrafficPolicy = internalTrafficPolicy
219-
220237
scheme := kruntime.NewScheme()
221238
_ = kcorev1.AddToScheme(scheme)
222239
codec := kserializer.NewCodecFactory(scheme).EncoderForVersion(kjson.NewYAMLSerializer(kjson.DefaultMetaFactory, scheme, scheme), kcorev1.SchemeGroupVersion)
@@ -238,5 +255,7 @@ func (s *Service) Relations() []database.Relation {
238255
database.HasMany(s.ResourceAnnotations, database.WithForeignKey("resource_uuid")),
239256
database.HasMany(s.Annotations, database.WithoutCascadeDelete()),
240257
database.HasMany(s.ServiceAnnotations, fk),
258+
database.HasMany(s.ResourceAnnotations, fk),
259+
database.HasMany(s.ServicePods, fk),
241260
}
242261
}

schema/mysql/schema.sql

+6
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,12 @@ CREATE TABLE service_label (
920920
PRIMARY KEY (service_uuid, label_uuid)
921921
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
922922

923+
CREATE TABLE service_pod (
924+
service_uuid binary(16) NOT NULL,
925+
pod_uuid binary(16) NOT NULL,
926+
PRIMARY KEY (service_uuid, pod_uuid)
927+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
928+
923929
CREATE TABLE service_port (
924930
service_uuid binary(16) NOT NULL,
925931
name varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,

0 commit comments

Comments
 (0)