Skip to content

Commit

Permalink
feat: Redis user-defined store implementation (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
BulkBeing authored Feb 27, 2025
1 parent 4b9eb5b commit da01d9e
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 58 deletions.
3 changes: 0 additions & 3 deletions pkg/servingstore/examples/memory_store/README.md

This file was deleted.

46 changes: 0 additions & 46 deletions pkg/servingstore/examples/memory_store/main.go

This file was deleted.

Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/serving-inmem-store-${TARGETARCH} /bin/serving-inmem-store
RUN chmod +x /bin/serving-inmem-store
COPY dist/serving-redis-store-${TARGETARCH} /bin/serving-redis-store
RUN chmod +x /bin/serving-redis-store

####################################################################################################
# flatmap
####################################################################################################
FROM scratch AS memory_store
FROM scratch AS redis-store
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/serving-inmem-store /bin/serving-inmem-store
ENTRYPOINT [ "/bin/serving-inmem-store" ]
COPY --from=base /bin/serving-redis-store /bin/serving-redis-store
ENTRYPOINT [ "/bin/serving-redis-store" ]
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-go/serving-inmem-store:${TAG}
IMAGE_REGISTRY = quay.io/numaio/numaflow-go/serving-redis-store:${TAG}
ARCHITECTURES = amd64 arm64

.PHONY: build
build:
for arch in $(ARCHITECTURES); do \
CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/serving-inmem-store-$${arch} main.go; \
CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/serving-redis-store-$${arch} main.go; \
done

.PHONY: image-push
image-push: build
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target memory_store . --push
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target redis-store . --push

.PHONY: image
image: build
docker build -t ${IMAGE_REGISTRY} --target memory_store .
docker build -t ${IMAGE_REGISTRY} --target redis-store .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

clean:
Expand Down
3 changes: 3 additions & 0 deletions pkg/servingstore/examples/redis-store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Redis Store

This example demonstrates how to use Redis store and retrieve data.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ replace github.com/numaproj/numaflow-go => ../../../..
require github.com/numaproj/numaflow-go v0.9.0

require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/redis/go-redis/v9 v9.7.1 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.1 h1:4LhKRCIduqXqtvCUlaq9c8bdHOkICjDMrr1+Zb3osAc=
github.com/redis/go-redis/v9 v9.7.1/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
Expand Down
123 changes: 123 additions & 0 deletions pkg/servingstore/examples/redis-store/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"strconv"
"time"

"github.com/numaproj/numaflow-go/pkg/servingstore"
"github.com/redis/go-redis/v9"
)

const DEFAULT_REDIS_URL = "redis:6379"
const DEFAULT_REDIS_TTL_SECONDS = 7200 // 2 hours

type RedisStore struct {
client *redis.Client
ttl time.Duration
}

type payload struct {
Origin string `json:"origin"`
Value []byte `json:"value"`
}

func encodeServingPayload(data servingstore.Payload) ([]byte, error) {
payloadData := payload{
Origin: data.Origin(),
Value: data.Value(),
}
return json.Marshal(&payloadData)
}

func decodeServingPayload(data string) (payload, error) {
var payloadData payload
if err := json.Unmarshal([]byte(data), &payloadData); err != nil {
return payloadData, fmt.Errorf("unmarshaling payload: %w", err)
}
return payloadData, nil
}

func (rs *RedisStore) Put(ctx context.Context, putDatum servingstore.PutDatum) {
id := putDatum.ID()
slog.Info("Received Put request", "key", id)
payloads := putDatum.Payloads()
encodedPayloads := make([]any, 0, len(payloads))
for _, payload := range payloads {
encoded, err := encodeServingPayload(payload)
if err != nil {
slog.Error("Encoding redis payload", "error", err)
os.Exit(1)
}
encodedPayloads = append(encodedPayloads, encoded)
}
_, err := rs.client.LPush(ctx, id, encodedPayloads...).Result()
if err != nil {
slog.Error("Saving payloads with LPUSH", "key", id, "error", err)
os.Exit(1)
}
if _, err := rs.client.Expire(ctx, id, rs.ttl).Result(); err != nil {
slog.Error("Setting expiry for redis key", "key", id, "error", err)
os.Exit(1)
}
slog.Info("Saved payloads", "key", id, "count", len(encodedPayloads))
}

