48 changes: 48 additions & 0 deletions python/destinations/s3-file/README.md
@@ -0,0 +1,48 @@
# AWS S3 File Destination

[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/s3-file) demonstrates how to consume data from a Kafka topic and write it to an AWS S3 bucket.

## How to run

Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the `Connectors` tab to use this connector.

Clicking `Set up connector` allows you to enter your connection details and runtime parameters.

Then either:
* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.

* or click `Customise connector` to inspect or alter the code before deployment.

## Environment Variables

The connector uses the following environment variables (which generally correspond to the
[`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) parameter names):

### Required
- `input`: The input Kafka topic
- `S3_BUCKET`: The S3 bucket to use.
- `AWS_ENDPOINT_URL`: The URL to your S3 instance.
- `AWS_REGION_NAME`: The region of your S3 bucket.
- `AWS_SECRET_ACCESS_KEY`: Your AWS secret.
- `AWS_ACCESS_KEY_ID`: Your AWS Access Key.

### Optional
These variables are optional; unless explicitly defined, they generally fall back to the [`S3FileSink`](https://quix.io/docs/quix-streams/connectors/sinks/amazon-s3-sink.html) defaults. A sketch of loading these variables for local runs follows the list below.

- `S3_BUCKET_DIRECTORY`: An optional path within the S3 bucket to use.
**Default**: "" (root)
- `FILE_FORMAT`: The file format to publish data as; options: \[parquet, json\].
**Default**: "parquet"
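
For local runs outside Quix, these variables can be exported in your shell or kept in a `.env` file and loaded with the `python-dotenv` package included in `requirements.txt`. A minimal sketch (the `.env` file and the check below are illustrative, not part of the connector):

```python
# Illustrative only: load the connector settings from a local .env file and
# check the variables listed above before starting main.py.
import os

from dotenv import load_dotenv

load_dotenv()  # reads a .env file from the current working directory, if present

required = [
    "input", "S3_BUCKET", "AWS_ENDPOINT_URL",
    "AWS_REGION_NAME", "AWS_SECRET_ACCESS_KEY", "AWS_ACCESS_KEY_ID",
]
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise RuntimeError(f"Missing required environment variables: {missing}")
```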


## Requirements / Prerequisites

You will need an S3 bucket and AWS credentials (an access key ID and secret access key) with permission to write to that bucket.

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open Source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation.
23 changes: 23 additions & 0 deletions python/destinations/s3-file/dockerfile
@@ -0,0 +1,23 @@
FROM python:3.11.1-slim-buster

# Set environment variables to non-interactive and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8

# Set the working directory inside the container
WORKDIR /app

# Copy only the requirements file(s) to leverage Docker cache
# Assuming all requirements files are in the root or subdirectories
COPY ./requirements.txt ./

# Install dependencies
# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application
COPY . .

# Set the command to run your application
ENTRYPOINT ["python3", "main.py"]
Binary file added python/destinations/s3-file/icon.png
77 changes: 77 additions & 0 deletions python/destinations/s3-file/library.json
@@ -0,0 +1,77 @@
{
"libraryItemId": "s3-file",
"name": "AWS S3 File Sink",
"language": "Python",
"tags": {
"Pipeline Stage": ["Destination"],
"Type": ["Connectors"],
"Category": ["File Store"]
},
"shortDescription": "Consume data from a Kafka topic and write it to an AWS S3 bucket path.",
"DefaultFile": "main.py",
"EntryPoint": "dockerfile",
"RunEntryPoint": "main.py",
"IconFile": "icon.png",
"Variables": [
{
"Name": "input",
"Type": "EnvironmentVariable",
"InputType": "InputTopic",
"Description": "The input Kafka topic.",
"DefaultValue": "input",
"Required": true
},
{
"Name": "S3_BUCKET",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The S3 bucket to use.",
"Required": true
},
{
"Name": "S3_BUCKET_DIRECTORY",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "An optional path within the S3 bucket to use, else uses root.",
"DefaultValue": "",
"Required": false
},
{
"Name": "AWS_REGION_NAME",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The region of your S3 bucket.",
"Required": true
},
{
"Name": "AWS_SECRET_ACCESS_KEY",
"Type": "EnvironmentVariable",
"InputType": "Secret",
"Description": "Your AWS secret.",
"Required": true
},
{
"Name": "AWS_ACCESS_KEY_ID",
"Type": "EnvironmentVariable",
"InputType": "Secret",
"Description": "Your AWS Access Key.",
"Required": true
},
{
"Name": "FILE_FORMAT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The file format to publish data as; options: [parquet, json].",
"DefaultValue": "parquet",
"Required": false
}
],
"DeploySettings": {
"DeploymentType": "Service",
"CpuMillicores": 200,
"MemoryInMb": 500,
"Replicas": 1,
"PublicAccess": false,
"ValidateConnection": false
}
}
37 changes: 37 additions & 0 deletions python/destinations/s3-file/main.py
@@ -0,0 +1,37 @@
from typing import get_args
import os

from quixstreams import Application
from quixstreams.sinks.community.file.s3 import S3FileSink
from quixstreams.sinks.community.file.formats import FormatName


def get_file_format() -> FormatName:
valid_formats = get_args(FormatName)
if (file_format := os.getenv("FILE_FORMAT", "parquet")) not in valid_formats:
raise ValueError(
f"`FILE_FORMAT` must be one of {valid_formats}; got {file_format}"
)
return file_format


# Create the application; commit_interval controls how often (in seconds)
# offsets are committed.
app = Application(
    consumer_group="s3-file-destination",
    auto_offset_reset="earliest",
    commit_interval=5
)

# Configure the S3 sink from the environment variables described in the README.
s3_file_sink = S3FileSink(
    bucket=os.environ["S3_BUCKET"],
    directory=os.getenv("S3_BUCKET_DIRECTORY", ""),
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    region_name=os.environ["AWS_REGION_NAME"],
    format=get_file_format(),
)

# Consume from the input topic and write each record to S3.
sdf = app.dataframe(topic=app.topic(os.environ["input"]))
sdf.sink(s3_file_sink)


if __name__ == "__main__":
app.run()
2 changes: 2 additions & 0 deletions python/destinations/s3-file/requirements.txt
@@ -0,0 +1,2 @@
quixstreams[s3]==3.23.1
python-dotenv
120 changes: 120 additions & 0 deletions tests/destinations/s3-file/docker-compose.test.yml
@@ -0,0 +1,120 @@
# timeout: 60
services:
minio:
image: minio/minio:latest
command: server /data --console-address ":9001"
environment:
- MINIO_ROOT_USER=minioadmin
- MINIO_ROOT_PASSWORD=minioadmin
networks:
- test-network
healthcheck:
test: ["CMD", "mc", "ready", "local"]
interval: 3s
timeout: 5s
retries: 10
stop_grace_period: 3s

minio-init:
image: minio/mc:latest
depends_on:
minio:
condition: service_healthy
entrypoint: >
/bin/sh -c "
mc alias set myminio http://minio:9000 minioadmin minioadmin;
mc mb myminio/test-bucket --ignore-existing;
echo 'MinIO bucket created';
echo 'Keeping minio-init alive...';
tail -f /dev/null
"
networks:
- test-network

kafka:
image: docker.redpanda.com/redpandadata/redpanda:v24.2.4
command:
- redpanda
- start
- --kafka-addr internal://0.0.0.0:9092
- --advertise-kafka-addr internal://kafka:9092
- --mode dev-container
- --smp 1
healthcheck:
test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"]
interval: 5s
timeout: 10s
retries: 10
networks:
- test-network
stop_grace_period: 3s

s3-file-dest:
build:
context: ../../../python/destinations/s3-file
dockerfile: Dockerfile
environment:
- Quix__Broker__Address=kafka:9092
- Quix__Consumer__Group=s3-file-dest-test
- Quix__Deployment__Id=test-s3-file-dest
- input=test-s3-input
- S3_BUCKET=test-bucket
- S3_BUCKET_DIRECTORY=test_data
- AWS_ACCESS_KEY_ID=minioadmin
- AWS_SECRET_ACCESS_KEY=minioadmin
- AWS_REGION_NAME=us-east-1
- AWS_ENDPOINT_URL_S3=http://minio:9000
- FILE_FORMAT=json
networks:
- test-network
depends_on:
minio:
condition: service_healthy
kafka:
condition: service_healthy
minio-init:
condition: service_started
stop_grace_period: 3s

test-runner:
build:
context: ../../framework
dockerfile: Dockerfile
environment:
- Quix__Broker__Address=kafka:9092
- TEST_INPUT_TOPIC=test-s3-input
- TEST_MESSAGE_COUNT=3
- MINIO_ENDPOINT=minio:9000
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin
- S3_BUCKET=test-bucket
- S3_PREFIX=test_data
command: >
sh -c "
echo 'Installing boto3 for S3 access...' &&
pip install boto3 > /dev/null 2>&1 &&
echo 'Producing test messages to Kafka...' &&
python /tests/produce_test_data.py &&
echo 'Waiting for s3-file-dest to process messages...' &&
sleep 15 &&
echo 'Verifying data in S3...' &&
python /tests/verify_output.py
"
volumes:
- ./produce_test_data.py:/tests/produce_test_data.py:ro
- ./verify_output.py:/tests/verify_output.py:ro
working_dir: /
networks:
- test-network
depends_on:
minio:
condition: service_healthy
kafka:
condition: service_healthy
s3-file-dest:
condition: service_started
stop_grace_period: 3s

networks:
test-network:
driver: bridge
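
The `verify_output.py` script mounted by the test-runner above is not included in this diff. Purely as an illustration of the kind of check it performs, here is a minimal sketch that lists the sink's output in MinIO with `boto3` (the names and logic are assumptions, not the actual script):

```python
# Illustrative sketch only: not the actual verify_output.py used by the test.
import os

import boto3

# Talk to the MinIO service using the credentials configured for test-runner.
s3 = boto3.client(
    "s3",
    endpoint_url=f"http://{os.environ['MINIO_ENDPOINT']}",
    aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
    aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
    region_name="us-east-1",
)

bucket = os.environ["S3_BUCKET"]
prefix = os.environ.get("S3_PREFIX", "")

# List whatever the sink wrote under the configured prefix.
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
keys = [obj["Key"] for obj in response.get("Contents", [])]
print(f"Found {len(keys)} object(s) under s3://{bucket}/{prefix}: {keys}")

if not keys:
    raise SystemExit("No objects were written to S3, test failed")
```
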
51 changes: 51 additions & 0 deletions tests/destinations/s3-file/produce_test_data.py
@@ -0,0 +1,51 @@
import os
import time
import json
from quixstreams import Application

def main():
broker_address = os.getenv("Quix__Broker__Address", "kafka:9092")
topic_name = os.getenv("TEST_INPUT_TOPIC", "test-s3-input")
message_count = int(os.getenv("TEST_MESSAGE_COUNT", "3"))

print(f"Producing {message_count} test messages to topic: {topic_name}")

app = Application(
broker_address=broker_address,
producer_extra_config={
"allow.auto.create.topics": "true"
}
)

topic = app.topic(topic_name)

with app.get_producer() as producer:
for i in range(message_count):
message = {
"id": i,
"name": f"test_item_{i}",
"value": f"test_value_{i}",
"timestamp": int(time.time() * 1000)
}
print(f"Producing message {i}: {message}")

serialized = json.dumps(message).encode('utf-8')

producer.produce(
topic=topic.name,
key=f"key_{i}",
value=serialized
)

producer.flush()

print(f"Successfully produced {message_count} messages")

if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
exit(1)