diff --git a/pkg/batchmapper/examples/batchmap-flatmap/go.mod b/pkg/batchmapper/examples/batchmap-flatmap/go.mod index 88f3f383..0ff2c778 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/go.mod +++ b/pkg/batchmapper/examples/batchmap-flatmap/go.mod @@ -1,10 +1,10 @@ module flatmap -go 1.20 +go 1.22 replace github.com/numaproj/numaflow-go => ../../../.. -require github.com/numaproj/numaflow-go v0.7.0-rc2 +require github.com/numaproj/numaflow-go v0.8.0 require ( github.com/golang/protobuf v1.5.3 // indirect diff --git a/pkg/batchmapper/examples/batchmap-flatmap/pipe.yaml b/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml similarity index 100% rename from pkg/batchmapper/examples/batchmap-flatmap/pipe.yaml rename to pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go index 78c94764..2cf90a2e 100644 --- a/pkg/batchmapper/server.go +++ b/pkg/batchmapper/server.go @@ -19,8 +19,6 @@ type server struct { } // NewServer creates a new batch map server. -// TODO(map-batch): as this would be a streaming server should we see if there are some options (like maxMessageSize) -// which are different than unary server which are optimal for this use case. func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server { opts := defaultOptions() for _, inputOption := range inputOptions { diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index 721b4448..685f3c04 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -2,7 +2,6 @@ package batchmapper import ( "context" - "fmt" "io" "log" @@ -58,11 +57,9 @@ func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error // If the number of responses received does not align with the request batch size, // we will not be able to process the data correctly. // This should be marked as an error and shown to the user. - // TODO(map-batch): We could potentially panic here as well if len(responses.Items()) != len(datums) { errMsg := "batchMapFn: mismatch between length of batch requests and responses" - log.Println(errMsg) - return fmt.Errorf(errMsg) + log.Panic(errMsg) } // iterate over the responses received and covert to the required proto format