Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Jun 28, 2024
1 parent 46a2caf commit 0b1f385
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 253 deletions.
256 changes: 54 additions & 202 deletions pkg/apis/proto/map/v1/map.pb.go

Large diffs are not rendered by default.

26 changes: 12 additions & 14 deletions pkg/apis/proto/map/v1/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ service Map {
// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);

// BatchMapFn applies a
rpc BatchMapFn(stream MapRequest) returns (stream BatchMapResponse);
// BatchMapFn is a bi-directional streaming rpc which applies a
// batchMap function on each element of the stream and then returns streams
// back MapResponse elements.
// TODO(map-batch): in the target state when we move the current
// unary implementation to bi-di as well, we can rename this and
// use a single rpc for both.
rpc BatchMapFn(stream MapRequest) returns (stream MapResponse);
}

/**
Expand All @@ -33,25 +38,18 @@ message MapRequest {
/**
* MapResponse represents a response element.
*/
// TODO(map-batch) - currently this is used by both batch map and unary map.
// Do we want to have a separate response struct for batch map responses
// which have only one element instead of a list of responses.
// In that case we need a different mechanism to indicate that all the responses for a given request
// have been completed.
message MapResponse {
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
repeated Result results = 1;
}

/**
* MapResponse represents a response element.
*/
message BatchMapResponse {
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
repeated Result results = 1;
string id = 2;
}

Expand Down
24 changes: 17 additions & 7 deletions pkg/apis/proto/map/v1/map_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mapper/examples/batchmap/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
TAG ?= mapbatchv3
TAG ?= mapbatchv5
PUSH ?= true

.PHONY: build
Expand Down
7 changes: 2 additions & 5 deletions pkg/mapper/examples/batchmap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"log"
"strings"

