diff --git a/pkg/info/types.go b/pkg/info/types.go index 47b9de61..fe24e2d5 100644 --- a/pkg/info/types.go +++ b/pkg/info/types.go @@ -27,6 +27,7 @@ const ( Sessionreducer ContainerType = "sessionreducer" Sideinput ContainerType = "sideinput" Fbsinker ContainerType = "fb-sinker" + Serving ContainerType = "serving" ) type MapMode string @@ -53,6 +54,7 @@ var MinimumNumaflowVersion = map[ContainerType]string{ Sessionreducer: "1.4.0-z", Sideinput: "1.4.0-z", Fbsinker: "1.4.0-z", + Serving: "1.5.0-z", } // ServerInfo is the information about the server diff --git a/pkg/servingstore/doc.go b/pkg/servingstore/doc.go new file mode 100644 index 00000000..90306fe1 --- /dev/null +++ b/pkg/servingstore/doc.go @@ -0,0 +1,5 @@ +// Package servingstore implements the server code for writing user-defined serving store. + +// Examples: https://github.com/numaproj/numaflow-go/tree/main/pkg/servingstore/examples/ + +package servingstore diff --git a/pkg/servingstore/interface.go b/pkg/servingstore/interface.go new file mode 100644 index 00000000..9f8e8302 --- /dev/null +++ b/pkg/servingstore/interface.go @@ -0,0 +1,35 @@ +package servingstore + +import ( + "context" +) + +// ServingStorer is the interface for serving store to store and retrieve from a custom store. +type ServingStorer interface { + // Put is to put data into the Serving Store. + Put(ctx context.Context, put PutRequester) + + // Get is to retrieve data from the Serving Store. + Get(ctx context.Context) StoredResults +} + +// PutRequester interface exposes methods to retrieve data from the Put rpc. +type PutRequester interface { + Origin() string + Payload() [][]byte +} + +type PutRequest struct { + origin string + payloads [][]byte +} + +// Origin returns the origin name. +func (p *PutRequest) Origin() string { + return p.origin +} + +// Payload returns the payloads to be stored. +func (p *PutRequest) Payload() [][]byte { + return p.payloads +} diff --git a/pkg/servingstore/message.go b/pkg/servingstore/message.go new file mode 100644 index 00000000..7a345193 --- /dev/null +++ b/pkg/servingstore/message.go @@ -0,0 +1,41 @@ +package servingstore + +// StoredResult is the data stored in the store per origin. +type StoredResult struct { + origin string + payloads []Payload +} + +// NewStoredResult creates a new StoreResult from the provided origin and payloads. +func NewStoredResult(origin string, payloads []Payload) StoredResult { + return StoredResult{origin: origin, payloads: payloads} +} + +// Payload is each independent result stored in the Store for the given ID. +type Payload struct { + value []byte +} + +// NewPayload creates a new Payload from the given value. +func NewPayload(value []byte) Payload { + return Payload{value: value} +} + +// StoredResults contains 0, 1, or more StoredResult. +type StoredResults []StoredResult + +// StoredResultsBuilder returns an empty instance of StoredResults +func StoredResultsBuilder() StoredResults { + return StoredResults{} +} + +// Append appends a StoredResult +func (r StoredResults) Append(msg StoredResult) StoredResults { + r = append(r, msg) + return r +} + +// Items returns the StoredResults list +func (r StoredResults) Items() []StoredResult { + return r +} diff --git a/pkg/servingstore/options.go b/pkg/servingstore/options.go new file mode 100644 index 00000000..f6502f77 --- /dev/null +++ b/pkg/servingstore/options.go @@ -0,0 +1,41 @@ +package servingstore + +// options is the struct to hold the server options. +type options struct { + sockAddr string + maxMessageSize int + serverInfoFilePath string +} + +// Option is the interface to apply options. +type Option func(*options) + +// defaultOptions returns the default options. +func defaultOptions() *options { + return &options{ + sockAddr: address, + maxMessageSize: defaultMaxMessageSize, + serverInfoFilePath: serverInfoFilePath, + } +} + +// WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size. +func WithMaxMessageSize(size int) Option { + return func(opts *options) { + opts.maxMessageSize = size + } +} + +// WithSockAddr start the server with the given sock addr. This is mainly used for testing purpose. +func WithSockAddr(addr string) Option { + return func(opts *options) { + opts.sockAddr = addr + } +} + +// WithServerInfoFilePath sets the server info file path to the given path. +func WithServerInfoFilePath(f string) Option { + return func(opts *options) { + opts.serverInfoFilePath = f + } +} diff --git a/pkg/servingstore/server.go b/pkg/servingstore/server.go new file mode 100644 index 00000000..2e75ab74 --- /dev/null +++ b/pkg/servingstore/server.go @@ -0,0 +1,94 @@ +package servingstore + +import ( + "context" + "fmt" + "log" + "os/signal" + "sync" + "syscall" + + "google.golang.org/grpc" + + numaflow "github.com/numaproj/numaflow-go/pkg" + servingpb "github.com/numaproj/numaflow-go/pkg/apis/proto/serving/v1" + "github.com/numaproj/numaflow-go/pkg/info" + "github.com/numaproj/numaflow-go/pkg/shared" +) + +type server struct { + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} +} + +// Start starts the gRPC Server. +func (s server) Start(ctx context.Context) error { + // write server info to the file + serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sourcer] + // start listening on unix domain socket + lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo) + if err != nil { + return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) + } + + ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + defer stop() + + // close the listener + defer func() { _ = lis.Close() }() + + // create a grpc server + s.grpcServer = shared.CreateGRPCServer(s.opts.maxMessageSize) + + servingpb.RegisterServingStoreServer(s.grpcServer, s.svc) + + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-s.shutdownCh: + log.Printf("shutdown signal received") + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(s.grpcServer) + }() + + // start the grpc server + if err := s.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil +} + +// NewServer creates a new server object. +func NewServer( + servingStore ServingStorer, + inputOptions ...Option) numaflow.Server { + var opts = defaultOptions() + + for _, inputOption := range inputOptions { + inputOption(opts) + } + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + ServingStore: servingStore, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } +} diff --git a/pkg/servingstore/service.go b/pkg/servingstore/service.go new file mode 100644 index 00000000..9b1234f7 --- /dev/null +++ b/pkg/servingstore/service.go @@ -0,0 +1,67 @@ +package servingstore + +import ( + "context" + "errors" + "log" + "runtime/debug" + + epb "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + + servingpb "github.com/numaproj/numaflow-go/pkg/apis/proto/serving/v1" +) + +const ( + uds = "unix" + address = "/var/run/numaflow/serving.sock" + defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB + serverInfoFilePath = "/var/run/numaflow/serving-server-info" +) + +// Service implements the proto gen server interface +type Service struct { + servingpb.UnimplementedServingStoreServer + ServingStore ServingStorer + shutdownCh chan<- struct{} +} + +var errServingStorePanic = errors.New("UDF_EXECUTION_ERROR(serving)") + +func handlePanic() (err error) { + if r := recover(); r != nil { + log.Printf("panic inside map handler: %v %v", r, string(debug.Stack())) + st, _ := status.Newf(codes.Internal, "%s: %v", errServingStorePanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() + } + + return err +} + +func (s *Service) Put(ctx context.Context, request *servingpb.PutRequest) (*servingpb.PutResponse, error) { + var err error + // handle panic + defer func() { err = handlePanic() }() + + var payloads = make([][]byte, 0, len(request.Payloads)) + for _, payload := range request.Payloads { + payloads = append(payloads, payload.Value) + } + + s.ServingStore.Put(ctx, &PutRequest{origin: request.Origin, payloads: payloads}) + + return &servingpb.PutResponse{Success: true}, err +} + +func (s *Service) Get(ctx context.Context, request *servingpb.GetRequest) (*servingpb.GetResponse, error) { + //TODO implement me + panic("implement me") +} + +func (s *Service) IsReady(_ context.Context, _ *emptypb.Empty) (*servingpb.ReadyResponse, error) { + return &servingpb.ReadyResponse{Ready: true}, nil +}