Skip to content

Commit

Permalink
batch map
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Jul 24, 2024
1 parent 9006d72 commit cbd246b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 13 deletions.
55 changes: 42 additions & 13 deletions pkg/batchmapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package batchmapper
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"

"github.com/numaproj/numaflow-go/pkg"
batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
"github.com/numaproj/numaflow-go/pkg/info"
Expand All @@ -15,8 +17,10 @@ import (

// server is a map gRPC server.
type server struct {
svc *Service
opts *options
grpcServer *grpc.Server
svc *Service
opts *options
shutdownCh <-chan struct{}
}

// NewServer creates a new batch map server.
Expand All @@ -25,11 +29,19 @@ func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server {
for _, inputOption := range inputOptions {
inputOption(opts)
}
s := new(server)
s.svc = new(Service)
s.svc.BatchMapper = m
s.opts = opts
return s
shutdownCh := make(chan struct{})

// create a new service and server
svc := &Service{
BatchMapper: m,
shutdownCh: shutdownCh,
}

return &server{
svc: svc,
shutdownCh: shutdownCh,
opts: opts,
}
}

// Start starts the batch map server.
Expand All @@ -51,13 +63,30 @@ func (m *server) Start(ctx context.Context) error {
defer func() { _ = lis.Close() }()

// create a grpc server
grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize)
defer log.Println("Successfully stopped the gRPC server")
defer grpcServer.GracefulStop()
m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize)

// register the batch map service
batchmappb.RegisterBatchMapServer(grpcServer, m.svc)
batchmappb.RegisterBatchMapServer(m.grpcServer, m.svc)

// start a go routine to stop the server gracefully when the context is done
// or a shutdown signal is received from the service
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-m.shutdownCh:
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
}()

// start the grpc server
return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis)
if err := m.grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to start the gRPC server: %v", err)
}

// wait for the graceful shutdown to complete
wg.Wait()
return nil
}
7 changes: 7 additions & 0 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
type Service struct {
batchmappb.UnimplementedBatchMapServer
BatchMapper BatchMapper
shutdownCh chan<- struct{}
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand All @@ -51,6 +52,12 @@ func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error

// go routine to invoke the user handler function, and process the responses.
g.Go(func() error {
// handle panic
defer func() {
if r := recover(); r != nil {
fs.shutdownCh <- struct{}{}
}
}()
// Apply the user BatchMap implementation function
responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh)

Expand Down

0 comments on commit cbd246b

Please sign in to comment.