Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
Sidhant Kohli committed Jul 10, 2024
1 parent 017cf7f commit f4a7c9b
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 14 deletions.
9 changes: 5 additions & 4 deletions pkg/batchmapper/examples/batchmap-flatmap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,20 @@ func batchMapFn(_ context.Context, datums <-chan batchmapper.Datum) batchmapper.
msg := d.Value()
_ = d.EventTime() // Event time is available
_ = d.Watermark() // Watermark is available
results := batchmapper.NewBatchResponse(d.Id())
batchResponse := batchmapper.NewBatchResponse(d.Id())
strs := strings.Split(string(msg), ",")
for _, s := range strs {
results = results.Append(batchmapper.NewMessage([]byte(s)))
batchResponse = batchResponse.Append(batchmapper.NewMessage([]byte(s)))
}
batchResponses = batchResponses.Append(results)

batchResponses = batchResponses.Append(batchResponse)
}
return batchResponses
}

func main() {
err := batchmapper.NewServer(batchmapper.BatchMapperFunc(batchMapFn)).Start(context.Background())
if err != nil {
log.Panic("Failed to start map function server: ", err)
log.Panic("Failed to start batch map function server: ", err)
}
}
6 changes: 3 additions & 3 deletions pkg/batchmapper/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var (
DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
)

// Message is used to wrap the data return by Map functions
// Message is used to wrap the data return by Batch Map functions
type Message struct {
value []byte
keys []string
Expand Down Expand Up @@ -94,13 +94,13 @@ func BatchResponsesBuilder() BatchResponses {
return BatchResponses{}
}

// Append appends a Message
// Append appends a batchResponse
func (m BatchResponses) Append(msg batchResponse) BatchResponses {
m = append(m, msg)
return m
}

// Items returns the message list
// Items returns the batchResponse list
func (m BatchResponses) Items() []batchResponse {
return m
}
2 changes: 1 addition & 1 deletion pkg/batchmapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (m *server) Start(ctx context.Context) error {
defer log.Println("Successfully stopped the gRPC server")
defer grpcServer.GracefulStop()

// register the map service
// register the batch map service
batchmappb.RegisterBatchMapServer(grpcServer, m.svc)

// start the grpc server
Expand Down
9 changes: 4 additions & 5 deletions pkg/batchmapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func TestService_BatchMapFn(t *testing.T) {
input []*batchmappb.BatchMapRequest
expected []*batchmappb.BatchMapResponse
expectedErr bool
streamErr bool
}{
{
name: "batch_map_stream_fn_forward_msg",
Expand Down Expand Up @@ -188,7 +187,6 @@ func TestService_BatchMapFn(t *testing.T) {
},
},
expectedErr: true,
streamErr: true,
},
}
for _, tt := range tests {
Expand All @@ -206,7 +204,7 @@ func TestService_BatchMapFn(t *testing.T) {
result := make([]*batchmappb.BatchMapResponse, 0)

var udfBatchMapFnStream batchmappb.BatchMap_BatchMapFnServer
if tt.streamErr {
if tt.expectedErr {
udfBatchMapFnStream = NewBatchMapFnServerErrTest(ctx, inputCh, outputCh)
} else {
udfBatchMapFnStream = NewBatchBatchMapStreamFnServerTest(ctx, inputCh, outputCh)
Expand Down Expand Up @@ -236,8 +234,9 @@ func TestService_BatchMapFn(t *testing.T) {
close(inputCh)
wg.Wait()

if err != nil {
assert.True(t, tt.expectedErr, "BatchMapFn() error = %v, expectedErr %v", err, tt.expectedErr)
if tt.expectedErr {
// assert err is not nil
assert.NotNil(t, err)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sourcetransformer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (m *server) Start(ctx context.Context) error {
defer log.Println("Successfully stopped the gRPC server")
defer grpcServer.GracefulStop()

// register the map service
// register the source transformer service
v1.RegisterSourceTransformServer(grpcServer, m.svc)

// start the grpc server
Expand Down

0 comments on commit f4a7c9b

Please sign in to comment.