Skip to content

Commit

Permalink
update comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Jul 1, 2024
1 parent 0b1f385 commit 808e724
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 84 deletions.
31 changes: 17 additions & 14 deletions pkg/apis/proto/map/v1/map.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/apis/proto/map/v1/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ service Map {
// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);

// BatchMapFn is a bi-directional streaming rpc which applies a
// batchMap function on each element of the stream and then returns streams
// MapStreamFn is a bi-directional streaming rpc which applies a
// Map function on each element of the stream and then returns streams
// back MapResponse elements.
// TODO(map-batch): in the target state when we move the current
// unary implementation to bi-di as well, we can rename this and
// use a single rpc for both.
rpc BatchMapFn(stream MapRequest) returns (stream MapResponse);
rpc MapStreamFn(stream MapRequest) returns (stream MapResponse);
}

/**
Expand Down
46 changes: 23 additions & 23 deletions pkg/apis/proto/map/v1/map_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 20 additions & 20 deletions pkg/apis/proto/map/v1/mapmock/mapmock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mapper/examples/batchmap/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
TAG ?= mapbatchv5
TAG ?= mapbatchemptyv1
PUSH ?= true

.PHONY: build
Expand Down
10 changes: 5 additions & 5 deletions pkg/mapper/examples/batchmap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ func mapFn(_ context.Context, datums []mapper.Datum) mapper.BatchResponses {
batchResponses := mapper.BatchResponsesBuilder()
log.Println("MYDEBUG: length of input ", len(datums))
for _, d := range datums {
msg := d.Value()
//msg := d.Value()
_ = d.EventTime() // Event time is available
_ = d.Watermark() // Watermark is available
results := mapper.NewBatchResponse(d.Id())
for i := 0; i < 2; i++ {
results = results.Append(mapper.NewMessage(msg))
}
//for i := 0; i < 2; i++ {
// results = results.Append(mapper.NewMessage(msg))
//}
batchResponses = batchResponses.Append(results)
}
return batchResponses
}

func main() {
err := mapper.NewBatchMapServer(mapper.BatchMapperFunc(mapFn)).Start(context.Background())
err := mapper.NewBatchServer(mapper.BatchMapperFunc(mapFn)).Start(context.Background())
if err != nil {
log.Panic("Failed to start map function server: ", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/mapper/examples/batchmap/pipe.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ kind: Pipeline
metadata:
name: simple-pipeline
spec:
watermark:
disabled: true # Optional, defaults to false.
# watermark:
# disabled: true # Optional, defaults to false.
limits:
readBatchSize: 500
vertices:
Expand All @@ -31,7 +31,7 @@ spec:
udf:
container:
# image: "quay.io/numaio/numaflow-go/map-flatmap:stable"
image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchv5"
image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchemptyv1"
imagePullPolicy: Always
containerTemplate:
resources:
Expand Down
2 changes: 2 additions & 0 deletions pkg/mapper/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Datum interface {
Headers() map[string]string
// Id returns the unique ID set for the given message
Id() string
// Keys returns the keys associated with a given datum
Keys() []string
}

// Mapper is the interface of map function implementation. This is the traditional interface
Expand Down
6 changes: 0 additions & 6 deletions pkg/mapper/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ var (
DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
)

// ===========================================================================================
// Common structures used in map functions
// ===========================================================================================

// Message is used to wrap the data return by Map functions
type Message struct {
Expand Down Expand Up @@ -55,9 +53,7 @@ func (m Message) Tags() []string {
return m.tags
}

// ===========================================================================================
// Utility structures for unary map use case
// ===========================================================================================

type Messages []Message

Expand All @@ -77,9 +73,7 @@ func (m Messages) Items() []Message {
return m
}

// ===========================================================================================
// Utility structures for batch map mode
// ===========================================================================================

// batchResponse is used to wrap the data return by batch map function along
// with the ID of the corresponding request
Expand Down
4 changes: 2 additions & 2 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server {
return s
}

// NewBatchMapServer creates a new batch map server.
// NewBatchServer creates a new batch map server.
// TODO(map-batch): as this would be a streaming server should we see if there are some options (like maxMessageSize)
// which are different than unary server which are optimal for this use case.
func NewBatchMapServer(m BatchMapper, inputOptions ...Option) numaflow.Server {
func NewBatchServer(m BatchMapper, inputOptions ...Option) numaflow.Server {
opts := defaultOptions()
for _, inputOption := range inputOptions {
inputOption(opts)
Expand Down
28 changes: 28 additions & 0 deletions pkg/mapper/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,31 @@ func TestMapServer_Start(t *testing.T) {
err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx)
assert.NoError(t, err)
}

func TestBatchMapServer_Start(t *testing.T) {
socketFile, _ := os.CreateTemp("/tmp", "numaflow-test.sock")
defer func() {
_ = os.RemoveAll(socketFile.Name())
}()

serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info")
defer func() {
_ = os.RemoveAll(serverInfoFile.Name())
}()

var mapHandler = BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses {
batchResponses := BatchResponsesBuilder()
for _, d := range datums {
results := NewBatchResponse(d.Id())
results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"}))
batchResponses.Append(results)
}

return batchResponses
})
// note: using actual uds connection
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
err := NewBatchServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx)
assert.NoError(t, err)
}
Loading

0 comments on commit 808e724

Please sign in to comment.