Skip to content

✨ cloudevent client for klusterlet agent to talk to grpc server #106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions pkg/cloudevents/clients/addon/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package addon

import (
"context"
"net/http"

addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"

addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned/typed/addon/v1alpha1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
cloudeventserrors "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

// ManagedClusterAddOnClient implements the ManagedClusterAddonInterface.
type ManagedClusterAddOnClient struct {
cloudEventsClient *generic.CloudEventAgentClient[*addonapiv1alpha1.ManagedClusterAddOn]
watcherStore store.ClientWatcherStore[*addonapiv1alpha1.ManagedClusterAddOn]
namespace string
}

var _ addonv1alpha1client.ManagedClusterAddOnInterface = &ManagedClusterAddOnClient{}

func NewManagedClusterAddOnClient(
cloudEventsClient *generic.CloudEventAgentClient[*addonapiv1alpha1.ManagedClusterAddOn],
watcherStore store.ClientWatcherStore[*addonapiv1alpha1.ManagedClusterAddOn],
) *ManagedClusterAddOnClient {
return &ManagedClusterAddOnClient{
cloudEventsClient: cloudEventsClient,
watcherStore: watcherStore,
}
}

func (c *ManagedClusterAddOnClient) Namespace(namespace string) *ManagedClusterAddOnClient {
c.namespace = namespace
return c
}

func (c *ManagedClusterAddOnClient) Create(
ctx context.Context, addon *addonapiv1alpha1.ManagedClusterAddOn, opts metav1.CreateOptions) (*addonapiv1alpha1.ManagedClusterAddOn, error) {
return nil, errors.NewMethodNotSupported(common.ManagedClusterAddOnGR, "create")
}

func (c *ManagedClusterAddOnClient) Update(ctx context.Context, addon *addonapiv1alpha1.ManagedClusterAddOn, opts metav1.UpdateOptions) (*addonapiv1alpha1.ManagedClusterAddOn, error) {
return nil, errors.NewMethodNotSupported(common.ManagedClusterAddOnGR, "update")
}

func (c *ManagedClusterAddOnClient) UpdateStatus(ctx context.Context, addon *addonapiv1alpha1.ManagedClusterAddOn, opts metav1.UpdateOptions) (*addonapiv1alpha1.ManagedClusterAddOn, error) {
return nil, errors.NewMethodNotSupported(common.ManagedClusterAddOnGR, "updatestatus")
}

func (c *ManagedClusterAddOnClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
return errors.NewMethodNotSupported(common.ManagedClusterAddOnGR, "delete")
}

func (c *ManagedClusterAddOnClient) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error {
return errors.NewMethodNotSupported(common.ManagedClusterAddOnGR, "deletecollection")
}

func (c *ManagedClusterAddOnClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*addonapiv1alpha1.ManagedClusterAddOn, error) {
klog.V(4).Infof("getting ManagedClusterAddOn %s/%s", c.namespace, name)
addon, exists, err := c.watcherStore.Get(c.namespace, name)
if err != nil {
return nil, errors.NewInternalError(err)
}
if !exists {
return nil, errors.NewNotFound(common.ManagedClusterAddOnGR, c.namespace+"/"+name)
}

return addon, nil
}

func (c *ManagedClusterAddOnClient) List(ctx context.Context, opts metav1.ListOptions) (*addonapiv1alpha1.ManagedClusterAddOnList, error) {
klog.V(4).Info("list ManagedClusterAddon")
addonList, err := c.watcherStore.List(c.namespace, opts)
if err != nil {
return nil, errors.NewInternalError(err)
}

items := []addonapiv1alpha1.ManagedClusterAddOn{}
for _, cluster := range addonList.Items {
items = append(items, *cluster)
}

return &addonapiv1alpha1.ManagedClusterAddOnList{ListMeta: addonList.ListMeta, Items: items}, nil
}

func (c *ManagedClusterAddOnClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
klog.V(4).Info("watch ManagedClusterAddOn")
watcher, err := c.watcherStore.GetWatcher(c.namespace, opts)
if err != nil {
return nil, errors.NewInternalError(err)
}

return watcher, nil
}

func (c *ManagedClusterAddOnClient) Patch(
ctx context.Context, name string, pt kubetypes.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*addonapiv1alpha1.ManagedClusterAddOn, error) {
klog.V(4).Infof("patching ManagedClusterAddon %s/%s", c.namespace, name)
last, exists, err := c.watcherStore.Get(c.namespace, name)
if err != nil {
return nil, errors.NewInternalError(err)
}
if !exists {
return nil, errors.NewNotFound(common.ManagedClusterAddOnGR, c.namespace+"/"+name)
}

patchedAddon, err := utils.Patch(pt, last, data)
if err != nil {
return nil, errors.NewInternalError(err)
}

eventType := types.CloudEventsType{
CloudEventsDataType: ManagedClusterAddOnEventDataType,
SubResource: types.SubResourceStatus,
}

newAddon := patchedAddon.DeepCopy()

if !utils.IsStatusPatch(subresources) {
msg := "subresources \"status\" is required"
return nil, errors.NewGenericServerResponse(http.StatusMethodNotAllowed, "patch", common.ManagedClusterAddOnGR, name, msg, 0, false)
}

// publish the status update event to source, source will check the resource version
// and reject the update if it's status update is outdated.
eventType.Action = common.UpdateRequestAction
if err := c.cloudEventsClient.Publish(ctx, eventType, newAddon); err != nil {
return nil, cloudeventserrors.ToStatusError(common.ManagedClusterAddOnGR, name, err)
}

return newAddon, nil
}
103 changes: 103 additions & 0 deletions pkg/cloudevents/clients/addon/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package addon

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/cloudevents/sdk-go/v2/protocol/gochan"
jsonpatch "github.com/evanphx/json-patch"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"

"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/fake"
)

