From d5f9625cf94e3b032759d7ef35a5256287c183cd Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Thu, 1 Jun 2023 15:16:01 +0100 Subject: [PATCH] Move `runtime/{init_error.go,retriable_error.go}` into separate shared package (#6323) * Move runtime/init_error.go into separate package Signed-off-by: joshvanl * Move retryable error to errors package Signed-off-by: joshvanl * Adds tests to ensure init and retriable implement error Signed-off-by: joshvanl * golanci-lint Signed-off-by: joshvanl --------- Signed-off-by: joshvanl Co-authored-by: Loong Dai Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/runtime/cli.go | 3 +- pkg/runtime/{init_error.go => errors/init.go} | 6 +- pkg/runtime/errors/init_test.go | 20 +++++ .../retriable.go} | 6 +- pkg/runtime/errors/retriable_test.go | 20 +++++ pkg/runtime/runtime.go | 79 ++++++++++--------- pkg/runtime/runtime_test.go | 21 ++--- 7 files changed, 99 insertions(+), 56 deletions(-) rename pkg/runtime/{init_error.go => errors/init.go} (88%) create mode 100644 pkg/runtime/errors/init_test.go rename pkg/runtime/{retriable_error.go => errors/retriable.go} (85%) create mode 100644 pkg/runtime/errors/retriable_test.go diff --git a/pkg/runtime/cli.go b/pkg/runtime/cli.go index 6386942b269..13d335b8099 100644 --- a/pkg/runtime/cli.go +++ b/pkg/runtime/cli.go @@ -36,6 +36,7 @@ import ( "github.com/dapr/dapr/pkg/operator/client" operatorV1 "github.com/dapr/dapr/pkg/proto/operator/v1" resiliencyConfig "github.com/dapr/dapr/pkg/resiliency" + rterrors "github.com/dapr/dapr/pkg/runtime/errors" "github.com/dapr/dapr/pkg/runtime/security" "github.com/dapr/dapr/pkg/validation" "github.com/dapr/dapr/utils" @@ -404,7 +405,7 @@ func FromFlags(args []string) (*DaprRuntime, error) { // Initialize metrics only if MetricSpec is enabled. if globalConfig.Spec.MetricSpec.Enabled { if mErr := diag.InitMetrics(runtimeConfig.ID, namespace, globalConfig.Spec.MetricSpec.Rules); mErr != nil { - log.Errorf(NewInitError(InitFailure, "metrics", mErr).Error()) + log.Errorf(rterrors.NewInit(rterrors.InitFailure, "metrics", mErr).Error()) } } diff --git a/pkg/runtime/init_error.go b/pkg/runtime/errors/init.go similarity index 88% rename from pkg/runtime/init_error.go rename to pkg/runtime/errors/init.go index 96751528345..4ab8c666c8a 100644 --- a/pkg/runtime/init_error.go +++ b/pkg/runtime/errors/init.go @@ -11,7 +11,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package runtime +package errors import ( "fmt" @@ -43,8 +43,8 @@ func (e *InitError) Unwrap() error { return e.err } -// NewInitError returns an InitError wrapping an existing context error. -func NewInitError(kind InitErrorKind, entity string, err error) *InitError { +// NewInit returns an InitError wrapping an existing context error. +func NewInit(kind InitErrorKind, entity string, err error) *InitError { return &InitError{ err: err, kind: kind, diff --git a/pkg/runtime/errors/init_test.go b/pkg/runtime/errors/init_test.go new file mode 100644 index 00000000000..28d74e01a9b --- /dev/null +++ b/pkg/runtime/errors/init_test.go @@ -0,0 +1,20 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package errors + +import "testing" + +func TestInitError(t *testing.T) { + var _ error = new(InitError) +} diff --git a/pkg/runtime/retriable_error.go b/pkg/runtime/errors/retriable.go similarity index 85% rename from pkg/runtime/retriable_error.go rename to pkg/runtime/errors/retriable.go index 0a1ccf85b26..abc7c0caa51 100644 --- a/pkg/runtime/retriable_error.go +++ b/pkg/runtime/errors/retriable.go @@ -11,7 +11,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package runtime +package errors type RetriableError struct { err error @@ -32,8 +32,8 @@ func (e *RetriableError) Unwrap() error { return e.err } -// NewRetriableError returns a RetriableError wrapping an existing context error. -func NewRetriableError(err error) *RetriableError { +// NewRetriable returns a RetriableError wrapping an existing context error. +func NewRetriable(err error) *RetriableError { return &RetriableError{ err: err, } diff --git a/pkg/runtime/errors/retriable_test.go b/pkg/runtime/errors/retriable_test.go new file mode 100644 index 00000000000..c7ff3fd37f0 --- /dev/null +++ b/pkg/runtime/errors/retriable_test.go @@ -0,0 +1,20 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package errors + +import "testing" + +func TestRetriableError(t *testing.T) { + var _ error = new(RetriableError) +} diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index b0a4cd5fe2c..4e475336a85 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -100,6 +100,7 @@ import ( stateLoader "github.com/dapr/dapr/pkg/components/state" workflowsLoader "github.com/dapr/dapr/pkg/components/workflows" "github.com/dapr/dapr/pkg/runtime/compstore" + rterrors "github.com/dapr/dapr/pkg/runtime/errors" "github.com/dapr/components-contrib/bindings" "github.com/dapr/components-contrib/configuration" @@ -898,7 +899,7 @@ func (a *DaprRuntime) subscribeTopic(parentCtx context.Context, name string, top } else { pErr = a.publishMessageGRPC(ctx, psm) } - var rErr *RetriableError + var rErr *rterrors.RetriableError if errors.As(pErr, &rErr) { log.Warnf("encountered a retriable error while publishing a subscribed message to topic %s, err: %v", msgTopic, rErr.Unwrap()) } else if pErr != nil { @@ -1701,12 +1702,12 @@ func (a *DaprRuntime) initInputBinding(c componentsV1alpha1.Component) error { binding, err := a.bindingsRegistry.CreateInputBinding(c.Spec.Type, c.Spec.Version, fName) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "creation", c.ObjectMeta.Name) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } err = binding.Init(context.TODO(), bindings.Metadata{Base: a.toBaseMetadata(c)}) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "init", c.ObjectMeta.Name) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } log.Infof("successful init for input binding %s (%s/%s)", c.ObjectMeta.Name, c.Spec.Type, c.Spec.Version) @@ -1727,14 +1728,14 @@ func (a *DaprRuntime) initOutputBinding(c componentsV1alpha1.Component) error { binding, err := a.bindingsRegistry.CreateOutputBinding(c.Spec.Type, c.Spec.Version, fName) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "creation", c.ObjectMeta.Name) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } if binding != nil { err := binding.Init(context.TODO(), bindings.Metadata{Base: a.toBaseMetadata(c)}) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "init", c.ObjectMeta.Name) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } log.Infof("successful init for output binding %s (%s/%s)", c.ObjectMeta.Name, c.Spec.Type, c.Spec.Version) a.compStore.AddOutputBinding(c.ObjectMeta.Name, binding) @@ -1748,13 +1749,13 @@ func (a *DaprRuntime) initConfiguration(s componentsV1alpha1.Component) error { store, err := a.configurationStoreRegistry.Create(s.Spec.Type, s.Spec.Version, fName) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "creation", s.ObjectMeta.Name) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } if store != nil { err := store.Init(context.TODO(), configuration.Metadata{Base: a.toBaseMetadata(s)}) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "init", s.ObjectMeta.Name) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } a.compStore.AddConfiguration(s.ObjectMeta.Name, store) @@ -1770,7 +1771,7 @@ func (a *DaprRuntime) initLock(s componentsV1alpha1.Component) error { store, err := a.lockStoreRegistry.Create(s.Spec.Type, s.Spec.Version, fName) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "creation", s.ObjectMeta.Name) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } if store == nil { return nil @@ -1781,7 +1782,7 @@ func (a *DaprRuntime) initLock(s componentsV1alpha1.Component) error { err = store.InitLockStore(context.TODO(), lock.Metadata{Base: baseMetadata}) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "init", s.ObjectMeta.Name) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } // save lock related configuration a.compStore.AddLock(s.ObjectMeta.Name, store) @@ -1789,7 +1790,7 @@ func (a *DaprRuntime) initLock(s componentsV1alpha1.Component) error { if err != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "init", s.ObjectMeta.Name) wrapError := fmt.Errorf("failed to save lock keyprefix: %s", err.Error()) - return NewInitError(InitComponentFailure, fName, wrapError) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, wrapError) } diag.DefaultMonitoring.ComponentInitialized(s.Spec.Type) @@ -1815,7 +1816,7 @@ func (a *DaprRuntime) initWorkflowComponent(s componentsV1alpha1.Component) erro err = workflowComp.Init(wfs.Metadata{Base: baseMetadata}) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "init", s.ObjectMeta.Name) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } // save workflow related configuration a.compStore.AddWorkflow(s.ObjectMeta.Name, workflowComp) @@ -1830,7 +1831,7 @@ func (a *DaprRuntime) initState(s componentsV1alpha1.Component) error { store, err := a.stateStoreRegistry.Create(s.Spec.Type, s.Spec.Version, fName) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "creation", s.ObjectMeta.Name) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } if store != nil { secretStoreName := a.authSecretStoreOrDefault(s) @@ -1839,7 +1840,7 @@ func (a *DaprRuntime) initState(s componentsV1alpha1.Component) error { encKeys, encErr := encryption.ComponentEncryptionKey(s, secretStore) if encErr != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "creation", s.ObjectMeta.Name) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } if encKeys.Primary.Key != "" { @@ -1854,7 +1855,7 @@ func (a *DaprRuntime) initState(s componentsV1alpha1.Component) error { err = store.Init(context.TODO(), state.Metadata{Base: baseMetadata}) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "init", s.ObjectMeta.Name) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } a.compStore.AddStateStore(s.ObjectMeta.Name, store) @@ -1862,7 +1863,7 @@ func (a *DaprRuntime) initState(s componentsV1alpha1.Component) error { if err != nil { diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "init", s.ObjectMeta.Name) wrapError := fmt.Errorf("failed to save lock keyprefix: %s", err.Error()) - return NewInitError(InitComponentFailure, fName, wrapError) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, wrapError) } // when placement address list is not empty, set specified actor store. @@ -2033,7 +2034,7 @@ func (a *DaprRuntime) initPubSub(c componentsV1alpha1.Component) error { pubSub, err := a.pubSubRegistry.Create(c.Spec.Type, c.Spec.Version, fName) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "creation", c.ObjectMeta.Name) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } baseMetadata := a.toBaseMetadata(c) @@ -2047,7 +2048,7 @@ func (a *DaprRuntime) initPubSub(c componentsV1alpha1.Component) error { err = pubSub.Init(context.TODO(), pubsub.Metadata{Base: baseMetadata}) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "init", c.ObjectMeta.Name) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } pubsubName := c.ObjectMeta.Name @@ -2200,7 +2201,7 @@ func (a *DaprRuntime) initNameResolution() error { resolverName = "mdns" default: fName := utils.ComponentLogName(resolverName, "nameResolution", resolverVersion) - return NewInitError(InitComponentFailure, fName, fmt.Errorf("unable to determine name resolver for %s mode", string(a.runtimeConfig.Mode))) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, fmt.Errorf("unable to determine name resolver for %s mode", string(a.runtimeConfig.Mode))) } } @@ -2222,12 +2223,12 @@ func (a *DaprRuntime) initNameResolution() error { if err != nil { diag.DefaultMonitoring.ComponentInitFailed("nameResolution", "creation", resolverName) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } if err = resolver.Init(resolverMetadata); err != nil { diag.DefaultMonitoring.ComponentInitFailed("nameResolution", "init", resolverName) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } a.nameResolver = resolver @@ -2264,7 +2265,7 @@ func (a *DaprRuntime) publishMessageHTTP(ctx context.Context, msg *pubsubSubscri if err != nil { diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, elapsed) - return fmt.Errorf("error returned from app channel while sending pub/sub event to app: %w", NewRetriableError(err)) + return fmt.Errorf("error returned from app channel while sending pub/sub event to app: %w", rterrors.NewRetriable(err)) } defer resp.Close() @@ -2296,7 +2297,7 @@ func (a *DaprRuntime) publishMessageHTTP(ctx context.Context, msg *pubsubSubscri return nil case pubsub.Retry: diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, elapsed) - return fmt.Errorf("RETRY status returned from app while processing pub/sub event %v: %w", cloudEvent[pubsub.IDField], NewRetriableError(nil)) + return fmt.Errorf("RETRY status returned from app while processing pub/sub event %v: %w", cloudEvent[pubsub.IDField], rterrors.NewRetriable(nil)) case pubsub.Drop: diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Drop)), msg.topic, elapsed) log.Warnf("DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField]) @@ -2304,7 +2305,7 @@ func (a *DaprRuntime) publishMessageHTTP(ctx context.Context, msg *pubsubSubscri } // Consider unknown status field as error and retry diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, elapsed) - return fmt.Errorf("unknown status returned from app while processing pub/sub event %v, status: %v, err: %w", cloudEvent[pubsub.IDField], appResponse.Status, NewRetriableError(nil)) + return fmt.Errorf("unknown status returned from app while processing pub/sub event %v, status: %v, err: %w", cloudEvent[pubsub.IDField], appResponse.Status, rterrors.NewRetriable(nil)) } body, _ := resp.RawDataFull() @@ -2321,7 +2322,7 @@ func (a *DaprRuntime) publishMessageHTTP(ctx context.Context, msg *pubsubSubscri errMsg := fmt.Sprintf("retriable error returned from app while processing pub/sub event %v, topic: %v, body: %s. status code returned: %v", cloudEvent[pubsub.IDField], cloudEvent[pubsub.TopicField], body, statusCode) log.Warnf(errMsg) diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, elapsed) - return NewRetriableError(errors.New(errMsg)) + return rterrors.NewRetriable(errors.New(errMsg)) } func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsubSubscribedMessage) error { @@ -2345,13 +2346,13 @@ func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsubSubscri log.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr) diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, 0) - return fmt.Errorf("error returned from app while processing pub/sub event: %w", NewRetriableError(decodeErr)) + return fmt.Errorf("error returned from app while processing pub/sub event: %w", rterrors.NewRetriable(decodeErr)) } envelope.Data = decoded } else { diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, 0) - return fmt.Errorf("error returned from app while processing pub/sub event: %w", NewRetriableError(ErrUnexpectedEnvelopeData)) + return fmt.Errorf("error returned from app while processing pub/sub event: %w", rterrors.NewRetriable(ErrUnexpectedEnvelopeData)) } } else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil { envelope.Data = nil @@ -2364,7 +2365,7 @@ func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsubSubscri envelope.Data = v default: diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, 0) - return fmt.Errorf("error returned from app while processing pub/sub event: %w", NewRetriableError(ErrUnexpectedEnvelopeData)) + return fmt.Errorf("error returned from app while processing pub/sub event: %w", rterrors.NewRetriable(ErrUnexpectedEnvelopeData)) } } else if contenttype.IsJSONContentType(envelope.DataContentType) || contenttype.IsCloudEventContentType(envelope.DataContentType) { envelope.Data, _ = json.Marshal(data) @@ -2429,7 +2430,7 @@ func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsubSubscri return nil } - err = fmt.Errorf("error returned from app while processing pub/sub event %v: %w", cloudEvent[pubsub.IDField], NewRetriableError(err)) + err = fmt.Errorf("error returned from app while processing pub/sub event %v: %w", cloudEvent[pubsub.IDField], rterrors.NewRetriable(err)) log.Debug(err) diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, elapsed) @@ -2445,7 +2446,7 @@ func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsubSubscri return nil case runtimev1pb.TopicEventResponse_RETRY: //nolint:nosnakecase diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, elapsed) - return fmt.Errorf("RETRY status returned from app while processing pub/sub event %v: %w", cloudEvent[pubsub.IDField], NewRetriableError(nil)) + return fmt.Errorf("RETRY status returned from app while processing pub/sub event %v: %w", cloudEvent[pubsub.IDField], rterrors.NewRetriable(nil)) case runtimev1pb.TopicEventResponse_DROP: //nolint:nosnakecase log.Warnf("DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField]) diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Drop)), msg.topic, elapsed) @@ -2455,7 +2456,7 @@ func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsubSubscri // Consider unknown status field as error and retry diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.pubsub, strings.ToLower(string(pubsub.Retry)), msg.topic, elapsed) - return fmt.Errorf("unknown status returned from app while processing pub/sub event %v, status: %v, err: %w", cloudEvent[pubsub.IDField], res.GetStatus(), NewRetriableError(nil)) + return fmt.Errorf("unknown status returned from app while processing pub/sub event %v, status: %v, err: %w", cloudEvent[pubsub.IDField], res.GetStatus(), rterrors.NewRetriable(nil)) } func extractCloudEventExtensions(cloudEvent map[string]interface{}) (*structpb.Struct, error) { @@ -2471,12 +2472,12 @@ func extractCloudEventExtensions(cloudEvent map[string]interface{}) (*structpb.S extensionsStruct := structpb.Struct{} extensionBytes, jsonMarshalErr := json.Marshal(extensions) if jsonMarshalErr != nil { - return &extensionsStruct, fmt.Errorf("Error processing internal cloud event data: unable to marshal cloudEvent extensions: %s", jsonMarshalErr) + return &extensionsStruct, fmt.Errorf("error processing internal cloud event data: unable to marshal cloudEvent extensions: %s", jsonMarshalErr) } protoUnmarshalErr := protojson.Unmarshal(extensionBytes, &extensionsStruct) if protoUnmarshalErr != nil { - return &extensionsStruct, fmt.Errorf("Error processing internal cloud event data: unable to unmarshal cloudEvent extensions to proto struct: %s", protoUnmarshalErr) + return &extensionsStruct, fmt.Errorf("error processing internal cloud event data: unable to unmarshal cloudEvent extensions to proto struct: %s", protoUnmarshalErr) } return &extensionsStruct, nil } @@ -2498,7 +2499,7 @@ func extractCloudEventProperty(cloudEvent map[string]interface{}, property strin func (a *DaprRuntime) initActors() error { err := actors.ValidateHostEnvironment(a.runtimeConfig.mtlsEnabled, a.runtimeConfig.Mode, a.namespace) if err != nil { - return NewInitError(InitFailure, "actors", err) + return rterrors.NewInit(rterrors.InitFailure, "actors", err) } a.actorStateStoreLock.Lock() defer a.actorStateStoreLock.Unlock() @@ -2532,7 +2533,7 @@ func (a *DaprRuntime) initActors() error { a.actor = act return nil } - return NewInitError(InitFailure, "actors", err) + return rterrors.NewInit(rterrors.InitFailure, "actors", err) } func (a *DaprRuntime) namespaceComponentAuthorizer(component componentsV1alpha1.Component) bool { @@ -2710,7 +2711,7 @@ func (a *DaprRuntime) processComponentAndDependents(comp componentsV1alpha1.Comp case <-time.After(timeout): diag.DefaultMonitoring.ComponentInitFailed(comp.Spec.Type, "init", comp.ObjectMeta.Name) err := fmt.Errorf("init timeout for component %s exceeded after %s", comp.Name, timeout.String()) - return NewInitError(InitComponentFailure, comp.LogName(), err) + return rterrors.NewInit(rterrors.InitComponentFailure, comp.LogName(), err) } log.Info("Component loaded: " + comp.LogName()) @@ -3338,13 +3339,13 @@ func (a *DaprRuntime) initCryptoProvider(c componentsV1alpha1.Component) error { component, err := a.cryptoProviderRegistry.Create(c.Spec.Type, c.Spec.Version, fName) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "creation", c.ObjectMeta.Name) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } err = component.Init(context.TODO(), contribCrypto.Metadata{Base: a.toBaseMetadata(c)}) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "init", c.ObjectMeta.Name) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } a.compStore.AddCryptoProvider(c.ObjectMeta.Name, component) @@ -3357,13 +3358,13 @@ func (a *DaprRuntime) initSecretStore(c componentsV1alpha1.Component) error { secretStore, err := a.secretStoresRegistry.Create(c.Spec.Type, c.Spec.Version, fName) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "creation", c.ObjectMeta.Name) - return NewInitError(CreateComponentFailure, fName, err) + return rterrors.NewInit(rterrors.CreateComponentFailure, fName, err) } err = secretStore.Init(context.TODO(), secretstores.Metadata{Base: a.toBaseMetadata(c)}) if err != nil { diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "init", c.ObjectMeta.Name) - return NewInitError(InitComponentFailure, fName, err) + return rterrors.NewInit(rterrors.InitComponentFailure, fName, err) } a.compStore.AddSecretStore(c.ObjectMeta.Name, secretStore) diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 9684986d269..9ca6ac4bbef 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -92,6 +92,7 @@ import ( runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/dapr/pkg/resiliency" "github.com/dapr/dapr/pkg/runtime/compstore" + rterrors "github.com/dapr/dapr/pkg/runtime/errors" runtimePubsub "github.com/dapr/dapr/pkg/runtime/pubsub" "github.com/dapr/dapr/pkg/runtime/security" "github.com/dapr/dapr/pkg/scopes" @@ -365,7 +366,7 @@ func TestDoProcessComponent(t *testing.T) { // assert assert.Error(t, err, "expected an error") - assert.Equal(t, err.Error(), NewInitError(InitComponentFailure, "testlock (lock.mockLock/v1)", assert.AnError).Error(), "expected error strings to match") + assert.Equal(t, err.Error(), rterrors.NewInit(rterrors.InitComponentFailure, "testlock (lock.mockLock/v1)", assert.AnError).Error(), "expected error strings to match") }) t.Run("test error when lock version invalid", func(t *testing.T) { @@ -388,7 +389,7 @@ func TestDoProcessComponent(t *testing.T) { // assert assert.Error(t, err, "expected an error") - assert.Equal(t, err.Error(), NewInitError(CreateComponentFailure, "testlock (lock.mockLock/v3)", fmt.Errorf("couldn't find lock store lock.mockLock/v3")).Error()) + assert.Equal(t, err.Error(), rterrors.NewInit(rterrors.CreateComponentFailure, "testlock (lock.mockLock/v3)", fmt.Errorf("couldn't find lock store lock.mockLock/v3")).Error()) }) t.Run("test error when lock prefix strategy invalid", func(t *testing.T) { @@ -466,7 +467,7 @@ func TestDoProcessComponent(t *testing.T) { // assert assert.Error(t, err, "expected an error") - assert.Equal(t, err.Error(), NewInitError(InitComponentFailure, "testpubsub (pubsub.mockPubSub/v1)", assert.AnError).Error(), "expected error strings to match") + assert.Equal(t, err.Error(), rterrors.NewInit(rterrors.InitComponentFailure, "testpubsub (pubsub.mockPubSub/v1)", assert.AnError).Error(), "expected error strings to match") }) t.Run("test invalid category component", func(t *testing.T) { @@ -862,7 +863,7 @@ func TestInitState(t *testing.T) { // assert assert.Error(t, err, "expected error") - assert.Equal(t, err.Error(), NewInitError(InitComponentFailure, "testpubsub (state.mockState/v1)", assert.AnError).Error(), "expected error strings to match") + assert.Equal(t, err.Error(), rterrors.NewInit(rterrors.InitComponentFailure, "testpubsub (state.mockState/v1)", assert.AnError).Error(), "expected error strings to match") }) t.Run("test init state store, encryption not enabled", func(t *testing.T) { @@ -3184,7 +3185,7 @@ func TestOnNewPublishedMessage(t *testing.T) { // assert var cloudEvent map[string]interface{} json.Unmarshal(testPubSubMessage.data, &cloudEvent) - expectedClientError := fmt.Errorf("RETRY status returned from app while processing pub/sub event %v: %w", cloudEvent["id"].(string), NewRetriableError(nil)) + expectedClientError := fmt.Errorf("RETRY status returned from app while processing pub/sub event %v: %w", cloudEvent["id"].(string), rterrors.NewRetriable(nil)) assert.Equal(t, expectedClientError.Error(), err.Error()) mockAppChannel.AssertNumberOfCalls(t, "InvokeMethod", 1) }) @@ -3280,7 +3281,7 @@ func TestOnNewPublishedMessage(t *testing.T) { err := rt.publishMessageHTTP(context.Background(), testPubSubMessage) // assert - expectedError := fmt.Errorf("error returned from app channel while sending pub/sub event to app: %w", NewRetriableError(invokeError)) + expectedError := fmt.Errorf("error returned from app channel while sending pub/sub event to app: %w", rterrors.NewRetriable(invokeError)) assert.Equal(t, expectedError.Error(), err.Error(), "expected errors to match") mockAppChannel.AssertNumberOfCalls(t, "InvokeMethod", 1) }) @@ -3322,7 +3323,7 @@ func TestOnNewPublishedMessage(t *testing.T) { var cloudEvent map[string]interface{} json.Unmarshal(testPubSubMessage.data, &cloudEvent) errMsg := fmt.Sprintf("retriable error returned from app while processing pub/sub event %v, topic: %v, body: Internal Error. status code returned: 500", cloudEvent["id"].(string), cloudEvent["topic"]) - expectedClientError := NewRetriableError(errors.New(errMsg)) + expectedClientError := rterrors.NewRetriable(errors.New(errMsg)) assert.Equal(t, expectedClientError.Error(), err.Error()) mockAppChannel.AssertNumberOfCalls(t, "InvokeMethod", 1) }) @@ -3394,7 +3395,7 @@ func TestOnNewPublishedMessageGRPC(t *testing.T) { expectedError: fmt.Errorf( "error returned from app while processing pub/sub event %v: %w", testPubSubMessage.cloudEvent[pubsub.IDField], - NewRetriableError(status.Error(codes.Unknown, assert.AnError.Error())), + rterrors.NewRetriable(status.Error(codes.Unknown, assert.AnError.Error())), ), }, { @@ -3419,7 +3420,7 @@ func TestOnNewPublishedMessageGRPC(t *testing.T) { expectedError: fmt.Errorf( "RETRY status returned from app while processing pub/sub event %v: %w", testPubSubMessage.cloudEvent[pubsub.IDField], - NewRetriableError(nil), + rterrors.NewRetriable(nil), ), }, { @@ -3435,7 +3436,7 @@ func TestOnNewPublishedMessageGRPC(t *testing.T) { "unknown status returned from app while processing pub/sub event %v, status: %v, err: %w", testPubSubMessage.cloudEvent[pubsub.IDField], runtimev1pb.TopicEventResponse_TopicEventResponseStatus(99), - NewRetriableError(nil), + rterrors.NewRetriable(nil), ), }, {