diff --git a/pkg/servingstore/examples/memory_store/README.md b/pkg/servingstore/examples/memory_store/README.md deleted file mode 100644 index b5702dec..00000000 --- a/pkg/servingstore/examples/memory_store/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# In-Memory Store - -This example demonstrates how to use the in-memory store to store and retrieve data. diff --git a/pkg/servingstore/examples/memory_store/main.go b/pkg/servingstore/examples/memory_store/main.go deleted file mode 100644 index 55d3ab17..00000000 --- a/pkg/servingstore/examples/memory_store/main.go +++ /dev/null @@ -1,46 +0,0 @@ -package main - -import ( - "context" - "log" - - "github.com/numaproj/numaflow-go/pkg/servingstore" -) - -type InMemoryStore struct { - store map[string][]servingstore.Payload -} - -func (i *InMemoryStore) Put(ctx context.Context, putDatum servingstore.PutDatum) { - id := putDatum.ID() - log.Printf("Received Put request for %s", id) - if _, ok := i.store[id]; !ok { - i.store[id] = make([]servingstore.Payload, 0) - } - for _, payload := range putDatum.Payloads() { - i.store[id] = append(i.store[id], servingstore.NewPayload(payload.Origin(), payload.Value())) - } -} - -func (i *InMemoryStore) Get(ctx context.Context, getDatum servingstore.GetDatum) servingstore.StoredResult { - id := getDatum.ID() - log.Printf("Received Get request for %s", id) - if data, ok := i.store[id]; ok { - return servingstore.NewStoredResult(id, data) - } else { - return servingstore.NewStoredResult(id, nil) - } -} - -func NewInMemoryStore() *InMemoryStore { - return &InMemoryStore{ - store: make(map[string][]servingstore.Payload), - } -} - -func main() { - err := servingstore.NewServer(NewInMemoryStore()).Start(context.Background()) - if err != nil { - log.Panic("Failed to serving store function server: ", err) - } -} diff --git a/pkg/servingstore/examples/memory_store/memory_store b/pkg/servingstore/examples/memory_store/memory_store deleted file mode 100755 index 421bac9c..00000000 Binary files a/pkg/servingstore/examples/memory_store/memory_store and /dev/null differ diff --git a/pkg/servingstore/examples/memory_store/Dockerfile b/pkg/servingstore/examples/redis-store/Dockerfile similarity index 75% rename from pkg/servingstore/examples/memory_store/Dockerfile rename to pkg/servingstore/examples/redis-store/Dockerfile index 1f6d62e7..6fc61738 100644 --- a/pkg/servingstore/examples/memory_store/Dockerfile +++ b/pkg/servingstore/examples/redis-store/Dockerfile @@ -7,14 +7,14 @@ RUN apk update && apk upgrade && \ apk add ca-certificates && \ apk --no-cache add tzdata -COPY dist/serving-inmem-store-${TARGETARCH} /bin/serving-inmem-store -RUN chmod +x /bin/serving-inmem-store +COPY dist/serving-redis-store-${TARGETARCH} /bin/serving-redis-store +RUN chmod +x /bin/serving-redis-store #################################################################################################### # flatmap #################################################################################################### -FROM scratch AS memory_store +FROM scratch AS redis-store COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt -COPY --from=base /bin/serving-inmem-store /bin/serving-inmem-store -ENTRYPOINT [ "/bin/serving-inmem-store" ] +COPY --from=base /bin/serving-redis-store /bin/serving-redis-store +ENTRYPOINT [ "/bin/serving-redis-store" ] diff --git a/pkg/servingstore/examples/memory_store/Makefile b/pkg/servingstore/examples/redis-store/Makefile similarity index 66% rename from pkg/servingstore/examples/memory_store/Makefile rename to pkg/servingstore/examples/redis-store/Makefile index 22465057..632d2042 100644 --- a/pkg/servingstore/examples/memory_store/Makefile +++ b/pkg/servingstore/examples/redis-store/Makefile @@ -1,21 +1,21 @@ TAG ?= stable PUSH ?= false -IMAGE_REGISTRY = quay.io/numaio/numaflow-go/serving-inmem-store:${TAG} +IMAGE_REGISTRY = quay.io/numaio/numaflow-go/serving-redis-store:${TAG} ARCHITECTURES = amd64 arm64 .PHONY: build build: for arch in $(ARCHITECTURES); do \ - CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/serving-inmem-store-$${arch} main.go; \ + CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/serving-redis-store-$${arch} main.go; \ done .PHONY: image-push image-push: build - docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target memory_store . --push + docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target redis-store . --push .PHONY: image image: build - docker build -t ${IMAGE_REGISTRY} --target memory_store . + docker build -t ${IMAGE_REGISTRY} --target redis-store . @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi clean: diff --git a/pkg/servingstore/examples/redis-store/README.md b/pkg/servingstore/examples/redis-store/README.md new file mode 100644 index 00000000..244920f2 --- /dev/null +++ b/pkg/servingstore/examples/redis-store/README.md @@ -0,0 +1,3 @@ +# Redis Store + +This example demonstrates how to use Redis store and retrieve data. diff --git a/pkg/servingstore/examples/memory_store/go.mod b/pkg/servingstore/examples/redis-store/go.mod similarity index 72% rename from pkg/servingstore/examples/memory_store/go.mod rename to pkg/servingstore/examples/redis-store/go.mod index e5e15f3a..91292344 100644 --- a/pkg/servingstore/examples/memory_store/go.mod +++ b/pkg/servingstore/examples/redis-store/go.mod @@ -9,6 +9,9 @@ replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.9.0 require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/redis/go-redis/v9 v9.7.1 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect diff --git a/pkg/servingstore/examples/memory_store/go.sum b/pkg/servingstore/examples/redis-store/go.sum similarity index 76% rename from pkg/servingstore/examples/memory_store/go.sum rename to pkg/servingstore/examples/redis-store/go.sum index 09e06a2c..ac1a75ce 100644 --- a/pkg/servingstore/examples/memory_store/go.sum +++ b/pkg/servingstore/examples/redis-store/go.sum @@ -1,9 +1,15 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.7.1 h1:4LhKRCIduqXqtvCUlaq9c8bdHOkICjDMrr1+Zb3osAc= +github.com/redis/go-redis/v9 v9.7.1/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= diff --git a/pkg/servingstore/examples/redis-store/main.go b/pkg/servingstore/examples/redis-store/main.go new file mode 100644 index 00000000..ca1ed0e3 --- /dev/null +++ b/pkg/servingstore/examples/redis-store/main.go @@ -0,0 +1,123 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "strconv" + "time" + + "github.com/numaproj/numaflow-go/pkg/servingstore" + "github.com/redis/go-redis/v9" +) + +const DEFAULT_REDIS_URL = "redis:6379" +const DEFAULT_REDIS_TTL_SECONDS = 7200 // 2 hours + +type RedisStore struct { + client *redis.Client + ttl time.Duration +} + +type payload struct { + Origin string `json:"origin"` + Value []byte `json:"value"` +} + +func encodeServingPayload(data servingstore.Payload) ([]byte, error) { + payloadData := payload{ + Origin: data.Origin(), + Value: data.Value(), + } + return json.Marshal(&payloadData) +} + +func decodeServingPayload(data string) (payload, error) { + var payloadData payload + if err := json.Unmarshal([]byte(data), &payloadData); err != nil { + return payloadData, fmt.Errorf("unmarshaling payload: %w", err) + } + return payloadData, nil +} + +func (rs *RedisStore) Put(ctx context.Context, putDatum servingstore.PutDatum) { + id := putDatum.ID() + slog.Info("Received Put request", "key", id) + payloads := putDatum.Payloads() + encodedPayloads := make([]any, 0, len(payloads)) + for _, payload := range payloads { + encoded, err := encodeServingPayload(payload) + if err != nil { + slog.Error("Encoding redis payload", "error", err) + os.Exit(1) + } + encodedPayloads = append(encodedPayloads, encoded) + } + _, err := rs.client.LPush(ctx, id, encodedPayloads...).Result() + if err != nil { + slog.Error("Saving payloads with LPUSH", "key", id, "error", err) + os.Exit(1) + } + if _, err := rs.client.Expire(ctx, id, rs.ttl).Result(); err != nil { + slog.Error("Setting expiry for redis key", "key", id, "error", err) + os.Exit(1) + } + slog.Info("Saved payloads", "key", id, "count", len(encodedPayloads)) +} + +func (rs *RedisStore) Get(ctx context.Context, getDatum servingstore.GetDatum) servingstore.StoredResult { + id := getDatum.ID() + slog.Info("Received Get request", "key", id) + values, err := rs.client.LRange(ctx, id, 0, -1).Result() + if err != nil { + slog.Error("Retrieving results", "key", id, "error", err) + os.Exit(1) + } + if len(values) == 0 { + slog.Info("Returning empty results", "id", id) + return servingstore.NewStoredResult(id, nil) + } + payloads := make([]servingstore.Payload, 0, len(values)) + for _, value := range values { + payload, err := decodeServingPayload(value) + if err != nil { + slog.Error("Decoding redis payload", "error", err) + os.Exit(1) + } + payloads = append(payloads, servingstore.NewPayload(payload.Origin, payload.Value)) + } + slog.Info("Returning results", "key", id, "count", len(payloads)) + return servingstore.NewStoredResult(id, payloads) +} + +func NewRedisStore(addr string, ttl time.Duration) *RedisStore { + rdb := redis.NewClient(&redis.Options{ + Addr: addr, + }) + return &RedisStore{client: rdb, ttl: ttl} +} + +func main() { + redisURL := DEFAULT_REDIS_URL + if addr, exists := os.LookupEnv("REDIS_ADDR"); exists { + redisURL = addr + } + redisTTL := DEFAULT_REDIS_TTL_SECONDS * time.Second + if ttl, exists := os.LookupEnv("REDIS_TTL_SECONDS"); exists { + ttlSecs, err := strconv.ParseInt(ttl, 10, 64) + if err != nil { + slog.Error("Converting value of env variable REDIS_TTL_SECONDS to integer:", "error", err) + } else { + redisTTL = time.Duration(ttlSecs) * time.Second + } + } + + slog.Info("Starting Redis serving store", "redis_url", redisURL, "ttl", redisTTL) + err := servingstore.NewServer(NewRedisStore(redisURL, redisTTL)).Start(context.Background()) + if err != nil { + slog.Error("Failed to serving store function server", "error", err) + os.Exit(1) + } +} diff --git a/pkg/servingstore/examples/redis-store/redis-minimal.yaml b/pkg/servingstore/examples/redis-store/redis-minimal.yaml new file mode 100644 index 00000000..a7ea1154 --- /dev/null +++ b/pkg/servingstore/examples/redis-store/redis-minimal.yaml @@ -0,0 +1,83 @@ +--- +# +# Redis service +# +apiVersion: v1 +kind: Service +metadata: + name: redis + labels: + app: redis +spec: + ports: + - port: 6379 + targetPort: 6379 + name: client + clusterIP: None + selector: + app: redis +--- +# +# Redis configuration file +# +apiVersion: v1 +kind: ConfigMap +metadata: + name: redis-config + labels: + app: redis +data: + # maxmemory is set to 100mb to ensure enough storage for running a single e2e test suite. + # a lower number can lead to redis sink write failure. + redis-config: | + maxmemory 100mb + maxmemory-policy allkeys-lru + protected-mode no +--- +# +# Redis stateful set +# +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: redis +spec: + serviceName: redis + replicas: 1 + minReadySeconds: 10 # by default is 0 + selector: + matchLabels: + app: redis # has to match .spec.template.metadata.labels + template: + metadata: + labels: + app: redis + name: redis + spec: + terminationGracePeriodSeconds: 10 + containers: + - name: redis + image: redis:7.0.11 + ports: + - containerPort: 6379 + name: client + command: + - redis-server + - "/redis-master/redis.conf" + env: + - name: MASTER + value: "true" + volumeMounts: + - mountPath: /redis-master-data + name: data + - mountPath: /redis-master + name: config + volumes: + - name: data + emptyDir: {} + - name: config + configMap: + name: redis-config + items: + - key: redis-config + path: redis.conf