func TestPatch(t *testing.T) {
cases := []struct {
name string
clusterName string
addon *addonapiv1alpha1.ManagedClusterAddOn
patch []byte
}{
{
name: "patch addon",
clusterName: "cluster1",
addon: &addonapiv1alpha1.ManagedClusterAddOn{
ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "cluster1"},
},
patch: func() []byte {
old := &addonapiv1alpha1.ManagedClusterAddOn{
ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "cluster1"},
}
oldData, err := json.Marshal(old)
if err != nil {
t.Error(err)
}

new := old.DeepCopy()
new.Status = addonapiv1alpha1.ManagedClusterAddOnStatus{
Namespace: "install",
}

newData, err := json.Marshal(new)
if err != nil {
t.Error(err)
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
t.Error(err)
}
return patchBytes
}(),
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
watcherStore := store.NewAgentInformerWatcherStore[*addonapiv1alpha1.ManagedClusterAddOn]()

ceClientOpt := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, c.clusterName+"agent")
ceClient, err := generic.NewCloudEventAgentClient(
context.Background(),
ceClientOpt,
store.NewAgentWatcherStoreLister(watcherStore),
statushash.StatusHash,
NewManagedClusterAddOnCodec())
if err != nil {
t.Error(err)
}
addonClientSet := &AddonClientSetWrapper{&AddonV1Alpha1ClientWrapper{
NewManagedClusterAddOnClient(ceClient, watcherStore),
}}

addonInformerFactory := addoninformers.NewSharedInformerFactory(addonClientSet, time.Minute*10)
informer := addonInformerFactory.Addon().V1alpha1().ManagedClusterAddOns().Informer()
store := informer.GetStore()
if err := store.Add(c.addon); err != nil {
t.Error(err)
}
watcherStore.SetInformer(informer)

if _, err = addonClientSet.AddonV1alpha1().ManagedClusterAddOns(c.clusterName).Patch(
context.Background(),
c.addon.Name,
types.MergePatchType,
c.patch,
metav1.PatchOptions{},
"status",
); err != nil {
t.Error(err)
}
})
}
}
67 changes: 67 additions & 0 deletions pkg/cloudevents/clients/addon/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package addon

import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"

cloudevents "github.com/cloudevents/sdk-go/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

var ManagedClusterAddOnEventDataType = types.CloudEventsDataType{
Group: addonapiv1alpha1.GroupVersion.Group,
Version: addonapiv1alpha1.GroupVersion.Version,
Resource: "managedclusteraddons",
}

// ManagedClusterAddOnCodec is a codec to encode/decode a ManagedClusterAddOn/cloudevent for an agent.
type ManagedClusterAddOnCodec struct{}

func NewManagedClusterAddOnCodec() *ManagedClusterAddOnCodec {
return &ManagedClusterAddOnCodec{}
}

