
Commit f52f1db

Destinations: S3FileSink Template (#638)
* add s3-file-sink template
* more cleanup
* rebase and add tests and small fixes
1 parent e9ae4b5 commit f52f1db

File tree

9 files changed: 534 additions & 0 deletions
Lines changed: 48 additions & 0 deletions
# AWS S3 File Destination

[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/s3-file) demonstrates how to consume data from a Kafka topic and write it to an AWS S3 bucket.

## How to run

Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector.

Clicking `Set up connector` allows you to enter your connection details and runtime parameters.

Then either:
* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.
* or click `Customise connector` to inspect or alter the code before deployment.

## Environment Variables

The connector uses the following environment variables (which generally correspond to the [`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) parameter names):

### Required

- `input`: The input Kafka topic.
- `S3_BUCKET`: The S3 bucket to use.
- `AWS_ENDPOINT_URL`: The URL to your S3 instance.
- `AWS_REGION_NAME`: The region of your S3 bucket.
- `AWS_SECRET_ACCESS_KEY`: Your AWS secret.
- `AWS_ACCESS_KEY_ID`: Your AWS Access Key.

### Optional

These are optional; unless explicitly set, they fall back to the [`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) defaults.

- `S3_BUCKET_DIRECTORY`: An optional path within the S3 bucket to use.
  **Default**: "" (root)
- `FILE_FORMAT`: The file format to publish data as; options: \[parquet, json\].
  **Default**: "parquet"

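For local development, these variables can be supplied through a `.env` file, since `python-dotenv` is included in `requirements.txt`. Note that `main.py` does not call `load_dotenv()` itself, and outside the Quix platform a broker setting such as `Quix__Broker__Address` (as used in the integration test further down) is also needed. A minimal, hypothetical wrapper:

```python
# run_local.py — hypothetical local-run helper, not part of the template.
# Loads the variables listed above from a .env file, then starts the
# connector the same way the container entrypoint does.
import runpy

from dotenv import load_dotenv  # provided by python-dotenv in requirements.txt

load_dotenv()  # expects a .env file with input, S3_BUCKET, AWS_* variables, etc.
runpy.run_path("main.py", run_name="__main__")
```
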
## Requirements / Prerequisites

You will need an AWS account with an S3 bucket and credentials (an access key ID and secret access key) that allow writing to that bucket.

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open Source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation.
Lines changed: 23 additions & 0 deletions
FROM python:3.11.1-slim-buster

# Set environment variables to non-interactive and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1 \
    PYTHONIOENCODING=UTF-8

# Set the working directory inside the container
WORKDIR /app

# Copy only the requirements file(s) to leverage Docker cache
# Assuming all requirements files are in the root or subdirectories
COPY ./requirements.txt ./

# Install dependencies
# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application
COPY . .

# Set the command to run your application
ENTRYPOINT ["python3", "main.py"]
Binary file (4.74 KB), not rendered in the diff — presumably the connector's icon.png.
Lines changed: 77 additions & 0 deletions
{
  "libraryItemId": "s3-file",
  "name": "AWS S3 File Sink",
  "language": "Python",
  "tags": {
    "Pipeline Stage": ["Destination"],
    "Type": ["Connectors"],
    "Category": ["File Store"]
  },
  "shortDescription": "Consume data from a Kafka topic and write it to an AWS S3 bucket path.",
  "DefaultFile": "main.py",
  "EntryPoint": "dockerfile",
  "RunEntryPoint": "main.py",
  "IconFile": "icon.png",
  "Variables": [
    {
      "Name": "input",
      "Type": "EnvironmentVariable",
      "InputType": "InputTopic",
      "Description": "The input Kafka topic.",
      "DefaultValue": "input",
      "Required": true
    },
    {
      "Name": "S3_BUCKET",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The S3 bucket to use.",
      "Required": true
    },
    {
      "Name": "S3_BUCKET_DIRECTORY",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "An optional path within the S3 bucket to use, else uses root.",
      "DefaultValue": "",
      "Required": false
    },
    {
      "Name": "AWS_REGION_NAME",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The region of your S3 bucket.",
      "Required": true
    },
    {
      "Name": "AWS_SECRET_ACCESS_KEY",
      "Type": "EnvironmentVariable",
      "InputType": "Secret",
      "Description": "Your AWS secret.",
      "Required": true
    },
    {
      "Name": "AWS_ACCESS_KEY_ID",
      "Type": "EnvironmentVariable",
      "InputType": "Secret",
      "Description": "Your AWS Access Key.",
      "Required": true
    },
    {
      "Name": "FILE_FORMAT",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The file format to publish data as; options: [parquet, json].",
      "DefaultValue": "parquet",
      "Required": false
    }
  ],
  "DeploySettings": {
    "DeploymentType": "Service",
    "CpuMillicores": 200,
    "MemoryInMb": 500,
    "Replicas": 1,
    "PublicAccess": false,
    "ValidateConnection": false
  }
}
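Each entry in the `Variables` array above corresponds to an environment variable that the platform injects into the deployed container, which is why `main.py` (next file) reads its configuration via `os.environ` / `os.getenv`. As a hedged illustration only — the helper below is hypothetical and not part of this commit — the same file can be used locally to confirm that every required variable is set before starting the service:

```python
# check_env.py — hypothetical helper, not shipped with the template.
# Reads library.json and reports any required environment variables
# missing from the current environment.
import json
import os

with open("library.json") as f:
    spec = json.load(f)

missing = [
    var["Name"]
    for var in spec.get("Variables", [])
    if var.get("Required") and not os.getenv(var["Name"])
]

if missing:
    raise SystemExit(f"Missing required environment variables: {missing}")
print("All required environment variables are set.")
```
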
Lines changed: 37 additions & 0 deletions
from typing import get_args
import os

from quixstreams import Application
from quixstreams.sinks.community.file.s3 import S3FileSink
from quixstreams.sinks.community.file.formats import FormatName


def get_file_format() -> FormatName:
    valid_formats = get_args(FormatName)
    if (file_format := os.getenv("FILE_FORMAT", "parquet")) not in valid_formats:
        raise ValueError(
            f"`FILE_FORMAT` must be one of {valid_formats}; got {file_format}"
        )
    return file_format


app = Application(
    consumer_group="s3-file-destination",
    auto_offset_reset="earliest",
    commit_interval=5
)

s3_file_sink = S3FileSink(
    bucket=os.environ["S3_BUCKET"],
    directory=os.getenv("S3_BUCKET_DIRECTORY", ""),
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    region_name=os.environ["AWS_REGION_NAME"],
    format=get_file_format(),
)

sdf = app.dataframe(topic=app.topic(os.environ["input"])).sink(s3_file_sink)


if __name__ == "__main__":
    app.run()
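One detail worth noting in `get_file_format()` above: `typing.get_args` applied to a `Literal` alias returns its allowed values as a tuple, so the validation stays in sync with whatever formats `FormatName` declares. A minimal standalone sketch of the same pattern (the `Formats` alias below is illustrative, not the actual `FormatName` definition from quixstreams):

```python
import os
from typing import Literal, get_args

# Illustrative stand-in for quixstreams' FormatName Literal.
Formats = Literal["json", "parquet"]


def pick_format() -> Formats:
    valid = get_args(Formats)  # ("json", "parquet")
    chosen = os.getenv("FILE_FORMAT", "parquet")
    if chosen not in valid:
        raise ValueError(f"FILE_FORMAT must be one of {valid}; got {chosen}")
    return chosen


print(pick_format())  # "parquet" unless FILE_FORMAT is set to another valid value
```
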
Lines changed: 2 additions & 0 deletions
quixstreams[s3]==3.23.1
python-dotenv
Lines changed: 120 additions & 0 deletions
# timeout: 60
services:
  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    networks:
      - test-network
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 3s
      timeout: 5s
      retries: 10
    stop_grace_period: 3s

  minio-init:
    image: minio/mc:latest
    depends_on:
      minio:
        condition: service_healthy
    entrypoint: >
      /bin/sh -c "
      mc alias set myminio http://minio:9000 minioadmin minioadmin;
      mc mb myminio/test-bucket --ignore-existing;
      echo 'MinIO bucket created';
      echo 'Keeping minio-init alive...';
      tail -f /dev/null
      "
    networks:
      - test-network

  kafka:
    image: docker.redpanda.com/redpandadata/redpanda:v24.2.4
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092
      - --advertise-kafka-addr internal://kafka:9092
      - --mode dev-container
      - --smp 1
    healthcheck:
      test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"]
      interval: 5s
      timeout: 10s
      retries: 10
    networks:
      - test-network
    stop_grace_period: 3s

  s3-file-dest:
    build:
      context: ../../../python/destinations/s3-file
      dockerfile: Dockerfile
    environment:
      - Quix__Broker__Address=kafka:9092
      - Quix__Consumer__Group=s3-file-dest-test
      - Quix__Deployment__Id=test-s3-file-dest
      - input=test-s3-input
      - S3_BUCKET=test-bucket
      - S3_BUCKET_DIRECTORY=test_data
      - AWS_ACCESS_KEY_ID=minioadmin
      - AWS_SECRET_ACCESS_KEY=minioadmin
      - AWS_REGION_NAME=us-east-1
      - AWS_ENDPOINT_URL_S3=http://minio:9000
      - FILE_FORMAT=json
    networks:
      - test-network
    depends_on:
      minio:
        condition: service_healthy
      kafka:
        condition: service_healthy
      minio-init:
        condition: service_started
    stop_grace_period: 3s

  test-runner:
    build:
      context: ../../framework
      dockerfile: Dockerfile
    environment:
      - Quix__Broker__Address=kafka:9092
      - TEST_INPUT_TOPIC=test-s3-input
      - TEST_MESSAGE_COUNT=3
      - MINIO_ENDPOINT=minio:9000
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
      - S3_BUCKET=test-bucket
      - S3_PREFIX=test_data
    command: >
      sh -c "
      echo 'Installing boto3 for S3 access...' &&
      pip install boto3 > /dev/null 2>&1 &&
      echo 'Producing test messages to Kafka...' &&
      python /tests/produce_test_data.py &&
      echo 'Waiting for s3-file-dest to process messages...' &&
      sleep 15 &&
      echo 'Verifying data in S3...' &&
      python /tests/verify_output.py
      "
    volumes:
      - ./produce_test_data.py:/tests/produce_test_data.py:ro
      - ./verify_output.py:/tests/verify_output.py:ro
    working_dir: /
    networks:
      - test-network
    depends_on:
      minio:
        condition: service_healthy
      kafka:
        condition: service_healthy
      s3-file-dest:
        condition: service_started
    stop_grace_period: 3s

networks:
  test-network:
    driver: bridge
Lines changed: 51 additions & 0 deletions
import os
import time
import json
from quixstreams import Application

def main():
    broker_address = os.getenv("Quix__Broker__Address", "kafka:9092")
    topic_name = os.getenv("TEST_INPUT_TOPIC", "test-s3-input")
    message_count = int(os.getenv("TEST_MESSAGE_COUNT", "3"))

    print(f"Producing {message_count} test messages to topic: {topic_name}")

    app = Application(
        broker_address=broker_address,
        producer_extra_config={
            "allow.auto.create.topics": "true"
        }
    )

    topic = app.topic(topic_name)

    with app.get_producer() as producer:
        for i in range(message_count):
            message = {
                "id": i,
                "name": f"test_item_{i}",
                "value": f"test_value_{i}",
                "timestamp": int(time.time() * 1000)
            }
            print(f"Producing message {i}: {message}")

            serialized = json.dumps(message).encode('utf-8')

            producer.produce(
                topic=topic.name,
                key=f"key_{i}",
                value=serialized
            )

        producer.flush()

    print(f"Successfully produced {message_count} messages")

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        exit(1)
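The test compose file mounts a `verify_output.py` script into the test-runner alongside `produce_test_data.py`, but that script is not included in this excerpt. Purely as a hedged sketch of what such a check might look like — assumed structure, not the actual file — it would point boto3 (installed by the test-runner at startup) at the MinIO endpoint and confirm that the sink wrote objects under the configured prefix:

```python
# Hypothetical sketch of a verify_output.py-style check; the real script may differ.
import os

import boto3  # the test-runner installs boto3 before running the tests


def main():
    s3 = boto3.client(
        "s3",
        endpoint_url=f"http://{os.getenv('MINIO_ENDPOINT', 'minio:9000')}",
        aws_access_key_id=os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
        aws_secret_access_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"),
        region_name="us-east-1",
    )
    bucket = os.getenv("S3_BUCKET", "test-bucket")
    prefix = os.getenv("S3_PREFIX", "test_data")

    # The S3FileSink should have written at least one object under the prefix.
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    keys = [obj["Key"] for obj in response.get("Contents", [])]
    print(f"Found {len(keys)} object(s) under s3://{bucket}/{prefix}: {keys}")

    if not keys:
        raise SystemExit("Verification failed: no objects written by the sink")
    print("Verification passed")


if __name__ == "__main__":
    main()
```
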
