Skip to content

Commit

Permalink
chore: add serving store example, change interface and types (#179)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Feb 23, 2025
1 parent 7a75e8e commit ea9640b
Show file tree
Hide file tree
Showing 34 changed files with 273 additions and 236 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/proto/map/v1/map.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/map/v1/map_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/reduce/v1/reduce.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/reduce/v1/reduce_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

225 changes: 78 additions & 147 deletions pkg/apis/proto/serving/v1/store.pb.go

Large diffs are not rendered by default.

23 changes: 8 additions & 15 deletions pkg/apis/proto/serving/v1/store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ service ServingStore {

// Payload that represent the output that is to be written into to the store.
message Payload {
// ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated
// uuid.
string id = 1;
// Origin is the Vertex that generated this result.
string origin = 1;
// Value is the result of the computation.
bytes value = 2;
}

// PutRequest is the request sent to the Store.
message PutRequest {
// Origin is the Vertex that generated this result.
string origin = 1;
// ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated
// uuid.
string id = 1;
// Payloads are one or more results generated (could be more than one due to flat-map).
repeated Payload payloads = 2;
}
Expand All @@ -64,18 +64,11 @@ message GetRequest {
string id = 1;
}

// OriginalPayload is one of the result generated by the Compute Graph of Numaflow.
message OriginalPayload {
// Origin is the Vertex that generated this result.
string origin = 1;
// Payloads are one or more results generated (could be more than one due to flat-map).
repeated Payload payloads = 2;
}

// GetResponse is the result stored in the Store.
message GetResponse {
// OriginalPayload are one or more results generated (could be more than one due to flat-map).
repeated OriginalPayload payloads = 1;
string id = 1;
// Payloads are one or more results generated (could be more than one due to flat-map).
repeated Payload payloads = 2;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/proto/serving/v1/store_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sideinput/v1/sideinput.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sink/v1/sink.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sink/v1/sink_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/source/v1/source.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/source/v1/source_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sourcetransform/v1/transform.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/batchmapper/examples/batchmap_flatmap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func batchMapFn(_ context.Context, datums <-chan batchmapper.Datum) batchmapper.
msg := d.Value()
_ = d.EventTime() // Event time is available
_ = d.Watermark() // Watermark is available
batchResponse := batchmapper.NewBatchResponse(d.Id())
batchResponse := batchmapper.NewBatchResponse(d.ID())
strs := strings.Split(string(msg), ",")
for _, s := range strs {
batchResponse = batchResponse.Append(batchmapper.NewMessage([]byte(s)))
Expand Down
2 changes: 1 addition & 1 deletion pkg/batchmapper/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Datum interface {
// Headers returns the headers of the message.
Headers() map[string]string
// Id returns the unique ID set for the given message
Id() string
ID() string
// Keys returns the keys associated with a given datum
Keys() []string
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/batchmapper/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type batchResponse struct {
}

// Id returns request ID for the given list of responses
func (m batchResponse) Id() string {
func (m batchResponse) ID() string {
return m.id
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/batchmapper/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestBatchMapServer_Start(t *testing.T) {
var batchMapHandler = BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses {
batchResponses := BatchResponsesBuilder()
for d := range datums {
results := NewBatchResponse(d.Id())
results := NewBatchResponse(d.ID())
results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"}))
batchResponses.Append(results)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer
}
singleRequestResp := &mappb.MapResponse{
Results: elements,
Id: batchResp.Id(),
Id: batchResp.ID(),
}
select {
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions pkg/batchmapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestService_BatchMapFn(t *testing.T) {
handler: BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses {
batchResponses := BatchResponsesBuilder()
for d := range datums {
results := NewBatchResponse(d.Id())
results := NewBatchResponse(d.ID())
results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"}))
batchResponses = batchResponses.Append(results)
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestService_BatchMapFn(t *testing.T) {
handler: BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses {
batchResponses := BatchResponsesBuilder()
for d := range datums {
results := NewBatchResponse(d.Id())
results := NewBatchResponse(d.ID())
results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"}))
batchResponses = batchResponses.Append(results)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/batchmapper/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (h *handlerDatum) Headers() map[string]string {
return h.headers
}

func (h *handlerDatum) Id() string {
func (h *handlerDatum) ID() string {
return h.id
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/servingstore/examples/memory_store/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
####################################################################################################
# base
####################################################################################################
FROM alpine:3.20 AS base
ARG TARGETARCH
RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/serving-inmem-store-${TARGETARCH} /bin/serving-inmem-store
RUN chmod +x /bin/serving-inmem-store

####################################################################################################
# flatmap
####################################################################################################
FROM scratch AS memory_store
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/serving-inmem-store /bin/serving-inmem-store
ENTRYPOINT [ "/bin/serving-inmem-store" ]
22 changes: 22 additions & 0 deletions pkg/servingstore/examples/memory_store/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-go/serving-inmem-store:${TAG}
ARCHITECTURES = amd64 arm64

.PHONY: build
build:
for arch in $(ARCHITECTURES); do \
CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/serving-inmem-store-$${arch} main.go; \
done

.PHONY: image-push
image-push: build
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target memory_store . --push

.PHONY: image
image: build
docker build -t ${IMAGE_REGISTRY} --target memory_store .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

clean:
-rm -rf ./dist
3 changes: 3 additions & 0 deletions pkg/servingstore/examples/memory_store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# In-Memory Store

This example demonstrates how to use the in-memory store to store and retrieve data.
18 changes: 18 additions & 0 deletions pkg/servingstore/examples/memory_store/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module memory_store

go 1.22

toolchain go1.23.1

replace github.com/numaproj/numaflow-go => ../../../..

require github.com/numaproj/numaflow-go v0.9.0

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.66.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
22 changes: 22 additions & 0 deletions pkg/servingstore/examples/memory_store/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
46 changes: 46 additions & 0 deletions pkg/servingstore/examples/memory_store/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package main

import (
"context"
"log"

"github.com/numaproj/numaflow-go/pkg/servingstore"
)

type InMemoryStore struct {
store map[string][]servingstore.Payload
}

func (i *InMemoryStore) Put(ctx context.Context, putDatum servingstore.PutDatum) {
id := putDatum.ID()
log.Printf("Received Put request for %s", id)
if _, ok := i.store[id]; !ok {
i.store[id] = make([]servingstore.Payload, 0)
}
for _, payload := range putDatum.Payloads() {
i.store[id] = append(i.store[id], servingstore.NewPayload(payload.Origin(), payload.Value()))
}
}

func (i *InMemoryStore) Get(ctx context.Context, getDatum servingstore.GetDatum) servingstore.StoredResult {
id := getDatum.ID()
log.Printf("Received Get request for %s", id)
if data, ok := i.store[id]; ok {
return servingstore.NewStoredResult(id, data)
} else {
return servingstore.NewStoredResult(id, nil)
}
}

func NewInMemoryStore() *InMemoryStore {
return &InMemoryStore{
store: make(map[string][]servingstore.Payload),
}
}

func main() {
err := servingstore.NewServer(NewInMemoryStore()).Start(context.Background())
if err != nil {
log.Panic("Failed to serving store function server: ", err)
}
}
Binary file not shown.
Loading

0 comments on commit ea9640b

Please sign in to comment.