// EventDataType always returns the event data type `io.open-cluster-management.addon.v1alpha1.managedclusteraddons`.
func (c *ManagedClusterAddOnCodec) EventDataType() types.CloudEventsDataType {
return ManagedClusterAddOnEventDataType
}

// Encode the ManagedClusterAddOn to a cloudevent
func (c *ManagedClusterAddOnCodec) Encode(source string, eventType types.CloudEventsType, addon *addonapiv1alpha1.ManagedClusterAddOn) (*cloudevents.Event, error) {
if eventType.CloudEventsDataType != ManagedClusterAddOnEventDataType {
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType)
}

evt := types.NewEventBuilder(source, eventType).
WithResourceID(addon.Name).
WithClusterName(addon.Namespace).
NewEvent()

if addon.ResourceVersion != "" {
evt.SetExtension(types.ExtensionResourceVersion, addon.ResourceVersion)
}

newAddon := addon.DeepCopy()
newAddon.TypeMeta = metav1.TypeMeta{
APIVersion: addonapiv1alpha1.GroupVersion.String(),
Kind: "ManagedClusterAddOn",
}

if err := evt.SetData(cloudevents.ApplicationJSON, newAddon); err != nil {
return nil, fmt.Errorf("failed to encode managedclusteraddon to a cloudevent: %v", err)
}

return &evt, nil
}

// Decode a cloudevent to a ManagedClusterAddOn
func (c *ManagedClusterAddOnCodec) Decode(evt *cloudevents.Event) (*addonapiv1alpha1.ManagedClusterAddOn, error) {
addon := &addonapiv1alpha1.ManagedClusterAddOn{}
if err := evt.DataAs(addon); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err)
}

return addon, nil
}
68 changes: 68 additions & 0 deletions pkg/cloudevents/clients/addon/wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package addon

import (
"context"

"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"

addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonclientset "open-cluster-management.io/api/client/addon/clientset/versioned"
addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned/typed/addon/v1alpha1"

"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options"
)

// AddonClientSetWrapper wraps AddonV1Alpha1ClientWrapper to an addon clientset interface
type AddonClientSetWrapper struct {
*AddonV1Alpha1ClientWrapper
}

var _ addonclientset.Interface = &AddonClientSetWrapper{}

func (a AddonClientSetWrapper) Discovery() discovery.DiscoveryInterface {
panic("Discovery is unsupported")
}

func (a AddonClientSetWrapper) AddonV1alpha1() addonv1alpha1client.AddonV1alpha1Interface {
return a.AddonV1Alpha1ClientWrapper
}

// AddonV1Alpha1ClientWrapper wraps ManagedClusterAddOnClient to AddonV1alpha1Interface
type AddonV1Alpha1ClientWrapper struct {
*ManagedClusterAddOnClient
}

var _ addonv1alpha1client.AddonV1alpha1Interface = &AddonV1Alpha1ClientWrapper{}

func (c *AddonV1Alpha1ClientWrapper) AddOnDeploymentConfigs(namespace string) addonv1alpha1client.AddOnDeploymentConfigInterface {
panic("AddOnDeploymentConfigs is unsupported")
}

func (c *AddonV1Alpha1ClientWrapper) AddOnTemplates() addonv1alpha1client.AddOnTemplateInterface {
panic("AddOnTemplates is unsupported")
}

func (c *AddonV1Alpha1ClientWrapper) ClusterManagementAddOns() addonv1alpha1client.ClusterManagementAddOnInterface {
panic("ClusterManagementAddOns is unsupported")
}

func (c *AddonV1Alpha1ClientWrapper) RESTClient() rest.Interface {
panic("RESTClient is unsupported")
}

func (c *AddonV1Alpha1ClientWrapper) ManagedClusterAddOns(namespace string) addonv1alpha1client.ManagedClusterAddOnInterface {
return c.ManagedClusterAddOnClient.Namespace(namespace)
}

// ManagedClusterAddOnInterface returns a client for ManagedClusterAddOn
func ManagedClusterAddOnInterface(ctx context.Context, opt *options.GenericClientOptions[*addonapiv1alpha1.ManagedClusterAddOn]) (addonclientset.Interface, error) {
cloudEventsClient, err := opt.AgentClient(ctx)
if err != nil {
return nil, err
}

addonClient := NewManagedClusterAddOnClient(cloudEventsClient, opt.WatcherStore())

return &AddonClientSetWrapper{&AddonV1Alpha1ClientWrapper{addonClient}}, nil
}
Loading