Skip to content

Commit

Permalink
feature: Support Bindings Pluggable Components (dapr#5195)
Browse files Browse the repository at this point in the history
* Support Bindings Pluggable Components

Signed-off-by: Marcos Candeia <[email protected]>

* Outputbindings grpc unittesting

Signed-off-by: Marcos Candeia <[email protected]>

* Inputbinding unit testing

Signed-off-by: Marcos Candeia <[email protected]>

* Add e2e bindings test

Signed-off-by: Marcos Candeia <[email protected]>

* Added kafka bindings test

Signed-off-by: Marcos Candeia <[email protected]>

* Move category to components package

Signed-off-by: Marcos Candeia <[email protected]>

* Rebase fixes

Signed-off-by: Marcos Candeia <[email protected]>

* Add grpc app protocol

Signed-off-by: Marcos Candeia <[email protected]>

* Add test topic grpc as env var

Signed-off-by: Marcos Candeia <[email protected]>

* Add custom route for pluggable component grpc input binding

Signed-off-by: Marcos Candeia <[email protected]>

* Add different name for containers

Signed-off-by: Marcos Candeia <[email protected]>

* Add consumer group pluggable for consumerGroup name

Signed-off-by: Marcos Candeia <[email protected]>

* Add output component for outputbinding app

Signed-off-by: Marcos Candeia <[email protected]>

* Add category docs

Signed-off-by: Marcos Candeia <[email protected]>

* Fix inputbinding service name

Signed-off-by: Marcos Candeia <[email protected]>

* Skip bindings test

Signed-off-by: Marcos Candeia <[email protected]>

* Fix memory ref on loop

Signed-off-by: Marcos Candeia <[email protected]>

* Fix grpc socket test

Signed-off-by: Marcos Candeia <[email protected]>

* Add pluggable custom path

Signed-off-by: Marcos Candeia <[email protected]>

* Fix bindings docs

Signed-off-by: Marcos Candeia <[email protected]>

* Use component name as spec pluggable components CRD

Signed-off-by: Marcos Candeia <[email protected]>

* Reuse grpc test server

Signed-off-by: Marcos Candeia <[email protected]>

* Fix bindings proto comments

Signed-off-by: Marcos Candeia <[email protected]>

Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Sep 26, 2022
1 parent 5b468cb commit 0844e7a
Show file tree
Hide file tree
Showing 28 changed files with 3,231 additions and 114 deletions.
5 changes: 5 additions & 0 deletions charts/dapr/crds/pluggable-components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ spec:
spec:
description: PluggableComponentSpec is the spec for a pluggable component.
properties:
componentName:
description:
ComponentName is the component name. if not specified,
metadata.Name will be used.
type: string
type:
type: string
version:
Expand Down
121 changes: 121 additions & 0 deletions dapr/proto/components/v1/bindings.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";

package dapr.proto.components.v1;

import "dapr/proto/components/v1/common.proto";

option go_package = "github.com/dapr/dapr/pkg/proto/components/v1;components";

// Interface for input bindings
service InputBinding {
// Initializes the inputbinding component component with the given metadata.
rpc Init(InputBindingInitRequest) returns (InputBindingInitResponse) {}

// Establishes a stream with the server, which sends messages down to the
// client. The client streams acknowledgements back to the server. The server
// will close the stream and return the status on any error. In case of closed
// connection, the client should re-establish the stream.
rpc Read(stream ReadRequest) returns (stream ReadResponse) {}

// Ping the InputBinding. Used for liveness porpuses.
rpc Ping(PingRequest) returns (PingResponse) {}
}

service OutputBinding {
// Initializes the outputbinding component component with the given metadata.
rpc Init(OutputBindingInitRequest) returns (OutputBindingInitResponse) {}

// Invoke remote systems with optional payloads.
rpc Invoke(InvokeRequest) returns (InvokeResponse) {}

// ListOperations list system supported operations.
rpc ListOperations(ListOperationsRequest) returns (ListOperationsResponse) {}

// Ping the OutputBinding. Used for liveness porpuses.
rpc Ping(PingRequest) returns (PingResponse) {}
}
// reserved for future-proof extensibility
message ListOperationsRequest {}

message ListOperationsResponse {
// the list of all supported component operations.
repeated string operations = 1;
}

// InputBindingInitRequest is the request for initializing the input binding
// component.
message InputBindingInitRequest {
// The metadata request.
MetadataRequest metadata = 1;
}

// reserved for future-proof extensibility
message InputBindingInitResponse {}

// OutputBindingInitRequest is the request for initializing the output binding
// component.
message OutputBindingInitRequest {
// The metadata request.
MetadataRequest metadata = 1;
}

// reserved for future-proof extensibility
message OutputBindingInitResponse {}

// Used for describing errors when ack'ing messages.
message AckResponseError {
string message = 1;
}

message ReadRequest {
// The handle response.
bytes response_data = 1;
// The unique message ID.
string message_id = 2;
// Optional, should not be fulfilled when the message was successfully
// handled.
AckResponseError response_error = 3;
}

message ReadResponse {
// The Read binding Data.
bytes data = 1;
// The message metadata
map<string, string> metadata = 2;
// The message content type.
string content_type = 3;
// The {transient} message ID used for ACK-ing it later.
string message_id = 4;
}

// Used for invoking systems with optional payload.
message InvokeRequest {
// The invoke payload.
bytes data = 1;
// The invoke metadata.
map<string, string> metadata = 2;
// The system supported operation.
string operation = 3;
}

// Response from the invoked system.
message InvokeResponse {
// The response payload.
bytes data = 1;
// The response metadata.
map<string, string> metadata = 2;
// The response content-type.
string content_type = 3;
}
3 changes: 3 additions & 0 deletions pkg/apis/components/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type PluggableComponentList struct {
type PluggableComponentSpec struct {
Type string `json:"type"`
Version string `json:"version"`
// ComponentName is the component name. if not specified, metadata.Name will be used.
//+optional
ComponentName string `json:"componentName"`
}

// ComponentSpec is the spec for a component.
Expand Down
151 changes: 151 additions & 0 deletions pkg/components/bindings/input_pluggable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bindings

import (
"context"
"io"
"sync"

"github.com/dapr/dapr/pkg/components"
"github.com/dapr/dapr/pkg/components/pluggable"
proto "github.com/dapr/dapr/pkg/proto/components/v1"

"github.com/dapr/components-contrib/bindings"

"github.com/dapr/kit/logger"

"github.com/pkg/errors"
)

// grpcInputBinding is a implementation of a inputbinding over a gRPC Protocol.
type grpcInputBinding struct {
*pluggable.GRPCConnector[proto.InputBindingClient]
bindings.InputBinding
logger logger.Logger
}

// Init initializes the grpc inputbinding passing out the metadata to the grpc component.
func (b *grpcInputBinding) Init(metadata bindings.Metadata) error {
if err := b.Dial(metadata.Name); err != nil {
return err
}

protoMetadata := &proto.MetadataRequest{
Properties: metadata.Properties,
}

_, err := b.Client.Init(b.Context, &proto.InputBindingInitRequest{
Metadata: protoMetadata,
})
return err
}

type readHandler = func(*proto.ReadResponse)

// adaptHandler returns a non-error function that handle the message with the given handler and ack when returns.
//
//nolint:nosnakecase
func (b *grpcInputBinding) adaptHandler(ctx context.Context, streamingPull proto.InputBinding_ReadClient, handler bindings.Handler) readHandler {
safeSend := &sync.Mutex{}
return func(msg *proto.ReadResponse) {
var contentType *string
if len(msg.ContentType) != 0 {
contentType = &msg.ContentType
}
m := bindings.ReadResponse{
Data: msg.Data,
Metadata: msg.Metadata,
ContentType: contentType,
}

var respErr *proto.AckResponseError
bts, err := handler(ctx, &m)
if err != nil {
b.logger.Errorf("error when handling message for message: %s", msg.MessageId)
respErr = &proto.AckResponseError{
Message: err.Error(),
}
}

// As per documentation:
// When using streams,
// one must take care to avoid calling either SendMsg or RecvMsg multiple times against the same Stream from different goroutines.
// In other words, it's safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time.
// But it is not safe to call SendMsg on the same stream in different goroutines, or to call RecvMsg on the same stream in different goroutines.
// https://github.com/grpc/grpc-go/blob/master/Documentation/concurrency.md#streams
safeSend.Lock()
defer safeSend.Unlock()

if err := streamingPull.Send(&proto.ReadRequest{
ResponseData: bts,
ResponseError: respErr,
MessageId: msg.MessageId,
}); err != nil {
b.logger.Errorf("error when ack'ing message %s", msg.MessageId)
}
}
}

// Read starts a bi-di stream reading messages from component and handling it used the given handler.
func (b *grpcInputBinding) Read(ctx context.Context, handler bindings.Handler) error {
readStream, err := b.Client.Read(ctx)
if err != nil {
return errors.Wrapf(err, "unable to read from binding")
}

streamCtx, cancel := context.WithCancel(readStream.Context())
handle := b.adaptHandler(streamCtx, readStream, handler)

go func() {
defer cancel()
for {
msg, err := readStream.Recv()
if err == io.EOF { // no more reads
return
}

// TODO reconnect on error
if err != nil {
b.logger.Errorf("failed to receive message: %v", err)
return
}
go handle(msg)
}
}()

return nil
}

// inputFromConnector creates a new GRPC inputbinding using the given underlying connector.
func inputFromConnector(l logger.Logger, connector *pluggable.GRPCConnector[proto.InputBindingClient]) *grpcInputBinding {
return &grpcInputBinding{
GRPCConnector: connector,
logger: l,
}
}

// NewGRPCInputBinding creates a new grpc inputbindingusing the given socket factory.
func NewGRPCInputBinding(l logger.Logger, socketFactory func(string) string) *grpcInputBinding {
return inputFromConnector(l, pluggable.NewGRPCConnectorWithFactory(socketFactory, proto.NewInputBindingClient))
}

// newGRPCInputBinding creates a new input binding for the given pluggable component.
func newGRPCInputBinding(l logger.Logger, pc components.Pluggable) bindings.InputBinding {
return inputFromConnector(l, pluggable.NewGRPCConnector(pc, proto.NewInputBindingClient))
}

func init() {
pluggable.AddRegistryFor(components.InputBinding, DefaultRegistry.RegisterInputBinding, newGRPCInputBinding)
}
Loading

0 comments on commit 0844e7a

Please sign in to comment.