
Commit e9ae4b5

External Kafka topic -> Quix topic source template (#652)
* add python kafka source template
* remove env var
* fix test
1 parent 0bd5fe0 commit e9ae4b5

File tree

10 files changed: +535 -0 lines changed


python/sources/kafka/README.md

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
# Kafka Replicator Source

This source template replicates data from a Kafka topic to a Quix topic using the `KafkaReplicatorSource` class from Quix Streams.

## Features

- Replicates data from any Kafka broker to Quix
- Supports SASL/SSL authentication
- Configurable deserializers for keys and values
- Configurable consumer behavior (offset reset, poll timeout, etc.)
- Support for running multiple instances with the same name
## Environment Variables

### Required

- **output**: The Quix topic that will receive the replicated data
- **SOURCE_BROKER_ADDRESS**: The source Kafka broker address (e.g., `localhost:9092` or `broker.example.com:9092`)
- **SOURCE_TOPIC**: The source Kafka topic name to replicate from

### Optional

- **AUTO_OFFSET_RESET**: What to do when there is no initial offset in Kafka. Options: `earliest`, `latest`. Default: `latest`
- **VALUE_DESERIALIZER**: Deserializer to use for the message value. Options: `json`, `bytes`, `string`, `double`, `integer`. Default: `json`
- **KEY_DESERIALIZER**: Deserializer to use for the message key. Options: `json`, `bytes`, `string`, `double`, `integer`. Default: `bytes`
- **CONSUMER_POLL_TIMEOUT**: Consumer poll timeout in seconds (optional)
- **SHUTDOWN_TIMEOUT**: Timeout for shutting down the source in seconds. Default: `10`
- **CONSUMER_GROUP**: Name of the consumer group used by the destination Quix application
### Authentication (Optional)

If your source Kafka cluster requires authentication, provide these variables:

- **SOURCE_KAFKA_SASL_USERNAME**: SASL username for source Kafka authentication
- **SOURCE_KAFKA_SASL_PASSWORD**: SASL password for source Kafka authentication
- **SOURCE_KAFKA_SASL_MECHANISM**: SASL mechanism for authentication. Options: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `GSSAPI`, `OAUTHBEARER`, `AWS_MSK_IAM`. Default: `SCRAM-SHA-256`
- **SOURCE_KAFKA_SSL_CA_LOCATION**: Path to the SSL CA certificate file for secure connections
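When both a username and a password are set, the template maps these variables onto the source consumer's extra configuration (see `main.py` further down in this commit); a condensed view of that mapping:

```python
import os

# Mirrors the logic in main.py: SASL/SSL settings are only added when credentials are provided.
consumer_extra_config = {}
if os.getenv("SOURCE_KAFKA_SASL_USERNAME") and os.getenv("SOURCE_KAFKA_SASL_PASSWORD"):
    consumer_extra_config["security.protocol"] = "SASL_SSL"
    consumer_extra_config["sasl.mechanism"] = os.getenv("SOURCE_KAFKA_SASL_MECHANISM", "SCRAM-SHA-256")
    consumer_extra_config["sasl.username"] = os.environ["SOURCE_KAFKA_SASL_USERNAME"]
    consumer_extra_config["sasl.password"] = os.environ["SOURCE_KAFKA_SASL_PASSWORD"]
    if os.getenv("SOURCE_KAFKA_SSL_CA_LOCATION"):
        consumer_extra_config["ssl.ca.location"] = os.environ["SOURCE_KAFKA_SSL_CA_LOCATION"]
```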
## How It Works

1. The template creates a connection to the source Kafka broker using the provided configuration
2. It uses the `KafkaReplicatorSource` to consume messages from the source topic
3. Messages are automatically replicated to the output topic in Quix
4. The replication preserves message keys, values, and metadata
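A condensed sketch of the wiring performed by `main.py` (the full script is included later in this commit):

```python
import os

from quixstreams import Application
from quixstreams.sources.core.kafka import KafkaReplicatorSource

# The Quix application that owns the destination (output) topic
app = Application(consumer_group=os.environ["CONSUMER_GROUP"])

# The source consumes from the external broker/topic and replicates into Quix
source = KafkaReplicatorSource(
    name="kafka-replicator",
    app_config=app.config,
    topic=os.environ["SOURCE_TOPIC"],
    broker_address=os.environ["SOURCE_BROKER_ADDRESS"],
)

app.add_source(source=source, topic=app.topic(os.environ["output"]))
app.run()
```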
## Use Cases

- Migrating data from one Kafka cluster to another
- Creating a real-time backup of a Kafka topic
- Integrating external Kafka data sources into Quix pipelines
- Bridging data between different Kafka environments (e.g., on-premise to cloud)
## Example Configuration

For a public Kafka broker without authentication:

```
SOURCE_BROKER_ADDRESS=localhost:9092
SOURCE_TOPIC=my-source-topic
output=replicated-data
```

For a secured Kafka cluster:

```
SOURCE_BROKER_ADDRESS=broker.example.com:9092
SOURCE_TOPIC=my-source-topic
output=replicated-data
SOURCE_KAFKA_SASL_USERNAME=my-username
SOURCE_KAFKA_SASL_PASSWORD=my-password
SOURCE_KAFKA_SASL_MECHANISM=SCRAM-SHA-256
```
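For local development, the same variables can be kept in a `.env` file: `python-dotenv` is listed in requirements.txt, and `main.py` ships with the loading lines commented out:

```python
# Uncomment these lines in main.py to load variables from a local .env file
from dotenv import load_dotenv
load_dotenv()
```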
## Notes

- The `KafkaReplicatorSource` supports running multiple instances with the same name for parallel processing
- If no authentication variables are provided, the connector will attempt to connect without SASL/SSL
- Make sure the source Kafka broker is accessible from the Quix environment

python/sources/kafka/dockerfile

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1 \
    PYTHONIOENCODING=UTF-8 \
    PYTHONPATH="/app"

# Build argument for setting the main app path
ARG MAINAPPPATH=.

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]

python/sources/kafka/icon.png

30.7 KB

python/sources/kafka/library.json

Lines changed: 129 additions & 0 deletions
@@ -0,0 +1,129 @@
{
  "libraryItemId": "kafka-replicator-source",
  "name": "Kafka Replicator Source",
  "language": "Python",
  "tags": {
    "Pipeline Stage": ["Source"],
    "Type": ["Connectors"],
    "Category": ["Data streaming"]
  },
  "shortDescription": "Replicate data from a Kafka topic to a Quix Cloud topic",
  "DefaultFile": "main.py",
  "EntryPoint": "dockerfile",
  "RunEntryPoint": "main.py",
  "IconFile": "icon.png",
  "Variables": [
    {
      "Name": "output",
      "Type": "EnvironmentVariable",
      "InputType": "OutputTopic",
      "Description": "This is the Quix Topic that will receive the replicated data",
      "DefaultValue": "",
      "Required": true
    },
    {
      "Name": "SOURCE_BROKER_ADDRESS",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The source Kafka broker address (e.g., localhost:9092 or broker.example.com:9092)",
      "DefaultValue": "",
      "Required": true
    },
    {
      "Name": "SOURCE_TOPIC",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The source Kafka topic name to replicate from",
      "DefaultValue": "",
      "Required": true
    },
    {
      "Name": "AUTO_OFFSET_RESET",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "What to do when there is no initial offset in Kafka. Options: earliest, latest",
      "DefaultValue": "latest",
      "Required": false
    },
    {
      "Name": "VALUE_DESERIALIZER",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Deserializer to use for the message value. Options: json, bytes, string, double, integer",
      "DefaultValue": "json",
      "Required": false
    },
    {
      "Name": "KEY_DESERIALIZER",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Deserializer to use for the message key. Options: json, bytes, string, double, integer",
      "DefaultValue": "bytes",
      "Required": false
    },
    {
      "Name": "CONSUMER_POLL_TIMEOUT",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Consumer poll timeout in seconds (optional)",
      "DefaultValue": "",
      "Required": false
    },
    {
      "Name": "SHUTDOWN_TIMEOUT",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Timeout for shutting down the source in seconds",
      "DefaultValue": "10",
      "Required": false
    },
    {
      "Name": "SOURCE_KAFKA_SASL_USERNAME",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "SASL username for source Kafka authentication (optional)",
      "DefaultValue": "",
      "Required": false
    },
    {
      "Name": "SOURCE_KAFKA_SASL_PASSWORD",
      "Type": "EnvironmentVariable",
      "InputType": "Secret",
      "Description": "SASL password for source Kafka authentication (optional)",
      "DefaultValue": "",
      "Required": false
    },
    {
      "Name": "SOURCE_KAFKA_SASL_MECHANISM",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER, AWS_MSK_IAM",
      "DefaultValue": "SCRAM-SHA-256",
      "Required": false
    },
    {
      "Name": "SOURCE_KAFKA_SSL_CA_LOCATION",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Path to the SSL CA certificate file for secure connections. If not provided, system default CA certificates will be used",
      "DefaultValue": "",
      "Required": false
    },
    {
      "Name": "CONSUMER_GROUP",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Name of the consumer group",
      "DefaultValue": "",
      "Required": false
    }
  ],
  "DeploySettings": {
    "DeploymentType": "Service",
    "CpuMillicores": 200,
    "MemoryInMb": 200,
    "Replicas": 1,
    "PublicAccess": false,
    "ValidateConnection": true
  }
}

python/sources/kafka/main.py

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
from quixstreams import Application
from quixstreams.sources.core.kafka import KafkaReplicatorSource
import os

# for local dev, load env vars from a .env file
# from dotenv import load_dotenv
# load_dotenv()

# Required parameters
source_broker_address = os.environ["SOURCE_BROKER_ADDRESS"]
source_topic = os.environ["SOURCE_TOPIC"]
output_topic_name = os.environ["output"]

# Optional parameters
auto_offset_reset = os.getenv("AUTO_OFFSET_RESET", "latest")
consumer_poll_timeout = os.getenv("CONSUMER_POLL_TIMEOUT", None)
shutdown_timeout = float(os.getenv("SHUTDOWN_TIMEOUT", "10"))
value_deserializer = os.getenv("VALUE_DESERIALIZER", "json")
key_deserializer = os.getenv("KEY_DESERIALIZER", "bytes")

# Build consumer_extra_config for SASL/SSL if credentials are provided
consumer_extra_config = {}
sasl_username = os.getenv("SOURCE_KAFKA_SASL_USERNAME", "")
sasl_password = os.getenv("SOURCE_KAFKA_SASL_PASSWORD", "")
sasl_mechanism = os.getenv("SOURCE_KAFKA_SASL_MECHANISM", "SCRAM-SHA-256")
ssl_ca_location = os.getenv("SOURCE_KAFKA_SSL_CA_LOCATION", "")

if sasl_username and sasl_password:
    consumer_extra_config['sasl.mechanism'] = sasl_mechanism
    consumer_extra_config['security.protocol'] = 'SASL_SSL'
    consumer_extra_config['sasl.username'] = sasl_username
    consumer_extra_config['sasl.password'] = sasl_password
    if ssl_ca_location:
        consumer_extra_config['ssl.ca.location'] = ssl_ca_location

# Convert consumer_poll_timeout to float if provided
if consumer_poll_timeout:
    consumer_poll_timeout = float(consumer_poll_timeout)

# Create a Quix Application for the destination (Quix platform)
app = Application(
    consumer_group=os.environ["CONSUMER_GROUP"],
)

# Create the KafkaReplicatorSource
source = KafkaReplicatorSource(
    name="kafka-replicator",
    app_config=app.config,
    topic=source_topic,
    broker_address=source_broker_address,
    auto_offset_reset=auto_offset_reset,
    consumer_extra_config=consumer_extra_config if consumer_extra_config else None,
    consumer_poll_timeout=consumer_poll_timeout,
    shutdown_timeout=shutdown_timeout,
    value_deserializer=value_deserializer,
    key_deserializer=key_deserializer,
)

output_topic = app.topic(output_topic_name)
app.add_source(source=source, topic=output_topic)


if __name__ == "__main__":
    app.run()
python/sources/kafka/requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
quixstreams==3.23.1
python-dotenv

tests/sources/kafka/data.jsonlines

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
{"_key": "test0", "_value": {"k0": "v0", "k1": "v1"}}
{"_key": "test1", "_value": {"k0": "v0", "k1": "v1"}}
{"_key": "test2", "_value": {"k0": "v0", "k1": "v1"}}

0 commit comments
