Merged
75 changes: 75 additions & 0 deletions python/sources/kafka/README.md
@@ -0,0 +1,75 @@
# Kafka Replicator Source

This source template replicates data from a Kafka topic to a Quix topic using the `KafkaReplicatorSource` class from Quix Streams.

## Features

- Replicates data from any Kafka broker to Quix
- Supports SASL/SSL authentication
- Configurable deserializers for keys and values
- Configurable consumer behavior (offset reset, poll timeout, etc.)
- Supports running multiple instances with the same name for parallel processing

## Environment Variables

### Required

- **output**: The Quix topic that will receive the replicated data
- **SOURCE_BROKER_ADDRESS**: The source Kafka broker address (e.g., `localhost:9092` or `broker.example.com:9092`)
- **SOURCE_TOPIC**: The source Kafka topic name to replicate from

### Optional

- **AUTO_OFFSET_RESET**: What to do when there is no initial offset in Kafka. Options: `earliest`, `latest`. Default: `latest`
- **VALUE_DESERIALIZER**: Deserializer to use for the message value. Options: `json`, `bytes`, `string`, `double`, `integer`. Default: `json`
- **KEY_DESERIALIZER**: Deserializer to use for the message key. Options: `json`, `bytes`, `string`, `double`, `integer`. Default: `bytes`
- **CONSUMER_POLL_TIMEOUT**: Consumer poll timeout in seconds. If unset, the Quix Streams default applies
- **SHUTDOWN_TIMEOUT**: Timeout for shutting down the source in seconds. Default: `10`
- **CONSUMER_GROUP**: Name of the consumer group
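The deserializer options determine how the raw bytes of each record are decoded before being re-serialized to the output topic. The sketch below is an illustration only, not the Quix Streams classes themselves; `double` and `integer` are shown as big-endian binary encodings, which is how confluent-kafka-style deserializers treat them.

```python
import json
import struct

# Illustrative mapping from the deserializer names accepted by this template
# to plain-Python decoding functions applied to a record's raw bytes.
DESERIALIZERS = {
    "bytes": lambda raw: raw,                           # pass through untouched
    "string": lambda raw: raw.decode("utf-8"),
    "json": lambda raw: json.loads(raw),
    "double": lambda raw: struct.unpack(">d", raw)[0],  # 8-byte IEEE 754
    "integer": lambda raw: struct.unpack(">i", raw)[0], # 4-byte signed int
}

print(DESERIALIZERS["json"](b'{"k0": "v0"}'))  # {'k0': 'v0'}
print(DESERIALIZERS["string"](b"hello"))       # hello
```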

### Authentication (Optional)

If your source Kafka cluster requires authentication, provide these variables:

- **SOURCE_KAFKA_SASL_USERNAME**: SASL username for source Kafka authentication
- **SOURCE_KAFKA_SASL_PASSWORD**: SASL password for source Kafka authentication
- **SOURCE_KAFKA_SASL_MECHANISM**: SASL mechanism for authentication. Options: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `GSSAPI`, `OAUTHBEARER`, `AWS_MSK_IAM`. Default: `SCRAM-SHA-256`
- **SOURCE_KAFKA_SSL_CA_LOCATION**: Path to the SSL CA certificate file for secure connections
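These variables are mapped onto librdkafka consumer settings. A minimal sketch of that mapping, mirroring the logic in `main.py` (the function name is illustrative):

```python
def build_auth_config(env: dict) -> dict:
    """Translate SOURCE_KAFKA_* variables into librdkafka consumer settings,
    as main.py does. Returns an empty dict when no credentials are set, so
    the connector falls back to an unauthenticated connection."""
    config = {}
    username = env.get("SOURCE_KAFKA_SASL_USERNAME", "")
    password = env.get("SOURCE_KAFKA_SASL_PASSWORD", "")
    if username and password:
        config["sasl.mechanism"] = env.get("SOURCE_KAFKA_SASL_MECHANISM", "SCRAM-SHA-256")
        config["security.protocol"] = "SASL_SSL"
        config["sasl.username"] = username
        config["sasl.password"] = password
        ca_location = env.get("SOURCE_KAFKA_SSL_CA_LOCATION", "")
        if ca_location:
            config["ssl.ca.location"] = ca_location
    return config

print(build_auth_config({"SOURCE_KAFKA_SASL_USERNAME": "user",
                         "SOURCE_KAFKA_SASL_PASSWORD": "pass"}))
```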

## How It Works

1. The template creates a connection to the source Kafka broker using the provided configuration
2. It uses the `KafkaReplicatorSource` to consume messages from the source topic
3. Messages are automatically replicated to the output topic in Quix
4. The replication preserves message keys, values, and metadata
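The steps above can be pictured with a toy in-memory stand-in for the two topics. `KafkaReplicatorSource` does the real work against live brokers; this sketch only illustrates the pass-through behavior:

```python
# Toy model of the replication loop: every consumed record is forwarded to
# the output topic with its key, value, and headers unchanged.
def replicate(source_records):
    output = []
    for rec in source_records:
        output.append({
            "key": rec["key"],          # message key preserved
            "value": rec["value"],      # message value preserved
            "headers": rec["headers"],  # metadata preserved
        })
    return output

records = [{"key": b"test0", "value": b'{"k0": "v0"}', "headers": [("h", b"1")]}]
print(replicate(records) == records)  # True
```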

## Use Cases

- Migrating data from one Kafka cluster to another
- Creating a real-time backup of a Kafka topic
- Integrating external Kafka data sources into Quix pipelines
- Bridging data between different Kafka environments (e.g., on-premise to cloud)

## Example Configuration

For a local Kafka broker without authentication:
```
SOURCE_BROKER_ADDRESS=localhost:9092
SOURCE_TOPIC=my-source-topic
output=replicated-data
```

For a secured Kafka cluster:
```
SOURCE_BROKER_ADDRESS=broker.example.com:9092
SOURCE_TOPIC=my-source-topic
output=replicated-data
SOURCE_KAFKA_SASL_USERNAME=my-username
SOURCE_KAFKA_SASL_PASSWORD=my-password
SOURCE_KAFKA_SASL_MECHANISM=SCRAM-SHA-256
```
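Required variables are read with `os.environ[...]` in `main.py`, so a missing one aborts startup immediately instead of connecting with a half-formed configuration. A sketch of that fail-fast behavior (the helper name is illustrative):

```python
env = {"SOURCE_BROKER_ADDRESS": "localhost:9092"}  # SOURCE_TOPIC missing

def read_required(env, name):
    # Mirrors os.environ[name]: raise at startup when a required variable
    # is absent, rather than failing later mid-connection.
    if name not in env:
        raise KeyError(f"Missing required environment variable: {name}")
    return env[name]

print(read_required(env, "SOURCE_BROKER_ADDRESS"))  # localhost:9092
try:
    read_required(env, "SOURCE_TOPIC")
except KeyError as exc:
    print("startup aborted:", exc)
```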

## Notes

- The `KafkaReplicatorSource` supports running multiple instances with the same name for parallel processing
- If no authentication variables are provided, the connector will attempt to connect without SASL/SSL
- Make sure the source Kafka broker is accessible from the Quix environment
28 changes: 28 additions & 0 deletions python/sources/kafka/dockerfile
@@ -0,0 +1,28 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1 \
    PYTHONIOENCODING=UTF-8 \
    PYTHONPATH="/app"

# Build argument for setting the main app path
ARG MAINAPPPATH=.

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]
Binary file added python/sources/kafka/icon.png
129 changes: 129 additions & 0 deletions python/sources/kafka/library.json
@@ -0,0 +1,129 @@
{
  "libraryItemId": "kafka-replicator-source",
  "name": "Kafka Replicator Source",
  "language": "Python",
  "tags": {
    "Pipeline Stage": ["Source"],
    "Type": ["Connectors"],
    "Category": ["Data streaming"]
  },
  "shortDescription": "Replicate data from a Kafka topic to a Quix Cloud topic",
  "DefaultFile": "main.py",
  "EntryPoint": "dockerfile",
  "RunEntryPoint": "main.py",
  "IconFile": "icon.png",
  "Variables": [
    {
      "Name": "output",
      "Type": "EnvironmentVariable",
      "InputType": "OutputTopic",
      "Description": "This is the Quix Topic that will receive the replicated data",
      "DefaultValue": "",
      "Required": true
    },
    {
      "Name": "SOURCE_BROKER_ADDRESS",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The source Kafka broker address (e.g., localhost:9092 or broker.example.com:9092)",
      "DefaultValue": "",
      "Required": true
    },
    {
      "Name": "SOURCE_TOPIC",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The source Kafka topic name to replicate from",
      "DefaultValue": "",
      "Required": true
    },
    {
      "Name": "AUTO_OFFSET_RESET",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "What to do when there is no initial offset in Kafka. Options: earliest, latest",
      "DefaultValue": "latest",
      "Required": false
    },
    {
      "Name": "VALUE_DESERIALIZER",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Deserializer to use for the message value. Options: json, bytes, string, double, integer",
      "DefaultValue": "json",
      "Required": false
    },
    {
      "Name": "KEY_DESERIALIZER",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Deserializer to use for the message key. Options: json, bytes, string, double, integer",
      "DefaultValue": "bytes",
      "Required": false
    },
    {
      "Name": "CONSUMER_POLL_TIMEOUT",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Consumer poll timeout in seconds (optional)",
      "DefaultValue": "",
      "Required": false
    },
    {
      "Name": "SHUTDOWN_TIMEOUT",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Timeout for shutting down the source in seconds",
      "DefaultValue": "10",
      "Required": false
    },
    {
      "Name": "SOURCE_KAFKA_SASL_USERNAME",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "SASL username for source Kafka authentication (optional)",
      "DefaultValue": "",
      "Required": false
    },
    {
      "Name": "SOURCE_KAFKA_SASL_PASSWORD",
      "Type": "EnvironmentVariable",
      "InputType": "Secret",
      "Description": "SASL password for source Kafka authentication (optional)",
      "DefaultValue": "",
      "Required": false
    },
    {
      "Name": "SOURCE_KAFKA_SASL_MECHANISM",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER, AWS_MSK_IAM",
      "DefaultValue": "SCRAM-SHA-256",
      "Required": false
    },
    {
      "Name": "SOURCE_KAFKA_SSL_CA_LOCATION",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Path to the SSL CA certificate file for secure connections. If not provided, system default CA certificates will be used",
      "DefaultValue": "",
      "Required": false
    },
    {
      "Name": "CONSUMER_GROUP",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Name of the consumer group",
      "DefaultValue": "",
      "Required": false
    }
  ],
  "DeploySettings": {
    "DeploymentType": "Service",
    "CpuMillicores": 200,
    "MemoryInMb": 200,
    "Replicas": 1,
    "PublicAccess": false,
    "ValidateConnection": true
  }
}
64 changes: 64 additions & 0 deletions python/sources/kafka/main.py
@@ -0,0 +1,64 @@
from quixstreams import Application
from quixstreams.sources.core.kafka import KafkaReplicatorSource
import os

# for local dev, load env vars from a .env file
# from dotenv import load_dotenv
# load_dotenv()

# Required parameters
source_broker_address = os.environ["SOURCE_BROKER_ADDRESS"]
source_topic = os.environ["SOURCE_TOPIC"]
output_topic_name = os.environ["output"]

# Optional parameters
auto_offset_reset = os.getenv("AUTO_OFFSET_RESET", "latest")
consumer_poll_timeout = os.getenv("CONSUMER_POLL_TIMEOUT", None)
shutdown_timeout = float(os.getenv("SHUTDOWN_TIMEOUT", "10"))
value_deserializer = os.getenv("VALUE_DESERIALIZER", "json")
key_deserializer = os.getenv("KEY_DESERIALIZER", "bytes")

# Build consumer_extra_config for SASL/SSL if credentials are provided
consumer_extra_config = {}
sasl_username = os.getenv("SOURCE_KAFKA_SASL_USERNAME", "")
sasl_password = os.getenv("SOURCE_KAFKA_SASL_PASSWORD", "")
sasl_mechanism = os.getenv("SOURCE_KAFKA_SASL_MECHANISM", "SCRAM-SHA-256")
ssl_ca_location = os.getenv("SOURCE_KAFKA_SSL_CA_LOCATION", "")

if sasl_username and sasl_password:
    consumer_extra_config['sasl.mechanism'] = sasl_mechanism
    consumer_extra_config['security.protocol'] = 'SASL_SSL'
    consumer_extra_config['sasl.username'] = sasl_username
    consumer_extra_config['sasl.password'] = sasl_password
    if ssl_ca_location:
        consumer_extra_config['ssl.ca.location'] = ssl_ca_location

# Convert consumer_poll_timeout to float if provided
if consumer_poll_timeout:
    consumer_poll_timeout = float(consumer_poll_timeout)

# Create a Quix Application for the destination (Quix platform)
app = Application(
    # CONSUMER_GROUP is marked optional in library.json, so fall back to a
    # default group name instead of raising KeyError when it is unset
    consumer_group=os.getenv("CONSUMER_GROUP", "kafka-replicator-source"),
)

# Create the KafkaReplicatorSource
source = KafkaReplicatorSource(
    name="kafka-replicator",
    app_config=app.config,
    topic=source_topic,
    broker_address=source_broker_address,
    auto_offset_reset=auto_offset_reset,
    consumer_extra_config=consumer_extra_config if consumer_extra_config else None,
    consumer_poll_timeout=consumer_poll_timeout,
    shutdown_timeout=shutdown_timeout,
    value_deserializer=value_deserializer,
    key_deserializer=key_deserializer,
)

output_topic = app.topic(output_topic_name)
app.add_source(source=source, topic=output_topic)


if __name__ == "__main__":
    app.run()
2 changes: 2 additions & 0 deletions python/sources/kafka/requirements.txt
@@ -0,0 +1,2 @@
quixstreams==3.23.1
python-dotenv
3 changes: 3 additions & 0 deletions tests/sources/kafka/data.jsonlines
@@ -0,0 +1,3 @@
{"_key": "test0", "_value": {"k0": "v0", "k1": "v1"}}
{"_key": "test1", "_value": {"k0": "v0", "k1": "v1"}}
{"_key": "test2", "_value": {"k0": "v0", "k1": "v1"}}