Skip to content

Commit

Permalink
refactor source and sink
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Sep 23, 2024
1 parent 32a0120 commit ab1c9b4
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 209 deletions.
164 changes: 89 additions & 75 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,36 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sinkpb.ReadyRespon
// SinkFn applies a sink function to a every element.
func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
ctx := stream.Context()

// Perform handshake before entering the main loop
if err := fs.performHandshake(stream); err != nil {
return err
}

for {
datumStreamCh := make(chan Datum)
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return fs.receiveRequests(stream, datumStreamCh)
})

g.Go(func() error {
return fs.processData(ctx, stream, datumStreamCh)
})

// Wait for the goroutines to finish
if err := g.Wait(); err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
}
}

// performHandshake performs the handshake with the client.
func (fs *Service) performHandshake(stream sinkpb.Sink_SinkFnServer) error {
req, err := stream.Recv()
if err != nil {
log.Printf("error receiving handshake from stream: %v", err)
Expand All @@ -86,7 +115,6 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
return fmt.Errorf("expected handshake message")
}

// Send handshake response
handshakeResponse := &sinkpb.SinkResponse{
Result: &sinkpb.SinkResponse_Result{
Status: sinkpb.Status_SUCCESS,
Expand All @@ -99,86 +127,72 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
return err
}

return nil
}

// receiveRequests receives the requests from the client writes them to the datumStreamCh channel.
func (fs *Service) receiveRequests(stream sinkpb.Sink_SinkFnServer, datumStreamCh chan<- Datum) error {
defer close(datumStreamCh)
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()

for {
datumStreamCh := make(chan Datum)
g, ctx := errgroup.WithContext(ctx)
req, err := stream.Recv()
if err == io.EOF {
log.Printf("end of sink stream")
return err
}
if err != nil {
log.Printf("error receiving from sink stream: %v", err)
return err
}

g.Go(func() error {
defer close(datumStreamCh)
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()

for {
// Receive sink requests from the stream
req, err := stream.Recv()
if err == io.EOF {
log.Printf("end of sink stream")
return err
}
if err != nil {
log.Printf("error receiving from sink stream: %v", err)
return err
}

if req.Status != nil && req.Status.Eot {
// End of transmission, break to start a new sink invocation
break
}

datum := &handlerDatum{
id: req.GetRequest().GetId(),
value: req.GetRequest().GetValue(),
keys: req.GetRequest().GetKeys(),
eventTime: req.GetRequest().GetEventTime().AsTime(),
watermark: req.GetRequest().GetWatermark().AsTime(),
headers: req.GetRequest().GetHeaders(),
}

// Send datum to the channel
datumStreamCh <- datum
}
return nil
})
if req.Status != nil && req.Status.Eot {
break
}

// invoke the sink function, and send the responses back to the client
g.Go(func() error {
responses := fs.Sinker.Sink(ctx, datumStreamCh)
for _, response := range responses {
var status sinkpb.Status
if response.Fallback {
status = sinkpb.Status_FALLBACK
} else if response.Success {
status = sinkpb.Status_SUCCESS
} else {
status = sinkpb.Status_FAILURE
}

sinkResponse := &sinkpb.SinkResponse{
Result: &sinkpb.SinkResponse_Result{
Id: response.ID,
Status: status,
ErrMsg: response.Err,
},
}
if err := stream.Send(sinkResponse); err != nil {
log.Printf("error sending sink response: %v", err)
return err
}
}
return nil
})
datum := &handlerDatum{
id: req.GetRequest().GetId(),
value: req.GetRequest().GetValue(),
keys: req.GetRequest().GetKeys(),
eventTime: req.GetRequest().GetEventTime().AsTime(),
watermark: req.GetRequest().GetWatermark().AsTime(),
headers: req.GetRequest().GetHeaders(),
}

// Wait for the goroutines to finish
err := g.Wait()
if errors.Is(err, io.EOF) {
return nil
datumStreamCh <- datum
}
return nil
}

// processData invokes the sinker to process the data and sends the response back to the client.
func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnServer, datumStreamCh chan Datum) error {
responses := fs.Sinker.Sink(ctx, datumStreamCh)
for _, response := range responses {
var status sinkpb.Status
if response.Fallback {
status = sinkpb.Status_FALLBACK
} else if response.Success {
status = sinkpb.Status_SUCCESS
} else {
status = sinkpb.Status_FAILURE
}
if err != nil {

sinkResponse := &sinkpb.SinkResponse{
Result: &sinkpb.SinkResponse_Result{
Id: response.ID,
Status: status,
ErrMsg: response.Err,
},
}
if err := stream.Send(sinkResponse); err != nil {
log.Printf("error sending sink response: %v", err)
return err
}
}
return nil
}
Loading

0 comments on commit ab1c9b4

Please sign in to comment.