Skip to content

Commit

Permalink
feat: completed wiring for serving store
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith committed Feb 22, 2025
1 parent 1721be1 commit 0bf0936
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/info/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
Sessionreducer ContainerType = "sessionreducer"
Sideinput ContainerType = "sideinput"
Fbsinker ContainerType = "fb-sinker"
Serving ContainerType = "serving"
)

type MapMode string
Expand All @@ -53,6 +54,7 @@ var MinimumNumaflowVersion = map[ContainerType]string{
Sessionreducer: "1.4.0-z",
Sideinput: "1.4.0-z",
Fbsinker: "1.4.0-z",
Serving: "1.5.0-z",
}

// ServerInfo is the information about the server
Expand Down
5 changes: 5 additions & 0 deletions pkg/servingstore/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Package servingstore implements the server code for writing user-defined serving store.

// Examples: https://github.com/numaproj/numaflow-go/tree/main/pkg/servingstore/examples/

package servingstore
35 changes: 35 additions & 0 deletions pkg/servingstore/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package servingstore

import (
"context"
)

// ServingStorer is the interface for serving store to store and retrieve from a custom store.
type ServingStorer interface {
// Put is to put data into the Serving Store.
Put(ctx context.Context, put PutRequester)

// Get is to retrieve data from the Serving Store.
Get(ctx context.Context) StoredResults
}

// PutRequester interface exposes methods to retrieve data from the Put rpc.
type PutRequester interface {
Origin() string
Payload() [][]byte
}

type PutRequest struct {
origin string
payloads [][]byte
}

// Origin returns the origin name.
func (p *PutRequest) Origin() string {
return p.origin
}

// Payload returns the payloads to be stored.
func (p *PutRequest) Payload() [][]byte {
return p.payloads
}
41 changes: 41 additions & 0 deletions pkg/servingstore/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package servingstore

// StoredResult is the data stored in the store per origin.
type StoredResult struct {
origin string
payloads []Payload
}

// NewStoredResult creates a new StoreResult from the provided origin and payloads.
func NewStoredResult(origin string, payloads []Payload) StoredResult {
return StoredResult{origin: origin, payloads: payloads}
}

// Payload is each independent result stored in the Store for the given ID.
type Payload struct {
value []byte
}

// NewPayload creates a new Payload from the given value.
func NewPayload(value []byte) Payload {
return Payload{value: value}
}

// StoredResults contains 0, 1, or more StoredResult.
type StoredResults []StoredResult

// StoredResultsBuilder returns an empty instance of StoredResults
func StoredResultsBuilder() StoredResults {
return StoredResults{}
}

// Append appends a StoredResult
func (r StoredResults) Append(msg StoredResult) StoredResults {
r = append(r, msg)
return r
}

// Items returns the StoredResults list
func (r StoredResults) Items() []StoredResult {
return r
}
41 changes: 41 additions & 0 deletions pkg/servingstore/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package servingstore

// options is the struct to hold the server options.
type options struct {
sockAddr string
maxMessageSize int
serverInfoFilePath string
}

// Option is the interface to apply options.
type Option func(*options)

// defaultOptions returns the default options.
func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: serverInfoFilePath,
}
}

// WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size.
func WithMaxMessageSize(size int) Option {
return func(opts *options) {
opts.maxMessageSize = size
}
}

// WithSockAddr start the server with the given sock addr. This is mainly used for testing purpose.
func WithSockAddr(addr string) Option {
return func(opts *options) {
opts.sockAddr = addr
}
}

// WithServerInfoFilePath sets the server info file path to the given path.
func WithServerInfoFilePath(f string) Option {
return func(opts *options) {
opts.serverInfoFilePath = f
}
}
94 changes: 94 additions & 0 deletions pkg/servingstore/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package servingstore

import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"

numaflow "github.com/numaproj/numaflow-go/pkg"
servingpb "github.com/numaproj/numaflow-go/pkg/apis/proto/serving/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

type server struct {
grpcServer *grpc.Server
svc *Service
opts *options
shutdownCh <-chan struct{}
}

// Start starts the gRPC Server.
func (s server) Start(ctx context.Context) error {
// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sourcer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}

ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// close the listener
defer func() { _ = lis.Close() }()

// create a grpc server
s.grpcServer = shared.CreateGRPCServer(s.opts.maxMessageSize)

servingpb.RegisterServingStoreServer(s.grpcServer, s.svc)

// start a go routine to stop the server gracefully when the context is done
// or a shutdown signal is received from the service
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-s.shutdownCh:
log.Printf("shutdown signal received")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(s.grpcServer)
}()

// start the grpc server
if err := s.grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to start the gRPC server: %v", err)
}

// wait for the graceful shutdown to complete
wg.Wait()
return nil
}

// NewServer creates a new server object.
func NewServer(
servingStore ServingStorer,
inputOptions ...Option) numaflow.Server {
var opts = defaultOptions()

for _, inputOption := range inputOptions {
inputOption(opts)
}
shutdownCh := make(chan struct{})

// create a new service and server
svc := &Service{
ServingStore: servingStore,
shutdownCh: shutdownCh,
}

return &server{
svc: svc,
shutdownCh: shutdownCh,
opts: opts,
}
}
67 changes: 67 additions & 0 deletions pkg/servingstore/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package servingstore

import (
"context"
"errors"
"log"
"runtime/debug"

epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

servingpb "github.com/numaproj/numaflow-go/pkg/apis/proto/serving/v1"
)

const (
uds = "unix"
address = "/var/run/numaflow/serving.sock"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
serverInfoFilePath = "/var/run/numaflow/serving-server-info"
)

// Service implements the proto gen server interface
type Service struct {
servingpb.UnimplementedServingStoreServer
ServingStore ServingStorer
shutdownCh chan<- struct{}
}

var errServingStorePanic = errors.New("UDF_EXECUTION_ERROR(serving)")

func handlePanic() (err error) {
if r := recover(); r != nil {
log.Printf("panic inside map handler: %v %v", r, string(debug.Stack()))
st, _ := status.Newf(codes.Internal, "%s: %v", errServingStorePanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}

return err
}

func (s *Service) Put(ctx context.Context, request *servingpb.PutRequest) (*servingpb.PutResponse, error) {
var err error
// handle panic
defer func() { err = handlePanic() }()

var payloads = make([][]byte, 0, len(request.Payloads))
for _, payload := range request.Payloads {
payloads = append(payloads, payload.Value)
}

s.ServingStore.Put(ctx, &PutRequest{origin: request.Origin, payloads: payloads})

return &servingpb.PutResponse{Success: true}, err
}

func (s *Service) Get(ctx context.Context, request *servingpb.GetRequest) (*servingpb.GetResponse, error) {
//TODO implement me
panic("implement me")
}

func (s *Service) IsReady(_ context.Context, _ *emptypb.Empty) (*servingpb.ReadyResponse, error) {
return &servingpb.ReadyResponse{Ready: true}, nil
}

0 comments on commit 0bf0936

Please sign in to comment.