"github.com/numaproj/numaflow-go/pkg/mapper"
)
Expand All @@ -15,11 +14,9 @@ func mapFn(_ context.Context, datums []mapper.Datum) mapper.BatchResponses {
msg := d.Value()
_ = d.EventTime() // Event time is available
_ = d.Watermark() // Watermark is available
// Split the msg into an array with comma.
strs := strings.Split(string(msg), ",")
results := mapper.NewBatchResponse(d.Id())
for _, s := range strs {
results = results.Append(mapper.NewMessage([]byte(s)))
for i := 0; i < 2; i++ {
results = results.Append(mapper.NewMessage(msg))
}
batchResponses = batchResponses.Append(results)
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/mapper/examples/batchmap/pipe.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ spec:
limits:
readBatchSize: 500
vertices:
# - name: in d
# source:
# http: { }
# - name: in
# source:
# http: { }
- name: in
scale:
min: 1
source:
# A self data generating source
generator:
rpu: 10
rpu: 1000
duration: 1s
- name: batchmap
scale:
Expand All @@ -31,7 +31,7 @@ spec:
udf:
container:
# image: "quay.io/numaio/numaflow-go/map-flatmap:stable"
image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchv3"
image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchv5"
imagePullPolicy: Always
containerTemplate:
resources:
Expand All @@ -49,7 +49,6 @@ spec:
- name: out
scale:
min: 1
max: 1
sink:
# A simple log printing sink
log: {}
Expand Down
15 changes: 9 additions & 6 deletions pkg/mapper/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ type Datum interface {
Watermark() time.Time
// Headers returns the headers of the message.
Headers() map[string]string

// Id returns the unique ID set for the given message
Id() string
}

// Mapper is the interface of map function implementation.
// Mapper is the interface of map function implementation. This is the traditional interface
// where a single message is passed as input and the responses corresponding to that request
// are returned.
type Mapper interface {
// Map is the function to process each coming message.
Map(ctx context.Context, keys []string, datum Datum) Messages
Expand All @@ -33,16 +35,17 @@ func (mf MapperFunc) Map(ctx context.Context, keys []string, datum Datum) Messag
return mf(ctx, keys, datum)
}

// BatchMapper
// BatchMapper is the interface for a Batch Map mode where the user is given a list
// of messages, and they return the consolidated response for all of them together.
type BatchMapper interface {
// BatchMap
// BatchMap is the function which processes a list of input messages
BatchMap(ctx context.Context, datums []Datum) BatchResponses
}

// BatchMapperFunc
// BatchMapperFunc is a utility type used to convert a batch map function to a BatchMapper.
type BatchMapperFunc func(ctx context.Context, datums []Datum) BatchResponses

// BatchMap implements the function of BatchMap function.
// BatchMap implements the functionality of BatchMap function.
func (mf BatchMapperFunc) BatchMap(ctx context.Context, datums []Datum) BatchResponses {
return mf(ctx, datums)
}
25 changes: 23 additions & 2 deletions pkg/mapper/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ var (
DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
)

// ===========================================================================================
// Common structures used in map functions
// ===========================================================================================

// Message is used to wrap the data return by Map functions
type Message struct {
value []byte
Expand Down Expand Up @@ -51,6 +55,10 @@ func (m Message) Tags() []string {
return m.tags
}

// ===========================================================================================
// Utility structures for unary map use case
// ===========================================================================================

type Messages []Message

// MessagesBuilder returns an empty instance of Messages
Expand All @@ -69,28 +77,41 @@ func (m Messages) Items() []Message {
return m
}

// ===========================================================================================
// Utility structures for batch map mode
// ===========================================================================================

// batchResponse is used to wrap the data return by batch map function along
// with the ID of the corresponding request
type batchResponse struct {
id string
messages []Message
}

// Id returns
// Id returns request ID for the given list of responses
func (m batchResponse) Id() string {
return m.id
}

// Append appends a Message to the messages list of a batchResponse
// object and then returns the updated object.
func (m batchResponse) Append(msg Message) batchResponse {
m.messages = append(m.messages, msg)
return m
}

// Items returns the message list
// Items returns the message list for a batchResponse
func (m batchResponse) Items() []Message {
return m.messages
}

// BatchResponses is a list of batchResponse which signify the consolidated
// results for a batch of input messages.
type BatchResponses []batchResponse

// NewBatchResponse is a utility function used to create a new batchResponse object
// Specifying an id is a mandatory requirement, as it is required to reference the
// responses back to a request.
func NewBatchResponse(id string) batchResponse {
return batchResponse{
id: id,
Expand Down
5 changes: 5 additions & 0 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ type server struct {
}

// NewServer creates a new map server.
// TODO(map-batch): We call the unary map server as -> NewServer to keep backward compatibility
// Otherwise all users will have to update their current code when this change is released.
func NewServer(m Mapper, inputOptions ...Option) numaflow.Server {
//
opts := defaultOptions()
for _, inputOption := range inputOptions {
inputOption(opts)
Expand All @@ -32,6 +35,8 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server {
}

// NewBatchMapServer creates a new batch map server.
// TODO(map-batch): as this would be a streaming server should we see if there are some options (like maxMessageSize)
// which are different than unary server which are optimal for this use case.
func NewBatchMapServer(m BatchMapper, inputOptions ...Option) numaflow.Server {
opts := defaultOptions()
for _, inputOption := range inputOptions {
Expand Down
28 changes: 18 additions & 10 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,45 +49,53 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapRe
return datumList, nil
}

// BatchMapFn applies a user defined function to a stream of request element and streams back the responses for them.
func (fs *Service) BatchMapFn(stream mappb.Map_BatchMapFnServer) error {
ctx := stream.Context()

// As the BatchMap interface expects a list of request elements
// we read all the requests coming on the stream and keep appending them together
// and then finally send the array for processing.
datums := make([]Datum, 0)
for {
d, err := stream.Recv()
if err == io.EOF {
log.Println("MYDEBUG: Got EOF here", err)
break
}
if err != nil {
log.Println("MYDEBUG: Got error here", err)
log.Println("BatchMapFn: Got an error while recv() on stream", err)
return err
}
var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), d.GetId())
datums = append(datums, hd)
}

index := 0
// Apply the user BatchMap implementation function
responses := fs.BatchMapper.BatchMap(ctx, datums)

// iterate over the responses received and covert to the required proto format
for _, batchResp := range responses.Items() {
var elements []*mappb.BatchMapResponse_Result
var elements []*mappb.MapResponse_Result
for _, resp := range batchResp.Items() {
elements = append(elements, &mappb.BatchMapResponse_Result{
elements = append(elements, &mappb.MapResponse_Result{
Keys: resp.Keys(),
Value: resp.Value(),
Tags: resp.Tags(),
})
index += 1
}
datumList := &mappb.BatchMapResponse{
singleRequestResp := &mappb.MapResponse{
Results: elements,
Id: batchResp.Id(),
}
err := stream.Send(datumList)
// We stream back the result for a single request ID
// this would contain all the responses for that request.
err := stream.Send(singleRequestResp)
if err != nil {
log.Println("MYDEBUG: Got error here - 2", err)
log.Println("BatchMapFn: Got an error while Send() on stream", err)
return err
}
}
log.Println("MYDEBUG: SENDING BACK FROM UDF ", index)
// Once all responses are sent we can return, this would indicate the end of the rpc and
// send an EOF to the client on the stream
return nil
}

0 comments on commit 0b1f385

Please sign in to comment.