From 3d79f034006223b2c741a64ed714942db73ebbac Mon Sep 17 00:00:00 2001 From: Nicolas Bigler Date: Wed, 22 Jan 2025 18:38:45 +0100 Subject: [PATCH] Continue refactoring Signed-off-by: Nicolas Bigler --- apis/exoscale/v1/dbaas_types.go | 13 +-- apis/exoscale/v1/iam_types.go | 2 +- go.mod | 5 +- go.sum | 6 ++ operator/common/utils.go | 25 +++++ operator/iamkeycontroller/connector.go | 5 +- operator/iamkeycontroller/create.go | 4 +- operator/iamkeycontroller/delete.go | 4 +- operator/iamkeycontroller/observe.go | 4 +- operator/iamkeycontroller/update.go | 2 +- operator/iamkeycontroller/util.go | 10 -- operator/kafkacontroller/connector.go | 39 ++++++++ operator/kafkacontroller/create.go | 42 ++++---- operator/kafkacontroller/delete.go | 6 +- operator/kafkacontroller/observe.go | 95 ++++++++++--------- operator/kafkacontroller/pipeline.go | 23 +++++ operator/kafkacontroller/settings.go | 17 ++-- .../{controller.go => setup.go} | 49 +++------- operator/kafkacontroller/update.go | 36 +++---- operator/kafkacontroller/webhook.go | 14 +-- operator/mapper/alias.go | 32 +++---- operator/mysqlcontroller/connector.go | 10 +- operator/mysqlcontroller/create.go | 31 +++--- operator/mysqlcontroller/delete.go | 4 +- operator/mysqlcontroller/observe.go | 62 ++++++------ operator/mysqlcontroller/pipeline.go | 2 +- operator/mysqlcontroller/settings.go | 4 +- operator/opensearchcontroller/settings.go | 2 +- operator/operator.go | 25 ++--- operator/pipelineutil/exoscale_connector.go | 6 +- 30 files changed, 313 insertions(+), 266 deletions(-) create mode 100644 operator/common/utils.go create mode 100644 operator/kafkacontroller/connector.go create mode 100644 operator/kafkacontroller/pipeline.go rename operator/kafkacontroller/{controller.go => setup.go} (53%) diff --git a/apis/exoscale/v1/dbaas_types.go b/apis/exoscale/v1/dbaas_types.go index d908e2c1..29a101dd 100644 --- a/apis/exoscale/v1/dbaas_types.go +++ b/apis/exoscale/v1/dbaas_types.go @@ -7,7 +7,8 @@ import ( "strings" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" - exoscaleoapi "github.com/exoscale/egoscale/v2/oapi" + exoscalesdk "github.com/exoscale/egoscale/v3" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -27,19 +28,19 @@ type NodeState struct { // Name of the service node Name string `json:"name,omitempty"` // Role of this node. - Role exoscaleoapi.DbaasNodeStateRole `json:"role,omitempty"` + Role exoscalesdk.DBAASNodeStateRole `json:"role,omitempty"` // State of the service node. - State exoscaleoapi.DbaasNodeStateState `json:"state,omitempty"` + State exoscalesdk.DBAASNodeStateState `json:"state,omitempty"` } // Notification contains a service message. type Notification struct { // Level of the notification. - Level exoscaleoapi.DbaasServiceNotificationLevel `json:"level,omitempty"` + Level exoscalesdk.DBAASServiceNotificationLevel `json:"level,omitempty"` // Message contains the notification. Message string `json:"message,omitempty"` // Type of the notification. - Type exoscaleoapi.DbaasServiceNotificationType `json:"type,omitempty"` + Type exoscalesdk.DBAASServiceNotificationType `json:"type,omitempty"` // Metadata contains additional data. Metadata runtime.RawExtension `json:"metadata,omitempty"` } @@ -61,7 +62,7 @@ type MaintenanceSpec struct { // DayOfWeek specifies at which weekday the maintenance is held place. // Allowed values are [monday, tuesday, wednesday, thursday, friday, saturday, sunday, never] - DayOfWeek exoscaleoapi.DbaasServiceMaintenanceDow `json:"dayOfWeek,omitempty"` + DayOfWeek exoscalesdk.DBAASServiceMaintenanceDow `json:"dayOfWeek,omitempty"` // TimeOfDay for installing updates in UTC. // Format: "hh:mm:ss". diff --git a/apis/exoscale/v1/iam_types.go b/apis/exoscale/v1/iam_types.go index bd0ec5ee..413e1950 100644 --- a/apis/exoscale/v1/iam_types.go +++ b/apis/exoscale/v1/iam_types.go @@ -47,7 +47,7 @@ type IAMKeyParameters struct { // Zone is the name of the zone where the IAM key is created. // The zone must be available in the S3 endpoint. // Cannot be changed after IAMKey is created. - Zone string `json:"zone"` + Zone Zone `json:"zone"` // +kubebuilder:validation:Required diff --git a/go.mod b/go.mod index d81879b1..fb29eeb0 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/exoscale/egoscale/v3 v3.1.8 github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 + github.com/google/go-cmp v0.6.0 github.com/hashicorp/go-version v1.6.0 github.com/minio/minio-go/v7 v7.0.70 github.com/stretchr/testify v1.9.0 @@ -47,13 +48,15 @@ require ( github.com/go-playground/validator/v10 v10.9.0 // indirect github.com/gobuffalo/flect v1.0.2 // indirect github.com/goccy/go-json v0.10.2 // indirect + github.com/gofrs/uuid v4.4.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/hashicorp/go-retryablehttp v0.7.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.16 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/go.sum b/go.sum index 8ad5722c..8484ccfe 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,12 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLe github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= +github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= +github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= diff --git a/operator/common/utils.go b/operator/common/utils.go new file mode 100644 index 00000000..04f67bea --- /dev/null +++ b/operator/common/utils.go @@ -0,0 +1,25 @@ +package common + +import ( + exoscalesdk "github.com/exoscale/egoscale/v3" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" +) + +var ( + ZoneTranslation = map[exoscalev1.Zone]exoscalesdk.Endpoint{ + "CH-DK-2": exoscalesdk.CHDk2, + "CH-GVA-2": exoscalesdk.CHGva2, + "DE-FRA-1": exoscalesdk.DEFra1, + "DE-MUC-1": exoscalesdk.DEMuc1, + "AT-VIE-1": exoscalesdk.ATVie1, + "AT-VIE-2": exoscalesdk.ATVie2, + "BG-SOF-1": exoscalesdk.BGSof1, + "ch-dk-2": exoscalesdk.CHDk2, + "ch-gva-2": exoscalesdk.CHGva2, + "de-fra-1": exoscalesdk.DEFra1, + "de-muc-1": exoscalesdk.DEMuc1, + "at-vie-1": exoscalesdk.ATVie1, + "at-vie-2": exoscalesdk.ATVie2, + "bg-sof-1": exoscalesdk.BGSof1, + } +) diff --git a/operator/iamkeycontroller/connector.go b/operator/iamkeycontroller/connector.go index 21b82a1f..ff432c65 100644 --- a/operator/iamkeycontroller/connector.go +++ b/operator/iamkeycontroller/connector.go @@ -7,7 +7,10 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/event" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" + exoscalesdk "github.com/exoscale/egoscale/v3" + "github.com/vshn/provider-exoscale/operator/common" "github.com/vshn/provider-exoscale/operator/pipelineutil" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -25,7 +28,7 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E iamKey := fromManaged(mg) - exo, err := pipelineutil.OpenExoscaleClient(ctx, c.Kube, iamKey.GetProviderConfigName()) + exo, err := pipelineutil.OpenExoscaleClient(ctx, c.Kube, iamKey.GetProviderConfigName(), exoscalesdk.ClientOptWithEndpoint(common.ZoneTranslation[iamKey.Spec.ForProvider.Zone])) if err != nil { return nil, err } diff --git a/operator/iamkeycontroller/create.go b/operator/iamkeycontroller/create.go index 6c4b8146..5f4b213a 100644 --- a/operator/iamkeycontroller/create.go +++ b/operator/iamkeycontroller/create.go @@ -70,7 +70,7 @@ func (p *IAMKeyPipeline) createIAMKey(ctx *pipelineContext) error { log.Info("IAM Role doesnt exists, creating", "keyName", ctx.iamKey.Spec.ForProvider.KeyName) autogeneratedAppcatRole := createRole(iamKey.Spec.ForProvider.KeyName, iamKey.Spec.ForProvider.Services.SOS.Buckets) - op, err := p.exoscaleClient.WithEndpoint(zoneTranslation[ctx.iamKey.Spec.ForProvider.Zone]).CreateIAMRole(ctx, *autogeneratedAppcatRole) + op, err := p.exoscaleClient.CreateIAMRole(ctx, *autogeneratedAppcatRole) if err != nil || op.State != exoscalesdk.OperationStateSuccess { return err @@ -90,7 +90,7 @@ func (p *IAMKeyPipeline) createIAMKey(ctx *pipelineContext) error { // since their API is async and it needs a moment to create the IAM Role, we need to wait for it for i := 0; i < 10; i++ { // send request - iamKeyCreated, err = p.exoscaleClient.WithEndpoint(zoneTranslation[ctx.iamKey.Spec.ForProvider.Zone]).CreateAPIKey(ctx, newIamKeyRequest) + iamKeyCreated, err = p.exoscaleClient.CreateAPIKey(ctx, newIamKeyRequest) if err != nil { time.Sleep(time.Millisecond * 500) continue diff --git a/operator/iamkeycontroller/delete.go b/operator/iamkeycontroller/delete.go index f74a80d4..f121fd54 100644 --- a/operator/iamkeycontroller/delete.go +++ b/operator/iamkeycontroller/delete.go @@ -40,11 +40,11 @@ func (p *IAMKeyPipeline) deleteIAMKey(ctx *pipelineContext) error { log.Info("Starting IAM key deletion", "keyName", iamKey.Spec.ForProvider.KeyName) - op, err := p.exoscaleClient.WithEndpoint(zoneTranslation[ctx.iamKey.Spec.ForProvider.Zone]).DeleteAPIKey(ctx, iamKey.Status.AtProvider.KeyID) + op, err := p.exoscaleClient.DeleteAPIKey(ctx, iamKey.Status.AtProvider.KeyID) if err != nil || op.State != exoscalesdk.OperationStateSuccess { return err } - op, err = p.exoscaleClient.WithEndpoint(zoneTranslation[ctx.iamKey.Spec.ForProvider.Zone]).DeleteIAMRole(ctx, iamKey.Status.AtProvider.RoleID) + op, err = p.exoscaleClient.DeleteIAMRole(ctx, iamKey.Status.AtProvider.RoleID) if err != nil || op.State != exoscalesdk.OperationStateSuccess { return err } diff --git a/operator/iamkeycontroller/observe.go b/operator/iamkeycontroller/observe.go index 45d677bb..9f7b7ead 100644 --- a/operator/iamkeycontroller/observe.go +++ b/operator/iamkeycontroller/observe.go @@ -54,7 +54,7 @@ func (p *IAMKeyPipeline) Observe(ctx context.Context, mg resource.Managed) (mana pctx := &pipelineContext{Context: ctx, iamKey: iamKey} - apiKey, err := p.exoscaleClient.WithEndpoint(zoneTranslation[iamKey.Spec.ForProvider.Zone]).GetAPIKey(ctx, iamKey.Status.AtProvider.KeyID) + apiKey, err := p.exoscaleClient.GetAPIKey(ctx, iamKey.Status.AtProvider.KeyID) if err != nil { if iamKey.GetDeletionTimestamp() != nil { return managed.ExternalObservation{ResourceExists: false}, nil @@ -161,7 +161,7 @@ func (p *IAMKeyPipeline) isRoleUptodate(ctx *pipelineContext) error { func (p *IAMKeyPipeline) observeRole(ctx *pipelineContext) (*exoscalesdk.IAMRole, error) { - respRole, err := p.exoscaleClient.WithEndpoint(zoneTranslation[ctx.iamKey.Spec.ForProvider.Zone]).GetIAMRole(ctx, ctx.iamKey.Status.AtProvider.RoleID) + respRole, err := p.exoscaleClient.GetIAMRole(ctx, ctx.iamKey.Status.AtProvider.RoleID) if err != nil || respRole.ID == "" { return nil, err } diff --git a/operator/iamkeycontroller/update.go b/operator/iamkeycontroller/update.go index 520f12e7..0bd60600 100644 --- a/operator/iamkeycontroller/update.go +++ b/operator/iamkeycontroller/update.go @@ -26,7 +26,7 @@ func (p *IAMKeyPipeline) Update(ctx context.Context, mg resource.Managed) (manag Labels: role.Labels, Permissions: role.Permissions, } - _, err := p.exoscaleClient.WithEndpoint(zoneTranslation[iamKey.Spec.ForProvider.Zone]).UpdateIAMRole(ctx, iamKey.Status.AtProvider.RoleID, updateRole) + _, err := p.exoscaleClient.UpdateIAMRole(ctx, iamKey.Status.AtProvider.RoleID, updateRole) return managed.ExternalUpdate{}, err } diff --git a/operator/iamkeycontroller/util.go b/operator/iamkeycontroller/util.go index c4d7995a..fe971615 100644 --- a/operator/iamkeycontroller/util.go +++ b/operator/iamkeycontroller/util.go @@ -13,16 +13,6 @@ const ( var ( policyAllow = exoscalesdk.IAMServicePolicyRuleActionAllow policyDeny = exoscalesdk.IAMServicePolicyRuleActionDeny - - zoneTranslation = map[string]exoscalesdk.Endpoint{ - "CH-DK-2": exoscalesdk.CHDk2, - "CH-GVA-2": exoscalesdk.CHGva2, - "DE-FRA-1": exoscalesdk.DEFra1, - "DE-MUC-1": exoscalesdk.DEMuc1, - "AT-VIE-1": exoscalesdk.ATVie1, - "AT-VIE-2": exoscalesdk.ATVie2, - "BG-SOF-1": exoscalesdk.BGSof1, - } ) func createRole(keyName string, buckets []string) *exoscalesdk.CreateIAMRoleRequest { diff --git a/operator/kafkacontroller/connector.go b/operator/kafkacontroller/connector.go new file mode 100644 index 00000000..9dcd4253 --- /dev/null +++ b/operator/kafkacontroller/connector.go @@ -0,0 +1,39 @@ +package kafkacontroller + +import ( + "context" + "fmt" + + "github.com/crossplane/crossplane-runtime/pkg/event" + "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" + "github.com/crossplane/crossplane-runtime/pkg/resource" + exoscalesdk "github.com/exoscale/egoscale/v3" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/common" + "github.com/vshn/provider-exoscale/operator/pipelineutil" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type connector struct { + Kube client.Client + Recorder event.Recorder +} + +// Connect to the exoscale kafka provider. +func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) { + log := ctrl.LoggerFrom(ctx) + log.V(1).Info("connecting resource") + + kafkaInstance, ok := mg.(*exoscalev1.Kafka) + if !ok { + return nil, fmt.Errorf("invalid managed resource type %T for kafka connector", mg) + } + + exo, err := pipelineutil.OpenExoscaleClient(ctx, c.Kube, kafkaInstance.GetProviderConfigName(), exoscalesdk.ClientOptWithEndpoint(common.ZoneTranslation[kafkaInstance.Spec.ForProvider.Zone])) + if err != nil { + return nil, err + } + return newPipeline(c.Kube, c.Recorder, exo.Exoscale), nil + +} diff --git a/operator/kafkacontroller/create.go b/operator/kafkacontroller/create.go index 80ef3069..6934c16a 100644 --- a/operator/kafkacontroller/create.go +++ b/operator/kafkacontroller/create.go @@ -2,23 +2,23 @@ package kafkacontroller import ( "context" + "encoding/json" "errors" "fmt" "strings" + exoscalesdk "github.com/exoscale/egoscale/v3" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" - "github.com/vshn/provider-exoscale/operator/mapper" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" exoscaleapi "github.com/exoscale/egoscale/v2/api" - "github.com/exoscale/egoscale/v2/oapi" controllerruntime "sigs.k8s.io/controller-runtime" ) // Create idempotently creates a Kafka instance. // It will not return an "already exits" error. -func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) { +func (p *pipeline) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) { log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("creating resource") @@ -28,39 +28,33 @@ func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.Ex } spec := instance.Spec.ForProvider - ipFilter := []string(spec.IPFilter) - settings, err := mapper.ToMap(spec.KafkaSettings) + ipFilter := spec.IPFilter + settings := exoscalesdk.JSONSchemaKafka{} + err := json.Unmarshal(spec.KafkaSettings.Raw, &settings) if err != nil { - return managed.ExternalCreation{}, fmt.Errorf("invalid kafka settings: %w", err) + return managed.ExternalCreation{}, fmt.Errorf("cannot map kafkaInstance settings: %w", err) } - - restSettings, err := mapper.ToMap(spec.KafkaRestSettings) + restSettings := exoscalesdk.JSONSchemaKafkaRest{} + err = json.Unmarshal(spec.KafkaRestSettings.Raw, &restSettings) if err != nil { return managed.ExternalCreation{}, fmt.Errorf("invalid kafka rest settings: %w", err) } - var version *string - if spec.Version != "" { - version = &spec.Version - } - body := oapi.CreateDbaasServiceKafkaJSONRequestBody{ - IpFilter: &ipFilter, - KafkaSettings: &settings, - Maintenance: &struct { - Dow oapi.CreateDbaasServiceKafkaJSONBodyMaintenanceDow "json:\"dow\"" - Time string "json:\"time\"" - }{ - Dow: oapi.CreateDbaasServiceKafkaJSONBodyMaintenanceDow(spec.Maintenance.DayOfWeek), + body := exoscalesdk.CreateDBAASServiceKafkaRequest{ + IPFilter: ipFilter, + KafkaSettings: settings, + Maintenance: &exoscalesdk.CreateDBAASServiceKafkaRequestMaintenance{ + Dow: exoscalesdk.CreateDBAASServiceKafkaRequestMaintenanceDow(spec.Maintenance.DayOfWeek), Time: spec.Maintenance.TimeOfDay.String(), }, Plan: spec.Size.Plan, - Version: version, + Version: spec.Version, TerminationProtection: &spec.TerminationProtection, KafkaRestEnabled: &spec.KafkaRestEnabled, - KafkaRestSettings: &restSettings, + KafkaRestSettings: restSettings, } - resp, err := c.exo.CreateDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName()), body) + resp, err := p.exo.CreateDBAASServiceKafka(ctx, instance.GetInstanceName(), body) if err != nil { if errors.Is(err, exoscaleapi.ErrInvalidRequest) && strings.Contains(err.Error(), "Service name is already taken") { // According to the ExternalClient Interface, create needs to be idempotent. @@ -69,6 +63,6 @@ func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.Ex } return managed.ExternalCreation{}, fmt.Errorf("unable to create instance: %w", err) } - log.V(2).Info("response", "body", string(resp.Body)) + log.V(2).Info("response", "message", string(resp.Message)) return managed.ExternalCreation{}, nil } diff --git a/operator/kafkacontroller/delete.go b/operator/kafkacontroller/delete.go index 0b0349cc..56102430 100644 --- a/operator/kafkacontroller/delete.go +++ b/operator/kafkacontroller/delete.go @@ -14,7 +14,7 @@ import ( // Delete idempotently deletes a kafka instance. // It will not return a "not found" error. -func (c connection) Delete(ctx context.Context, mg resource.Managed) error { +func (p *pipeline) Delete(ctx context.Context, mg resource.Managed) error { log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("deleting resource") @@ -22,13 +22,13 @@ func (c connection) Delete(ctx context.Context, mg resource.Managed) error { if !ok { return fmt.Errorf("invalid managed resource type %T for kafka connection", mg) } - resp, err := c.exo.DeleteDbaasServiceWithResponse(ctx, instance.GetInstanceName()) + resp, err := p.exo.DeleteDBAASServiceKafka(ctx, instance.GetInstanceName()) if err != nil { if errors.Is(err, exoscaleapi.ErrNotFound) { return nil } return fmt.Errorf("cannot delete kafka instance: %w", err) } - log.V(2).Info("response", "body", string(resp.Body)) + log.V(2).Info("response", "message", string(resp.Message)) return nil } diff --git a/operator/kafkacontroller/observe.go b/operator/kafkacontroller/observe.go index 90f37072..16c30387 100644 --- a/operator/kafkacontroller/observe.go +++ b/operator/kafkacontroller/observe.go @@ -2,16 +2,20 @@ package kafkacontroller import ( "context" + "encoding/json" "errors" "fmt" - "k8s.io/utils/ptr" "strings" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/ptr" + xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" exoscaleapi "github.com/exoscale/egoscale/v2/api" - "github.com/exoscale/egoscale/v2/oapi" + exoscalesdk "github.com/exoscale/egoscale/v3" + "github.com/google/go-cmp/cmp" controllerruntime "sigs.k8s.io/controller-runtime" @@ -22,7 +26,7 @@ import ( // Observe the external kafka instance. // Will return wether the the instance exits and if it is up-to-date. // Observe will also update the status to the observed state and return connection details to connect to the instance. -func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.ExternalObservation, error) { +func (p *pipeline) Observe(ctx context.Context, mg resource.Managed) (managed.ExternalObservation, error) { log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("observing resource") @@ -31,7 +35,7 @@ func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.E return managed.ExternalObservation{}, fmt.Errorf("invalid managed resource type %T for kafka connection", mg) } - res, err := c.exo.GetDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName())) + res, err := p.exo.GetDBAASServiceKafka(ctx, instance.GetInstanceName()) if err != nil { if errors.Is(err, exoscaleapi.ErrNotFound) { return managed.ExternalObservation{ResourceExists: false}, nil @@ -39,40 +43,34 @@ func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.E return managed.ExternalObservation{}, err } - external := res.JSON200 - - instance.Status.AtProvider, err = getObservation(external) + instance.Status.AtProvider, err = getObservation(res) if err != nil { log.Error(err, "failed to observe kafka instance") } - condition, err := getCondition(external) + condition, err := getCondition(res) if err != nil { log.Error(err, "failed to update kafka condition") } instance.SetConditions(condition) - caRes, err := c.exo.GetDbaasCaCertificateWithResponse(ctx) + caCert, err := p.exo.GetDBAASCACertificate(ctx) if err != nil { return managed.ExternalObservation{}, fmt.Errorf("cannot retrieve CA certificate: %w", err) } - ca := "" - if caRes.JSON200 != nil && caRes.JSON200.Certificate != nil { - ca = *caRes.JSON200.Certificate - } - connDetails, err := getConnectionDetails(external, ca) + connDetails, err := getConnectionDetails(res, caCert.Certificate) if err != nil { return managed.ExternalObservation{}, fmt.Errorf("failed to get kafka connection details: %w", err) } - currentParams, err := setSettingsDefaults(ctx, c.exo, &instance.Spec.ForProvider) + currentParams, err := setSettingsDefaults(ctx, *p.exo, &instance.Spec.ForProvider) if err != nil { log.Error(err, "unable to set kafka settings schema") currentParams = &instance.Spec.ForProvider } - upToDate, diff := diffParameters(external, *currentParams) + upToDate, diff := diffParameters(res, *currentParams) return managed.ExternalObservation{ ResourceExists: true, @@ -82,28 +80,31 @@ func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.E }, nil } -func getObservation(external *oapi.DbaasServiceKafka) (exoscalev1.KafkaObservation, error) { +func getObservation(external *exoscalesdk.DBAASServiceKafka) (exoscalev1.KafkaObservation, error) { notifications, err := mapper.ToNotifications(external.Notifications) if err != nil { return exoscalev1.KafkaObservation{}, fmt.Errorf("error parsing notifications: %w", err) } - settings, err := mapper.ToRawExtension(external.KafkaSettings) + jsonSettings, err := json.Marshal(external.KafkaSettings) if err != nil { - return exoscalev1.KafkaObservation{}, fmt.Errorf("error parsing kafka settings: %w", err) + return exoscalev1.KafkaObservation{}, fmt.Errorf("error parsing KafkaSettings") } + settings := runtime.RawExtension{Raw: jsonSettings} + nodeStates := []exoscalev1.NodeState{} if external.NodeStates != nil { nodeStates = mapper.ToNodeStates(external.NodeStates) } - restSettings, err := mapper.ToRawExtension(external.KafkaRestSettings) + jsonRestSettings, err := json.Marshal(external.KafkaRestSettings) if err != nil { return exoscalev1.KafkaObservation{}, fmt.Errorf("error parsing kafka REST settings: %w", err) } + restSettings := runtime.RawExtension{Raw: jsonRestSettings} return exoscalev1.KafkaObservation{ - Version: ptr.Deref(external.Version, ""), + Version: external.Version, KafkaSettings: settings, KafkaRestEnabled: ptr.Deref(external.KafkaRestEnabled, false), KafkaRestSettings: restSettings, @@ -111,51 +112,51 @@ func getObservation(external *oapi.DbaasServiceKafka) (exoscalev1.KafkaObservati Notifications: notifications, }, nil } -func getCondition(external *oapi.DbaasServiceKafka) (xpv1.Condition, error) { - var state oapi.EnumServiceState - if external.State != nil { - state = *external.State +func getCondition(external *exoscalesdk.DBAASServiceKafka) (xpv1.Condition, error) { + var state exoscalesdk.EnumServiceState + if external.State != "" { + state = external.State } switch state { - case oapi.EnumServiceStateRunning: + case exoscalesdk.EnumServiceStateRunning: return exoscalev1.Running(), nil - case oapi.EnumServiceStateRebuilding: + case exoscalesdk.EnumServiceStateRebuilding: return exoscalev1.Rebuilding(), nil - case oapi.EnumServiceStatePoweroff: + case exoscalesdk.EnumServiceStatePoweroff: return exoscalev1.PoweredOff(), nil - case oapi.EnumServiceStateRebalancing: + case exoscalesdk.EnumServiceStateRebalancing: return exoscalev1.Rebalancing(), nil default: return xpv1.Condition{}, fmt.Errorf("unknown state %q", state) } } -func getConnectionDetails(external *oapi.DbaasServiceKafka, ca string) (map[string][]byte, error) { +func getConnectionDetails(external *exoscalesdk.DBAASServiceKafka, ca string) (map[string][]byte, error) { if external.ConnectionInfo == nil { return nil, errors.New("no connection details") } nodes := "" if external.ConnectionInfo.Nodes != nil { - nodes = strings.Join(*external.ConnectionInfo.Nodes, " ") + nodes = strings.Join(external.ConnectionInfo.Nodes, " ") } - if external.ConnectionInfo.AccessCert == nil { + if external.ConnectionInfo.AccessCert == "" { return nil, errors.New("no certificate returned") } - cert := *external.ConnectionInfo.AccessCert + cert := external.ConnectionInfo.AccessCert - if external.ConnectionInfo.AccessKey == nil { + if external.ConnectionInfo.AccessKey == "" { return nil, errors.New("no key returned") } - key := *external.ConnectionInfo.AccessKey + key := external.ConnectionInfo.AccessKey - if external.Uri == nil { + if external.URI == "" { return nil, errors.New("no URI returned") } - uri := *external.Uri + uri := external.URI host := "" port := "" - if external.UriParams != nil { - uriParams := *external.UriParams + if external.URIParams != nil { + uriParams := external.URIParams host, _ = uriParams["host"].(string) port, _ = uriParams["port"].(string) } @@ -170,28 +171,30 @@ func getConnectionDetails(external *oapi.DbaasServiceKafka, ca string) (map[stri "ca.crt": []byte(ca), } - if external.KafkaRestEnabled != nil && *external.KafkaRestEnabled && external.ConnectionInfo.RestUri != nil { - details["KAFKA_REST_URI"] = []byte(*external.ConnectionInfo.RestUri) + if external.KafkaRestEnabled != nil && *external.KafkaRestEnabled && external.ConnectionInfo.RestURI != "" { + details["KAFKA_REST_URI"] = []byte(external.ConnectionInfo.RestURI) } return details, nil } -func diffParameters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaParameters) (bool, string) { +func diffParameters(external *exoscalesdk.DBAASServiceKafka, expected exoscalev1.KafkaParameters) (bool, string) { actualIPFilter := []string{} - if external.IpFilter != nil { - actualIPFilter = *external.IpFilter + if external.IPFilter != nil { + actualIPFilter = external.IPFilter } - actualKafkaSettings, err := mapper.ToRawExtension(external.KafkaSettings) + jsonKafkaSettings, err := json.Marshal(external.KafkaSettings) if err != nil { return false, err.Error() } + actualKafkaSettings := runtime.RawExtension{Raw: jsonKafkaSettings} - actualKafkaRestSettings, err := mapper.ToRawExtension(external.KafkaRestSettings) + jsonKafkaRestSettings, err := json.Marshal(external.KafkaRestSettings) if err != nil { return false, err.Error() } + actualKafkaRestSettings := runtime.RawExtension{Raw: jsonKafkaRestSettings} actual := exoscalev1.KafkaParameters{ Maintenance: exoscalev1.MaintenanceSpec{ diff --git a/operator/kafkacontroller/pipeline.go b/operator/kafkacontroller/pipeline.go new file mode 100644 index 00000000..118e1e02 --- /dev/null +++ b/operator/kafkacontroller/pipeline.go @@ -0,0 +1,23 @@ +package kafkacontroller + +import ( + "github.com/crossplane/crossplane-runtime/pkg/event" + exoscalesdk "github.com/exoscale/egoscale/v3" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// pipeline is a managed.ExternalClient and implements a crossplane reconciler for MySQL. +type pipeline struct { + kube client.Client + recorder event.Recorder + exo *exoscalesdk.Client +} + +// newPipeline returns a new instance of pipeline. +func newPipeline(client client.Client, recorder event.Recorder, exoscaleClient *exoscalesdk.Client) *pipeline { + return &pipeline{ + kube: client, + recorder: recorder, + exo: exoscaleClient, + } +} diff --git a/operator/kafkacontroller/settings.go b/operator/kafkacontroller/settings.go index d46223c1..0e0101a1 100644 --- a/operator/kafkacontroller/settings.go +++ b/operator/kafkacontroller/settings.go @@ -2,19 +2,20 @@ package kafkacontroller import ( "context" + "encoding/json" + exoscalesdk "github.com/exoscale/egoscale/v3" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" - "github.com/exoscale/egoscale/v2/oapi" "github.com/vshn/provider-exoscale/internal/settings" ) type settingsFetcher interface { - GetDbaasSettingsKafkaWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsKafkaResponse, error) + GetDBAASSettingsKafka(ctx context.Context) (*exoscalesdk.GetDBAASSettingsKafkaResponse, error) } -func setSettingsDefaults(ctx context.Context, f settingsFetcher, in *exoscalev1.KafkaParameters) (*exoscalev1.KafkaParameters, error) { - s, err := fetchSettingSchema(ctx, f) +func setSettingsDefaults(ctx context.Context, c exoscalesdk.Client, in *exoscalev1.KafkaParameters) (*exoscalev1.KafkaParameters, error) { + s, err := fetchSettingSchema(ctx, c) if err != nil { return nil, err } @@ -33,11 +34,15 @@ func setSettingsDefaults(ctx context.Context, f settingsFetcher, in *exoscalev1. } func fetchSettingSchema(ctx context.Context, f settingsFetcher) (settings.Schemas, error) { - resp, err := f.GetDbaasSettingsKafkaWithResponse(ctx) + resp, err := f.GetDBAASSettingsKafka(ctx) if err != nil { return nil, err } - schemas, err := settings.ParseSchemas(resp.Body) + settingsJson, err := json.Marshal(resp.Settings) + if err != nil { + return nil, err + } + schemas, err := settings.ParseSchemas(settingsJson) if err != nil { return nil, err } diff --git a/operator/kafkacontroller/controller.go b/operator/kafkacontroller/setup.go similarity index 53% rename from operator/kafkacontroller/controller.go rename to operator/kafkacontroller/setup.go index 917cef51..839915a1 100644 --- a/operator/kafkacontroller/controller.go +++ b/operator/kafkacontroller/setup.go @@ -1,8 +1,6 @@ package kafkacontroller import ( - "context" - "fmt" "strings" "time" @@ -10,42 +8,10 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" - exoscalesdk "github.com/exoscale/egoscale/v2" - "github.com/exoscale/egoscale/v2/oapi" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" - "github.com/vshn/provider-exoscale/operator/pipelineutil" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) -type connector struct { - kube client.Client - recorder event.Recorder -} - -type connection struct { - exo oapi.ClientWithResponsesInterface -} - -// Connect to the exoscale kafka provider. -func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) { - log := ctrl.LoggerFrom(ctx) - log.V(1).Info("connecting resource") - - kafkaInstance, ok := mg.(*exoscalev1.Kafka) - if !ok { - return nil, fmt.Errorf("invalid managed resource type %T for kafka connector", mg) - } - - exo, err := pipelineutil.OpenExoscaleClient(ctx, c.kube, kafkaInstance.GetProviderConfigName(), exoscalesdk.ClientOptWithAPIEndpoint(fmt.Sprintf("https://api-%s.exoscale.com", kafkaInstance.Spec.ForProvider.Zone))) - if err != nil { - return nil, err - } - return connection{ - exo: exo.Exoscale, - }, nil -} - // SetupController adds a controller that reconciles kafka resources. func SetupController(mgr ctrl.Manager) error { name := strings.ToLower(exoscalev1.KafkaGroupKind) @@ -56,8 +22,8 @@ func SetupController(mgr ctrl.Manager) error { r := managed.NewReconciler(mgr, resource.ManagedKind(exoscalev1.KafkaGroupVersionKind), managed.WithExternalConnecter(&connector{ - kube: mgr.GetClient(), - recorder: recorder, + Kube: mgr.GetClient(), + Recorder: recorder, }), managed.WithLogger(logging.NewLogrLogger(mgr.GetLogger().WithValues("controller", name))), managed.WithRecorder(recorder), @@ -70,3 +36,14 @@ func SetupController(mgr ctrl.Manager) error { For(&exoscalev1.Kafka{}). Complete(r) } + +// SetupWebhook adds a webhook for kafka resources. +func SetupWebhook(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(&exoscalev1.Kafka{}). + WithValidator(&Validator{ + log: mgr.GetLogger().WithName("webhook").WithName(strings.ToLower(exoscalev1.KafkaKind)), + kube: mgr.GetClient(), + }). + Complete() +} diff --git a/operator/kafkacontroller/update.go b/operator/kafkacontroller/update.go index 724a370c..5ffa5c15 100644 --- a/operator/kafkacontroller/update.go +++ b/operator/kafkacontroller/update.go @@ -2,19 +2,19 @@ package kafkacontroller import ( "context" + "encoding/json" "fmt" + exoscalesdk "github.com/exoscale/egoscale/v3" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" - "github.com/vshn/provider-exoscale/operator/mapper" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" - "github.com/exoscale/egoscale/v2/oapi" controllerruntime "sigs.k8s.io/controller-runtime" ) // Update the provided kafka instance. -func (c connection) Update(ctx context.Context, mg resource.Managed) (managed.ExternalUpdate, error) { +func (p *pipeline) Update(ctx context.Context, mg resource.Managed) (managed.ExternalUpdate, error) { log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("updating resource") @@ -25,35 +25,35 @@ func (c connection) Update(ctx context.Context, mg resource.Managed) (managed.Ex spec := instance.Spec.ForProvider ipFilter := []string(spec.IPFilter) - settings, err := mapper.ToMap(spec.KafkaSettings) + settings := exoscalesdk.JSONSchemaKafka{} + err := json.Unmarshal(spec.KafkaSettings.Raw, &settings) if err != nil { - return managed.ExternalUpdate{}, fmt.Errorf("invalid kafka settings: %w", err) + return managed.ExternalUpdate{}, fmt.Errorf("cannot map kafkaInstance settings: %w", err) } - restSettings, err := mapper.ToMap(spec.KafkaRestSettings) + + restSettings := exoscalesdk.JSONSchemaKafkaRest{} + err = json.Unmarshal(spec.KafkaRestSettings.Raw, &restSettings) if err != nil { return managed.ExternalUpdate{}, fmt.Errorf("invalid kafka rest settings: %w", err) } - body := oapi.UpdateDbaasServiceKafkaJSONRequestBody{ - IpFilter: &ipFilter, - KafkaSettings: &settings, - Maintenance: &struct { - Dow oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow "json:\"dow\"" - Time string "json:\"time\"" - }{ - Dow: oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow(spec.Maintenance.DayOfWeek), + body := exoscalesdk.UpdateDBAASServiceKafkaRequest{ + IPFilter: ipFilter, + KafkaSettings: settings, + Maintenance: &exoscalesdk.UpdateDBAASServiceKafkaRequestMaintenance{ + Dow: exoscalesdk.UpdateDBAASServiceKafkaRequestMaintenanceDow(spec.Maintenance.DayOfWeek), Time: spec.Maintenance.TimeOfDay.String(), }, - Plan: &spec.Size.Plan, + Plan: spec.Size.Plan, TerminationProtection: &spec.TerminationProtection, KafkaRestEnabled: &spec.KafkaRestEnabled, - KafkaRestSettings: &restSettings, + KafkaRestSettings: restSettings, } - resp, err := c.exo.UpdateDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName()), body) + resp, err := p.exo.UpdateDBAASServiceKafka(ctx, instance.GetInstanceName(), body) if err != nil { return managed.ExternalUpdate{}, fmt.Errorf("unable to update instance: %w", err) } - log.V(2).Info("response", "body", string(resp.Body)) + log.V(2).Info("response", "message", string(resp.Message)) return managed.ExternalUpdate{}, nil } diff --git a/operator/kafkacontroller/webhook.go b/operator/kafkacontroller/webhook.go index 4763af14..9b520835 100644 --- a/operator/kafkacontroller/webhook.go +++ b/operator/kafkacontroller/webhook.go @@ -3,15 +3,14 @@ package kafkacontroller import ( "context" "fmt" + "github.com/exoscale/egoscale/v2/oapi" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" - "strings" exoscalesdk "github.com/exoscale/egoscale/v2" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" "github.com/vshn/provider-exoscale/operator/pipelineutil" "github.com/vshn/provider-exoscale/operator/webhook" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/go-logr/logr" @@ -20,17 +19,6 @@ import ( const serviceType = "kafka" -// SetupWebhook adds a webhook for kafka resources. -func SetupWebhook(mgr ctrl.Manager) error { - return ctrl.NewWebhookManagedBy(mgr). - For(&exoscalev1.Kafka{}). - WithValidator(&Validator{ - log: mgr.GetLogger().WithName("webhook").WithName(strings.ToLower(exoscalev1.KafkaKind)), - kube: mgr.GetClient(), - }). - Complete() -} - // Validator validates kafka admission requests. type Validator struct { log logr.Logger diff --git a/operator/mapper/alias.go b/operator/mapper/alias.go index dc0c8b6a..1185d2e5 100644 --- a/operator/mapper/alias.go +++ b/operator/mapper/alias.go @@ -2,8 +2,11 @@ package mapper import ( "fmt" + "github.com/exoscale/egoscale/v2/oapi" + exoscalesdk "github.com/exoscale/egoscale/v3" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/json" "k8s.io/utils/ptr" @@ -11,15 +14,15 @@ import ( // BackupSchedule is a type alias for the embedded struct in opai.CreateDbaasServicePgJSONRequestBody. type BackupSchedule = struct { - BackupHour *int64 `json:"backup-hour,omitempty"` - BackupMinute *int64 `json:"backup-minute,omitempty"` + BackupHour int64 `json:"backup-hour,omitempty"` + BackupMinute int64 `json:"backup-minute,omitempty"` } func ToBackupSchedule(day exoscalev1.TimeOfDay) (BackupSchedule, error) { backupHour, backupMin, _, err := day.Parse() return BackupSchedule{ - BackupHour: ptr.To(backupHour), - BackupMinute: ptr.To(backupMin), + BackupHour: backupHour, + BackupMinute: backupMin, }, err } @@ -34,33 +37,28 @@ func ToSlice(arr *[]string) []string { return []string{} } -func ToNodeStates(states *[]oapi.DbaasNodeState) []exoscalev1.NodeState { +func ToNodeStates(states []exoscalesdk.DBAASNodeState) []exoscalev1.NodeState { if states == nil { return nil } - s := make([]exoscalev1.NodeState, len(*states)) - for i, state := range *states { - var role oapi.DbaasNodeStateRole - if state.Role != nil { - role = *state.Role - } - + s := make([]exoscalev1.NodeState, len(states)) + for i, state := range states { s[i] = exoscalev1.NodeState{ Name: state.Name, - Role: role, + Role: state.Role, State: state.State, } } return s } -func ToNotifications(notifications *[]oapi.DbaasServiceNotification) ([]exoscalev1.Notification, error) { +func ToNotifications(notifications []exoscalesdk.DBAASServiceNotification) ([]exoscalev1.Notification, error) { if notifications == nil { return nil, nil } - s := make([]exoscalev1.Notification, len(*notifications)) - for i, notification := range *notifications { + s := make([]exoscalev1.Notification, len(notifications)) + for i, notification := range notifications { metadata, err := ToRawExtension(¬ification.Metadata) if err != nil { return nil, fmt.Errorf("unable to convert metadata: %w", err) @@ -79,7 +77,7 @@ func ToBackupSpec(schedule *BackupSchedule) exoscalev1.BackupSpec { if schedule == nil { return exoscalev1.BackupSpec{} } - hour, min := ptr.Deref(schedule.BackupHour, 0), ptr.Deref(schedule.BackupMinute, 0) + hour, min := schedule.BackupHour, schedule.BackupMinute return exoscalev1.BackupSpec{TimeOfDay: exoscalev1.TimeOfDay(fmt.Sprintf("%02d:%02d:00", hour, min))} } diff --git a/operator/mysqlcontroller/connector.go b/operator/mysqlcontroller/connector.go index 085df887..9499de34 100644 --- a/operator/mysqlcontroller/connector.go +++ b/operator/mysqlcontroller/connector.go @@ -7,8 +7,9 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/event" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" - exoscalesdk "github.com/exoscale/egoscale/v2" + exoscalesdk "github.com/exoscale/egoscale/v3" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/common" "github.com/vshn/provider-exoscale/operator/pipelineutil" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -24,9 +25,12 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E log := ctrl.LoggerFrom(ctx) log.V(1).Info("connecting resource") - mySQLInstance := mg.(*exoscalev1.MySQL) + mySQLInstance, ok := mg.(*exoscalev1.MySQL) + if !ok { + return nil, fmt.Errorf("invalid managed resource type %T for mysql connector", mg) + } - exo, err := pipelineutil.OpenExoscaleClient(ctx, c.Kube, mySQLInstance.GetProviderConfigName(), exoscalesdk.ClientOptWithAPIEndpoint(fmt.Sprintf("https://api-%s.exoscale.com", mySQLInstance.Spec.ForProvider.Zone))) + exo, err := pipelineutil.OpenExoscaleClient(ctx, c.Kube, mySQLInstance.GetProviderConfigName(), exoscalesdk.ClientOptWithEndpoint(common.ZoneTranslation[mySQLInstance.Spec.ForProvider.Zone])) if err != nil { return nil, err } diff --git a/operator/mysqlcontroller/create.go b/operator/mysqlcontroller/create.go index e515df9e..3f6d723c 100644 --- a/operator/mysqlcontroller/create.go +++ b/operator/mysqlcontroller/create.go @@ -2,12 +2,14 @@ package mysqlcontroller import ( "context" + "encoding/json" "fmt" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" - "github.com/exoscale/egoscale/v2/oapi" + exoscalesdk "github.com/exoscale/egoscale/v3" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/mapper" controllerruntime "sigs.k8s.io/controller-runtime" ) @@ -21,7 +23,8 @@ func (p *pipeline) Create(ctx context.Context, mg resource.Managed) (managed.Ext spec := mySQLInstance.Spec.ForProvider ipFilter := []string(spec.IPFilter) - settings, err := mapper.ToMap(spec.MySQLSettings) + settings := exoscalesdk.JSONSchemaMysql{} + err := json.Unmarshal(spec.MySQLSettings.Raw, &settings) if err != nil { return managed.ExternalCreation{}, fmt.Errorf("cannot map mySQLInstance settings: %w", err) } @@ -30,32 +33,26 @@ func (p *pipeline) Create(ctx context.Context, mg resource.Managed) (managed.Ext return managed.ExternalCreation{}, fmt.Errorf("cannot parse mySQLInstance backup schedule: %w", err) } - body := oapi.CreateDbaasServiceMysqlJSONRequestBody{ - Maintenance: &struct { - Dow oapi.CreateDbaasServiceMysqlJSONBodyMaintenanceDow `json:"dow"` - Time string `json:"time"` - }{ - Dow: oapi.CreateDbaasServiceMysqlJSONBodyMaintenanceDow(spec.Maintenance.DayOfWeek), + body := exoscalesdk.CreateDBAASServiceMysqlRequest{ + Maintenance: &exoscalesdk.CreateDBAASServiceMysqlRequestMaintenance{ + Dow: exoscalesdk.CreateDBAASServiceMysqlRequestMaintenanceDow(spec.Maintenance.DayOfWeek), Time: spec.Maintenance.TimeOfDay.String(), }, - BackupSchedule: &struct { - BackupHour *int64 `json:"backup-hour,omitempty"` - BackupMinute *int64 `json:"backup-minute,omitempty"` - }{ + BackupSchedule: &exoscalesdk.CreateDBAASServiceMysqlRequestBackupSchedule{ BackupHour: backupSchedule.BackupHour, BackupMinute: backupSchedule.BackupMinute, }, - Version: &spec.Version, + Version: spec.Version, TerminationProtection: &spec.TerminationProtection, Plan: spec.Size.Plan, - IpFilter: &ipFilter, - MysqlSettings: &settings, + IPFilter: ipFilter, + MysqlSettings: settings, } - resp, err := p.exo.CreateDbaasServiceMysqlWithResponse(ctx, oapi.DbaasServiceName(mySQLInstance.GetInstanceName()), body) + resp, err := p.exo.CreateDBAASServiceMysql(ctx, mySQLInstance.GetInstanceName(), body) if err != nil { return managed.ExternalCreation{}, fmt.Errorf("cannot create mySQLInstance: %w", err) } - log.V(1).Info("response", "body", string(resp.Body)) + log.V(1).Info("response", "message", string(resp.Message)) return managed.ExternalCreation{}, nil } diff --git a/operator/mysqlcontroller/delete.go b/operator/mysqlcontroller/delete.go index 2dadd591..c70e4817 100644 --- a/operator/mysqlcontroller/delete.go +++ b/operator/mysqlcontroller/delete.go @@ -15,10 +15,10 @@ func (p *pipeline) Delete(ctx context.Context, mg resource.Managed) error { log.Info("deleting resource") mySQLInstance := mg.(*exoscalev1.MySQL) - resp, err := p.exo.DeleteDbaasServiceWithResponse(ctx, mySQLInstance.GetInstanceName()) + resp, err := p.exo.DeleteDBAASServiceMysql(ctx, mySQLInstance.GetInstanceName()) if err != nil { return fmt.Errorf("cannot delete mySQLInstance: %w", err) } - log.V(1).Info("response", "json", string(resp.Body)) + log.V(1).Info("response", "json", string(resp.Message)) return nil } diff --git a/operator/mysqlcontroller/observe.go b/operator/mysqlcontroller/observe.go index 06fef92a..1db9e0d2 100644 --- a/operator/mysqlcontroller/observe.go +++ b/operator/mysqlcontroller/observe.go @@ -4,14 +4,14 @@ import ( "context" "errors" "fmt" - "k8s.io/utils/ptr" "net/url" "strings" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" exoscaleapi "github.com/exoscale/egoscale/v2/api" - "github.com/exoscale/egoscale/v2/oapi" + exoscalesdk "github.com/exoscale/egoscale/v3" + "github.com/go-logr/logr" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" "github.com/vshn/provider-exoscale/operator/mapper" @@ -25,7 +25,7 @@ func (p *pipeline) Observe(ctx context.Context, mg resource.Managed) (managed.Ex mySQLInstance := mg.(*exoscalev1.MySQL) - resp, err := p.exo.GetDbaasServiceMysqlWithResponse(ctx, oapi.DbaasServiceName(mySQLInstance.GetInstanceName())) + mysql, err := p.exo.GetDBAASServiceMysql(ctx, mySQLInstance.GetInstanceName()) if err != nil { if errors.Is(err, exoscaleapi.ErrNotFound) { return managed.ExternalObservation{ResourceExists: false}, nil @@ -33,37 +33,35 @@ func (p *pipeline) Observe(ctx context.Context, mg resource.Managed) (managed.Ex return managed.ExternalObservation{}, fmt.Errorf("cannot observe mySQLInstance: %w", err) } - mysql := *resp.JSON200 - log.V(2).Info("response", "raw", string(resp.Body)) log.V(1).Info("retrieved mySQLInstance", "state", mysql.State) mySQLInstance.Status.AtProvider, err = mapObservation(mysql) if err != nil { log.Error(err, "cannot map mySQLInstance observation, ignoring") } - var state oapi.EnumServiceState - if mysql.State != nil { - state = *mysql.State + var state exoscalesdk.EnumServiceState + if mysql.State != "" { + state = mysql.State } switch state { - case oapi.EnumServiceStateRunning: + case exoscalesdk.EnumServiceStateRunning: mySQLInstance.SetConditions(exoscalev1.Running()) - case oapi.EnumServiceStateRebuilding: + case exoscalesdk.EnumServiceStateRebuilding: mySQLInstance.SetConditions(exoscalev1.Rebuilding()) - case oapi.EnumServiceStatePoweroff: + case exoscalesdk.EnumServiceStatePoweroff: mySQLInstance.SetConditions(exoscalev1.PoweredOff()) - case oapi.EnumServiceStateRebalancing: + case exoscalesdk.EnumServiceStateRebalancing: mySQLInstance.SetConditions(exoscalev1.Rebalancing()) default: log.V(2).Info("ignoring unknown mySQLInstance state", "state", state) } - caCert, err := p.exo.GetDatabaseCACertificate(ctx, mySQLInstance.Spec.ForProvider.Zone.String()) + caCert, err := p.exo.GetDBAASCACertificate(ctx) if err != nil { return managed.ExternalObservation{}, fmt.Errorf("cannot retrieve CA certificate: %w", err) } - connDetails, err := connectionDetails(mysql, caCert) + connDetails, err := connectionDetails(mysql, caCert.Certificate) if err != nil { return managed.ExternalObservation{}, fmt.Errorf("cannot parse connection details: %w", err) } @@ -85,14 +83,14 @@ func (p *pipeline) Observe(ctx context.Context, mg resource.Managed) (managed.Ex }, nil } -func connectionDetails(in oapi.DbaasServiceMysql, ca string) (managed.ConnectionDetails, error) { - uri := ptr.Deref(in.Uri, "") +func connectionDetails(in *exoscalesdk.DBAASServiceMysql, ca string) (managed.ConnectionDetails, error) { + uri := in.URI // uri may be absent if uri == "" { - if in.ConnectionInfo == nil || in.ConnectionInfo.Uri == nil || len(*in.ConnectionInfo.Uri) == 0 { + if in.ConnectionInfo == nil || in.ConnectionInfo.URI == nil || len(in.ConnectionInfo.URI) == 0 { return map[string][]byte{}, nil } - uri = (*in.ConnectionInfo.Uri)[0] + uri = in.ConnectionInfo.URI[0] } parsed, err := url.Parse(uri) if err != nil { @@ -139,16 +137,14 @@ func isUpToDate(current, external *exoscalev1.MySQLParameters, log logr.Logger) return ok } -func mapObservation(instance oapi.DbaasServiceMysql) (exoscalev1.MySQLObservation, error) { +func mapObservation(instance *exoscalesdk.DBAASServiceMysql) (exoscalev1.MySQLObservation, error) { observation := exoscalev1.MySQLObservation{ - Version: ptr.Deref(instance.Version, ""), - NodeStates: mapper.ToNodeStates(instance.NodeStates), + Version: instance.Version, + NodeStates: instance.NodeStates, } - settings, err := mapper.ToRawExtension(instance.MysqlSettings) - if err != nil { - return observation, fmt.Errorf("mySQLInstance settings: %w", err) - } + settings := instance.MysqlSettings + observation.MySQLSettings = settings observation.DBaaSParameters = mapper.ToDBaaSParameters(instance.TerminationProtection, instance.Plan, instance.IpFilter) @@ -164,25 +160,23 @@ func mapObservation(instance oapi.DbaasServiceMysql) (exoscalev1.MySQLObservatio return observation, nil } -func mapParameters(in oapi.DbaasServiceMysql, zone string) (*exoscalev1.MySQLParameters, error) { - settings, err := mapper.ToRawExtension(in.MysqlSettings) - if err != nil { - return nil, fmt.Errorf("cannot parse mySQLInstance settings: %w", err) - } +func mapParameters(in *exoscalesdk.DBAASServiceMysql, zone string) (*exoscalev1.MySQLParameters, error) { + settings := in.MysqlSettings + return &exoscalev1.MySQLParameters{ Maintenance: exoscalev1.MaintenanceSpec{ DayOfWeek: in.Maintenance.Dow, TimeOfDay: exoscalev1.TimeOfDay(in.Maintenance.Time), }, - Backup: mapper.ToBackupSpec(in.BackupSchedule), + Backup: *in.BackupSchedule, Zone: exoscalev1.Zone(zone), - Version: *in.Version, + Version: in.Version, DBaaSParameters: exoscalev1.DBaaSParameters{ - TerminationProtection: ptr.Deref(in.TerminationProtection, false), + TerminationProtection: *in.TerminationProtection, Size: exoscalev1.SizeSpec{ Plan: in.Plan, }, - IPFilter: mapper.ToSlice(in.IpFilter), + IPFilter: in.IPFilter, }, MySQLSettings: settings, }, nil diff --git a/operator/mysqlcontroller/pipeline.go b/operator/mysqlcontroller/pipeline.go index 0e21e5f8..aaf302ca 100644 --- a/operator/mysqlcontroller/pipeline.go +++ b/operator/mysqlcontroller/pipeline.go @@ -2,7 +2,7 @@ package mysqlcontroller import ( "github.com/crossplane/crossplane-runtime/pkg/event" - exoscalesdk "github.com/exoscale/egoscale/v2" + exoscalesdk "github.com/exoscale/egoscale/v3" "sigs.k8s.io/controller-runtime/pkg/client" ) diff --git a/operator/mysqlcontroller/settings.go b/operator/mysqlcontroller/settings.go index 7b76892f..dcc9c632 100644 --- a/operator/mysqlcontroller/settings.go +++ b/operator/mysqlcontroller/settings.go @@ -3,14 +3,14 @@ package mysqlcontroller import ( "context" + exoscalesdk "github.com/exoscale/egoscale/v3" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" - "github.com/exoscale/egoscale/v2/oapi" "github.com/vshn/provider-exoscale/internal/settings" ) type settingsFetcher interface { - GetDbaasSettingsMysqlWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsMysqlResponse, error) + GetDbaasSettingsMysqlWithResponse(ctx context.Context) (*exoscalesdk.GetDBAASSettingsMysqlResponse, error) } func setSettingsDefaults(ctx context.Context, f settingsFetcher, in *exoscalev1.MySQLParameters) (*exoscalev1.MySQLParameters, error) { diff --git a/operator/opensearchcontroller/settings.go b/operator/opensearchcontroller/settings.go index c638e907..47fe2720 100644 --- a/operator/opensearchcontroller/settings.go +++ b/operator/opensearchcontroller/settings.go @@ -10,7 +10,7 @@ import ( ) type settingsFetcher interface { - GetDbaasSettingsOpensearchWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsOpensearchResponse, error) + GetDbaasSettingsOpensearchWithResponse(ctx context.Context) (*oapi.GetDbaasSettingsOpensearchResponse, error) } func setSettingsDefaults(ctx context.Context, f settingsFetcher, in *exoscalev1.OpenSearchParameters) (*exoscalev1.OpenSearchParameters, error) { diff --git a/operator/operator.go b/operator/operator.go index 6a09ab29..1fbdb7fc 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -4,11 +4,6 @@ import ( "github.com/vshn/provider-exoscale/operator/bucketcontroller" "github.com/vshn/provider-exoscale/operator/configcontroller" "github.com/vshn/provider-exoscale/operator/iamkeycontroller" - "github.com/vshn/provider-exoscale/operator/kafkacontroller" - "github.com/vshn/provider-exoscale/operator/mysqlcontroller" - "github.com/vshn/provider-exoscale/operator/opensearchcontroller" - "github.com/vshn/provider-exoscale/operator/postgresqlcontroller" - "github.com/vshn/provider-exoscale/operator/rediscontroller" ctrl "sigs.k8s.io/controller-runtime" ) @@ -19,11 +14,11 @@ func SetupControllers(mgr ctrl.Manager) error { bucketcontroller.SetupController, configcontroller.SetupController, iamkeycontroller.SetupController, - mysqlcontroller.SetupController, - postgresqlcontroller.SetupController, - rediscontroller.SetupController, - kafkacontroller.SetupController, - opensearchcontroller.SetupController, + // mysqlcontroller.SetupController, + // postgresqlcontroller.SetupController, + // rediscontroller.SetupController, + // kafkacontroller.SetupController, + // opensearchcontroller.SetupController, } { if err := setup(mgr); err != nil { return err @@ -37,11 +32,11 @@ func SetupWebhooks(mgr ctrl.Manager) error { for _, setup := range []func(ctrl.Manager) error{ bucketcontroller.SetupWebhook, iamkeycontroller.SetupWebhook, - mysqlcontroller.SetupWebhook, - postgresqlcontroller.SetupWebhook, - rediscontroller.SetupWebhook, - kafkacontroller.SetupWebhook, - opensearchcontroller.SetupWebhook, + // mysqlcontroller.SetupWebhook, + // postgresqlcontroller.SetupWebhook, + // rediscontroller.SetupWebhook, + // kafkacontroller.SetupWebhook, + // opensearchcontroller.SetupWebhook, } { if err := setup(mgr); err != nil { return err diff --git a/operator/pipelineutil/exoscale_connector.go b/operator/pipelineutil/exoscale_connector.go index ce95ec1c..a8ad5f10 100644 --- a/operator/pipelineutil/exoscale_connector.go +++ b/operator/pipelineutil/exoscale_connector.go @@ -6,7 +6,8 @@ import ( pipeline "github.com/ccremer/go-command-pipeline" "github.com/crossplane/crossplane-runtime/pkg/errors" - exoscalesdk "github.com/exoscale/egoscale/v2" + exoscalesdk "github.com/exoscale/egoscale/v3" + "github.com/exoscale/egoscale/v3/credentials" providerv1 "github.com/vshn/provider-exoscale/apis/provider/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -95,7 +96,8 @@ func validateSecret(ctx *connectContext) error { } func createExoscaleClient(ctx *connectContext) error { - ec, err := exoscalesdk.NewClient(ctx.apiKey, ctx.apiSecret, ctx.opts...) + creds := credentials.NewStaticCredentials(ctx.apiKey, ctx.apiSecret) + ec, err := exoscalesdk.NewClient(creds, ctx.opts...) ctx.exoscaleClient = ec return err }