Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/redis-sink/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
11 changes: 11 additions & 0 deletions examples/redis-sink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "redis-sink"
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true

[dependencies]
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }
redis = { version = "1.0.0", features = ["tokio-comp", "aio"] }
20 changes: 20 additions & 0 deletions examples/redis-sink/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM rust:1.85-bullseye AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y

WORKDIR /numaflow-rs
COPY ./ ./
WORKDIR /numaflow-rs/examples/redis-sink

# build for release
RUN cargo build --release

# our final base
FROM debian:bullseye AS redis-sink

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/target/release/redis-sink .

# set the startup command to run your binary
CMD ["./redis-sink"]
16 changes: 16 additions & 0 deletions examples/redis-sink/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/redis-sink:${TAG}
DOCKER_FILE_PATH = examples/redis-sink/Dockerfile

.PHONY: update
update:
cargo check
cargo update

.PHONY: image
image: update
cd ../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} . --load
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
11 changes: 11 additions & 0 deletions examples/redis-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Redis E2E Test Sink
A User Defined Sink using redis hashes to store messages.
The hash key is set by an environment variable `SINK_HASH_KEY` under the sink container spec.

For each message received, the sink will store the message in the hash with the key being the payload of the message
and the value being the no. of occurrences of that payload so far.

The environment variable `CHECK_ORDER` is used to determine whether to check the order of the messages based one event time.
The environment variable `MESSAGE_COUNT` is used to determine how many subsequent number of messages at a time to check the order of.

This sink is used by Numaflow E2E testing.
144 changes: 144 additions & 0 deletions examples/redis-sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use numaflow::sink::{self, Response, SinkRequest, Sinker};
use redis::AsyncCommands;
use std::env;
use tokio::sync::Mutex;

/// RedisTestSink is a sink that writes messages to Redis hashes.
/// Created for numaflow e2e tests.
struct RedisTestSink {
/// Redis hash key to store messages. This is set by an environment variable `SINK_HASH_KEY`
/// under the sink container spec.
hash_key: String,
/// Used to determine how many subsequent number of messages at a time to check the order of.
/// This is set by an environment variable `MESSAGE_COUNT`
message_count: usize,
/// Used to collect `message_count` number of messages, check whether they all arrived in order
/// and increment the count for the order result in Redis
inflight_messages: Mutex<Vec<SinkRequest>>,
client: redis::Client,
/// If true, checks the order of messages based on event time.
/// This is set by an environment variable `CHECK_ORDER`
check_order: bool,
}

impl RedisTestSink {
/// Creates a new instance of RedisTestSink with a Redis client.
fn new() -> Self {
let client =
redis::Client::open("redis://redis:6379").expect("Failed to create Redis client");

let hash_key =
env::var("SINK_HASH_KEY").expect("SINK_HASH_KEY environment variable is not set");

let message_count: usize = env::var("MESSAGE_COUNT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(0);

let check_order: bool = env::var("CHECK_ORDER")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(false);

RedisTestSink {
client,
hash_key,
message_count,
inflight_messages: Mutex::new(Vec::with_capacity(message_count)),
check_order,
}
}
}

#[tonic::async_trait]
impl Sinker for RedisTestSink {
/// This redis UDSink is created for numaflow e2e tests. This handle function assumes that
/// a redis instance listening on address redis:6379 has already been up and running.
async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {
let mut results: Vec<Response> = Vec::new();

// Get async connection to Redis
let mut con = self
.client
.get_multiplexed_async_connection()
.await
.expect("Failed to get Redis connection");

while let Some(datum) = input.recv().await {
let id = datum.id.clone();
let value = datum.value.clone();

if self.check_order {
let mut inflight = self.inflight_messages.lock().await;
inflight.push(datum);

if inflight.len() == self.message_count {
// Check if messages are ordered by event time
let ordered = inflight
.windows(2)
.all(|w| w[0].event_time <= w[1].event_time);

let result_message = if ordered { "ordered" } else { "not ordered" };

// Increment the count for the order result in Redis
let result: Result<(), redis::RedisError> =
con.hincr(&self.hash_key, result_message, 1).await;

match result {
Ok(_) => {
println!(
"Incremented by 1 the no. of occurrences of {} under hash key {}",
result_message, self.hash_key
);
}
Err(e) => {
eprintln!("Set Error - {:?}", e);
}
}

// Reset the inflight messages
inflight.clear();
}
}

// Watermark and event time of the message can be accessed
let _ = datum.event_time;
let _ = datum.watermark;

// We use redis hashes to store messages.
// Each field of a hash is the content of a message and
// value of the field is the no. of occurrences of the message.
let value_str = String::from_utf8(value).unwrap_or_else(|_| "".to_string());

let result: Result<(), redis::RedisError> =
con.hincr(&self.hash_key, &value_str, 1).await;

match result {
Ok(_) => {
println!(
"Incremented by 1 the no. of occurrences of {} under hash key {}",
value_str, self.hash_key
);
}
Err(e) => {
eprintln!("Set Error - {:?}", e);
}
}

results.push(Response::ok(id));
}

results
}
}

#[tokio::main]
async fn main() {
let sink = RedisTestSink::new();

let server = sink::Server::new(sink);

if let Err(e) = server.start().await {
panic!("Failed to start sink function server: {:?}", e);
}
}
Loading