func (rs *RedisStore) Get(ctx context.Context, getDatum servingstore.GetDatum) servingstore.StoredResult {
id := getDatum.ID()
slog.Info("Received Get request", "key", id)
values, err := rs.client.LRange(ctx, id, 0, -1).Result()
if err != nil {
slog.Error("Retrieving results", "key", id, "error", err)
os.Exit(1)
}
if len(values) == 0 {
slog.Info("Returning empty results", "id", id)
return servingstore.NewStoredResult(id, nil)
}
payloads := make([]servingstore.Payload, 0, len(values))
for _, value := range values {
payload, err := decodeServingPayload(value)
if err != nil {
slog.Error("Decoding redis payload", "error", err)
os.Exit(1)
}
payloads = append(payloads, servingstore.NewPayload(payload.Origin, payload.Value))
}
slog.Info("Returning results", "key", id, "count", len(payloads))
return servingstore.NewStoredResult(id, payloads)
}

func NewRedisStore(addr string, ttl time.Duration) *RedisStore {
rdb := redis.NewClient(&redis.Options{
Addr: addr,
})
return &RedisStore{client: rdb, ttl: ttl}
}

func main() {
redisURL := DEFAULT_REDIS_URL
if addr, exists := os.LookupEnv("REDIS_ADDR"); exists {
redisURL = addr
}
redisTTL := DEFAULT_REDIS_TTL_SECONDS * time.Second
if ttl, exists := os.LookupEnv("REDIS_TTL_SECONDS"); exists {
ttlSecs, err := strconv.ParseInt(ttl, 10, 64)
if err != nil {
slog.Error("Converting value of env variable REDIS_TTL_SECONDS to integer:", "error", err)
} else {
redisTTL = time.Duration(ttlSecs) * time.Second
}
}

slog.Info("Starting Redis serving store", "redis_url", redisURL, "ttl", redisTTL)
err := servingstore.NewServer(NewRedisStore(redisURL, redisTTL)).Start(context.Background())
if err != nil {
slog.Error("Failed to serving store function server", "error", err)
os.Exit(1)
}
}
83 changes: 83 additions & 0 deletions pkg/servingstore/examples/redis-store/redis-minimal.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
---
#
# Redis service
#
apiVersion: v1
kind: Service
metadata:
name: redis
labels:
app: redis
spec:
ports:
- port: 6379
targetPort: 6379
name: client
clusterIP: None
selector:
app: redis
---
#
# Redis configuration file
#
apiVersion: v1
kind: ConfigMap
metadata:
name: redis-config
labels:
app: redis
data:
# maxmemory is set to 100mb to ensure enough storage for running a single e2e test suite.
# a lower number can lead to redis sink write failure.
redis-config: |
maxmemory 100mb
maxmemory-policy allkeys-lru
protected-mode no
---
#
# Redis stateful set
#
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: redis
spec:
serviceName: redis
replicas: 1
minReadySeconds: 10 # by default is 0
selector:
matchLabels:
app: redis # has to match .spec.template.metadata.labels
template:
metadata:
labels:
app: redis
name: redis
spec:
terminationGracePeriodSeconds: 10
containers:
- name: redis
image: redis:7.0.11
ports:
- containerPort: 6379
name: client
command:
- redis-server
- "/redis-master/redis.conf"
env:
- name: MASTER
value: "true"
volumeMounts:
- mountPath: /redis-master-data
name: data
- mountPath: /redis-master
name: config
volumes:
- name: data
emptyDir: {}
- name: config
configMap:
name: redis-config
items:
- key: redis-config
path: redis.conf

0 comments on commit da01d9e

Please sign in to comment.