28 changes: 28 additions & 0 deletions .github/workflows/test-samples.yml
@@ -0,0 +1,28 @@
name: Test QuixStreams Samples

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main, develop ]

jobs:
  test:
    runs-on: ubuntu-latest
    timeout-minutes: 30

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Run all tests in parallel
        run: ./test.py test-all -p 4

      - name: Cleanup Docker
        if: always()
        run: docker ps -q | xargs -r docker stop && docker system prune -f
11 changes: 11 additions & 0 deletions README.md
@@ -14,6 +14,17 @@ Destinations: the connectors to analytics dashboards, alerts, data warehouses an

Explore and deploy them using the `quix app create` command of the [Quix CLI](https://quix.io/docs/quix-cli/cli-quickstart.html), or use them directly on [Quix Cloud](https://quix.io/docs/quix-cloud/quickstart.html).

## Testing

All samples include Docker Compose integration tests. Run tests using:

```bash
./test.py test sources/starter_source # Run specific test
./test.py test-all --parallel 3 # Run all tests
```

See [tests/README.md](tests/README.md) for complete testing documentation.

## Contributing

Contributors are very welcome at Quix! Follow this guide to get a project into production on the platform.
16 changes: 16 additions & 0 deletions python/destinations/MQTT/library.json
@@ -64,6 +64,22 @@
"Description": "MQTT protocol version: 3.1, 3.1.1, 5",
"DefaultValue": "3.1.1",
"Required": true
},
{
"Name": "mqtt_tls_enabled",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Enable TLS for MQTT connection (true/false)",
"DefaultValue": "true",
"Required": false
},
{
"Name": "consumer_group_name",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Kafka consumer group name for the MQTT destination",
"DefaultValue": "mqtt_consumer_group",
"Required": false
}
],
"DeploySettings": {
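The two `library.json` entries above declare environment variables that the connector reads at startup; the `main.py` change below consumes them with matching defaults. A minimal sketch of that contract (variable names and defaults mirror the JSON above):

```python
import os

# Defaults mirror the DefaultValue entries declared in library.json.
mqtt_tls_enabled = os.getenv("mqtt_tls_enabled", "true").lower() == "true"
consumer_group = os.getenv("consumer_group_name", "mqtt_consumer_group")
```

Since environment variables are strings, any value other than "true" (case-insensitive) disables TLS.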
26 changes: 19 additions & 7 deletions python/destinations/MQTT/main.py
@@ -22,7 +22,7 @@ def mqtt_protocol_version():
return paho.MQTTv311

def configure_authentication(mqtt_client):
    mqtt_username = os.getenv("mqtt_username", "")
    if mqtt_username != "":
        mqtt_password = os.getenv("mqtt_password", "")
        if mqtt_password == "":
@@ -33,21 +33,31 @@ def configure_authentication(mqtt_client):
print("Using anonymous authentication")

mqtt_port = os.environ["mqtt_port"]
mqtt_tls_enabled = os.getenv("mqtt_tls_enabled", "true").lower() == "true"

# Validate the config
if not mqtt_port.isnumeric():
    raise ValueError('mqtt_port must be a numeric value')

client_id = os.getenv("Quix__Deployment__Id", "default")
mqtt_client = paho.Client(callback_api_version=paho.CallbackAPIVersion.VERSION2,
client_id = client_id, userdata = None, protocol = mqtt_protocol_version())
mqtt_client.tls_set(tls_version = mqtt.client.ssl.PROTOCOL_TLS) # we'll be using tls

if mqtt_tls_enabled:
    print("TLS enabled")
    mqtt_client.tls_set(tls_version = mqtt.client.ssl.PROTOCOL_TLS)
else:
    print("TLS disabled")

mqtt_client.reconnect_delay_set(5, 60)
configure_authentication(mqtt_client)

# Create a Quix platform-specific application instead
app = Application(consumer_group="mqtt_consumer_group", auto_offset_reset='earliest')
consumer_group = os.getenv("consumer_group_name", "mqtt_consumer_group")
app = Application(consumer_group=consumer_group, auto_offset_reset='earliest')
# initialize the topic, this will combine the topic name with the environment details to produce a valid topic identifier
input_topic = app.topic(os.environ["input"])
input_topic_name = os.environ["input"]
input_topic = app.topic(input_topic_name)

# setting callbacks for different events to see if it works, print the message etc.
def on_connect_cb(client: paho.Client, userdata: any, connect_flags: paho.ConnectFlags,
@@ -77,8 +87,10 @@ def on_disconnect_cb(client: paho.Client, userdata: any, disconnect_flags: paho.
def publish_to_mqtt(data, key, timestamp, headers):
    json_data = json.dumps(data)
    message_key_string = key.decode('utf-8') # Convert to string using utf-8 encoding
    # publish to MQTT
    mqtt_client.publish(mqtt_topic_root + "/" + message_key_string, payload = json_data, qos = 1)
    mqtt_topic = mqtt_topic_root + "/" + message_key_string
    # publish to MQTT with retain=True so messages are available for late subscribers
    mqtt_client.publish(mqtt_topic, payload = json_data, qos = 1, retain=True)
    return data

sdf = sdf.apply(publish_to_mqtt, metadata=True)

@@ -88,7 +100,7 @@ def publish_to_mqtt(data, key, timestamp, headers):

print("Starting application")
# run the data processing pipeline
app.run(sdf)
app.run()

# stop handling MQTT messages
mqtt_client.loop_stop()
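For context on the `retain=True` change: a retained message is stored by the broker and delivered to any subscriber that connects later, one per topic. A minimal sketch of a late subscriber observing this, assuming a local broker without TLS or authentication (host, port, and topic root are placeholders):

```python
# Sketch of a late subscriber: it connects after the destination has
# published and still receives the last retained payload on each topic.
import paho.mqtt.client as paho

def on_message(client, userdata, msg):
    # msg.retain is True when the broker replays a stored retained message
    print(f"{msg.topic} (retained={msg.retain}): {msg.payload.decode('utf-8')}")

client = paho.Client(callback_api_version=paho.CallbackAPIVersion.VERSION2)
client.on_message = on_message
client.connect("localhost", 1883)             # placeholder broker, no TLS/auth
client.subscribe("mqtt_topic_root/#", qos=1)  # placeholder topic root
client.loop_forever()
```

One trade-off worth noting: retained messages persist on the broker until overwritten or cleared with an empty retained payload.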
2 changes: 1 addition & 1 deletion python/destinations/MQTT/requirements.txt
@@ -1,3 +1,3 @@
quixstreams==2.9.0
quixstreams==3.23.1
paho-mqtt==2.1.0
python-dotenv
54 changes: 35 additions & 19 deletions python/destinations/TDengine/main.py
@@ -44,8 +44,8 @@ def _get_tdengine_subtable_name(
combined tag values, ex. `host1__USA` for tags (hostname, region).
"""
if not _as_bool("TDENGINE_NAME_SUBTABLES_FROM_TAGS"):
    # uses names auto-generated by TDengine
    return lambda row: None
    # uses names auto-generated by TDengine (via hash)
    return lambda row: ""

def _subtable_name(row: dict):
    _tags_keys = tags_keys(row) if callable(tags_keys) else tags_keys
@@ -64,23 +64,39 @@ def _subtable_name(row: dict):
time_setter: Optional[TimeSetter] = col if (col := os.getenv("TIMESTAMP_COLUMN")) else None


tdengine_sink = TDengineSink(
host=os.environ["TDENGINE_HOST"],
database=os.environ["TDENGINE_DATABASE"],
token=os.environ["TDENGINE_TOKEN"],
supertable=supertable,
subtable=subtable,
fields_keys=fields_keys,
tags_keys=tags_keys,
time_setter=time_setter,
time_precision=os.environ["TDENGINE_TIME_PRECISION"],
allow_missing_fields=_as_bool("TDENGINE_ALLOW_MISSING_FIELDS"),
include_metadata_tags=_as_bool("TDENGINE_INCLUDE_METADATA_TAGS"),
convert_ints_to_floats=_as_bool("TDENGINE_CONVERT_INTS_TO_FLOATS"),
enable_gzip=_as_bool("TDENGINE_ENABLE_GZIP"),
max_retries=_as_int("TDENGINE_MAX_RETRIES", 5),
retry_backoff_factor=_as_float("TDENGINE_RETRY_BACKOFF_FACTOR", 1.0)
)
# Authentication: supports both token (Cloud) and username/password (On-Premise)
token = os.getenv("TDENGINE_TOKEN", "")
username = os.getenv("TDENGINE_USERNAME", "")
password = os.getenv("TDENGINE_PASSWORD", "")

# Build sink kwargs based on available auth credentials
sink_kwargs = {
"host": os.environ["TDENGINE_HOST"],
"database": os.environ["TDENGINE_DATABASE"],
"supertable": supertable,
"subtable": subtable,
"fields_keys": fields_keys,
"tags_keys": tags_keys,
"time_setter": time_setter,
"time_precision": os.environ["TDENGINE_TIME_PRECISION"],
"allow_missing_fields": _as_bool("TDENGINE_ALLOW_MISSING_FIELDS"),
"include_metadata_tags": _as_bool("TDENGINE_INCLUDE_METADATA_TAGS"),
"convert_ints_to_floats": _as_bool("TDENGINE_CONVERT_INTS_TO_FLOATS"),
"enable_gzip": _as_bool("TDENGINE_ENABLE_GZIP"),
"max_retries": _as_int("TDENGINE_MAX_RETRIES", 5),
"retry_backoff_factor": _as_float("TDENGINE_RETRY_BACKOFF_FACTOR", 1.0)
}

# Add auth credentials
if token:
    sink_kwargs["token"] = token
elif username and password:
    sink_kwargs["username"] = username
    sink_kwargs["password"] = password
else:
    raise ValueError("Either TDENGINE_TOKEN (for Cloud) or TDENGINE_USERNAME and TDENGINE_PASSWORD (for On-Premise) must be provided")

tdengine_sink = TDengineSink(**sink_kwargs)


app = Application(
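The sink configuration above relies on small env-parsing helpers (`_as_bool`, `_as_int`, `_as_float`) that are defined earlier in `main.py` and don't appear in this diff. A plausible sketch of their behavior, for readers following along (the real definitions may differ):

```python
# Hypothetical reconstruction of the helpers referenced by the sink config;
# the actual implementations live earlier in main.py.
import os

def _as_bool(env_var: str) -> bool:
    # Treat common truthy spellings as True; anything else is False.
    return os.getenv(env_var, "false").strip().lower() in ("true", "1", "yes")

def _as_int(env_var: str, default: int) -> int:
    value = os.getenv(env_var, "")
    return int(value) if value else default

def _as_float(env_var: str, default: float) -> float:
    value = os.getenv(env_var, "")
    return float(value) if value else default
```

Building `sink_kwargs` as a dict and branching on the available credentials keeps a single `TDengineSink(**sink_kwargs)` call site while supporting both auth modes.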
2 changes: 1 addition & 1 deletion python/destinations/TDengine/requirements.txt
@@ -1,2 +1,2 @@
quixstreams[tdengine]==3.22.0
quixstreams[tdengine]==3.23.1
python-dotenv
2 changes: 1 addition & 1 deletion python/destinations/big_query/main.py
@@ -37,4 +37,4 @@
sdf.sink(big_query_sink)

if __name__ == "__main__":
app.run(sdf)
app.run()
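The `app.run(sdf)` → `app.run()` change recurs across these connectors: in Quix Streams 3.x the `Application` tracks every `StreamingDataFrame` created from it, so `run()` no longer needs the dataframe argument. A sketch of the 3.x pattern, with a placeholder topic name:

```python
# Minimal Quix Streams 3.x application skeleton; "input-topic" is a
# placeholder. The dataframe is registered on the app when created,
# so run() takes no arguments.
from quixstreams import Application

app = Application(consumer_group="example-group", auto_offset_reset="earliest")
sdf = app.dataframe(app.topic("input-topic"))
sdf = sdf.update(lambda value: print(value))  # any processing steps go here

app.run()  # replaces the older app.run(sdf)
```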
2 changes: 1 addition & 1 deletion python/destinations/big_query/requirements.txt
@@ -1,4 +1,4 @@
--extra-index-url https://pkgs.dev.azure.com/quix-analytics/53f7fe95-59fe-4307-b479-2473b96de6d1/_packaging/public/pypi/simple/
quixstreams==2.9.0a
quixstreams==3.23.1
google-cloud-bigquery
python-dotenv
8 changes: 8 additions & 0 deletions python/destinations/confluent_kafka/library.json
@@ -51,6 +51,14 @@
"Description": "Obtained from the Confluent Kafka portal",
"DefaultValue": "",
"Required": true
},
{
"Name": "consumer_group_name",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Kafka consumer group name for the Confluent Kafka destination",
"DefaultValue": "kafka-connector-consumer-group",
"Required": false
}
],
"DeploySettings": {
7 changes: 4 additions & 3 deletions python/destinations/confluent_kafka/main.py
@@ -1,4 +1,4 @@
from quixstreams.app import Application
from quixstreams import Application
from quixstreams import message_context
from quixstreams.kafka.producer import Producer
import os
@@ -35,7 +35,8 @@
producer = Producer(broker_address=broker_address, extra_config=sasl_config)

# Define your application
app = Application(consumer_group="kafka-connector-consumer-group",
consumer_group = os.getenv("consumer_group_name", "kafka-connector-consumer-group")
app = Application(consumer_group=consumer_group,
auto_offset_reset="earliest")

# Define the input and output topics
@@ -71,4 +72,4 @@ def produce_to_kafka(value):
sdf = sdf.update(produce_to_kafka)

# Run the application to start consuming, processing, and producing data
app.run(dataframe=sdf)
app.run()
2 changes: 1 addition & 1 deletion python/destinations/confluent_kafka/requirements.txt
@@ -1,2 +1,2 @@
quixstreams==2.9.0
quixstreams==3.23.1
python-dotenv
2 changes: 1 addition & 1 deletion python/destinations/elasticsearch/main.py
@@ -104,4 +104,4 @@ def _as_optional_dict(value: Optional[str]) -> Optional[dict]:
# --------------- Run Application ---------------

if __name__ == "__main__":
app.run(sdf)
app.run()
2 changes: 1 addition & 1 deletion python/destinations/elasticsearch/requirements.txt
@@ -1,2 +1,2 @@
quixstreams[elasticsearch]==3.14.1
quixstreams[elasticsearch]==3.23.1
python-dotenv
2 changes: 1 addition & 1 deletion python/destinations/flet-waveform/requirements.txt
@@ -1,3 +1,3 @@
flet
quixstreams
quixstreams==3.23.1
python-dotenv
16 changes: 16 additions & 0 deletions python/destinations/hivemq/library.json
@@ -63,6 +63,22 @@
"Description": "MQTT protocol version: 3.1, 3.1.1, 5",
"DefaultValue": "3.1.1",
"Required": true
},
{
"Name": "mqtt_tls_enabled",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Enable TLS for MQTT connection (true/false)",
"DefaultValue": "true",
"Required": false
},
{
"Name": "consumer_group_name",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Kafka consumer group name for the HiveMQ destination",
"DefaultValue": "mqtt_consumer_group",
"Required": false
}
],
"DeploySettings": {
20 changes: 15 additions & 5 deletions python/destinations/hivemq/main.py
@@ -33,19 +33,28 @@ def configure_authentication(mqtt_client):
print("Using anonymous authentication")

mqtt_port = os.environ["mqtt_port"]
mqtt_tls_enabled = os.getenv("mqtt_tls_enabled", "true").lower() == "true"

# Validate the config
if not mqtt_port.isnumeric():
    raise ValueError('mqtt_port must be a numeric value')

client_id = os.getenv("Quix__Deployment__Id", "default")
mqtt_client = paho.Client(callback_api_version=paho.CallbackAPIVersion.VERSION2,
client_id = client_id, userdata = None, protocol = mqtt_protocol_version())
mqtt_client.tls_set(tls_version = mqtt.client.ssl.PROTOCOL_TLS) # we'll be using tls

if mqtt_tls_enabled:
    print("TLS enabled")
    mqtt_client.tls_set(tls_version = mqtt.client.ssl.PROTOCOL_TLS)
else:
    print("TLS disabled")

mqtt_client.reconnect_delay_set(5, 60)
configure_authentication(mqtt_client)

# Create a Quix platform-specific application instead
app = Application(consumer_group="mqtt_consumer_group", auto_offset_reset='earliest')
consumer_group = os.getenv("consumer_group_name", "mqtt_consumer_group")
app = Application(consumer_group=consumer_group, auto_offset_reset='earliest')
# initialize the topic, this will combine the topic name with the environment details to produce a valid topic identifier
input_topic = app.topic(os.environ["input"])

@@ -77,8 +86,9 @@ def on_disconnect_cb(client: paho.Client, userdata: any, disconnect_flags: paho.
def publish_to_mqtt(data, key, timestamp, headers):
    json_data = json.dumps(data)
    message_key_string = key.decode('utf-8') # Convert to string using utf-8 encoding
    # publish to MQTT
    mqtt_client.publish(mqtt_topic_root + "/" + message_key_string, payload = json_data, qos = 1)
    # publish to MQTT with retain=True so messages are available for late subscribers
    mqtt_client.publish(mqtt_topic_root + "/" + message_key_string, payload = json_data, qos = 1, retain=True)
    return data

sdf = sdf.apply(publish_to_mqtt, metadata=True)

@@ -88,7 +98,7 @@ def publish_to_mqtt(data, key, timestamp, headers):

print("Starting application")
# run the data processing pipeline
app.run(sdf)
app.run()

# stop handling MQTT messages
mqtt_client.loop_stop()
2 changes: 1 addition & 1 deletion python/destinations/hivemq/requirements.txt
@@ -1,3 +1,3 @@
quixstreams==2.9.0
quixstreams==3.23.1
paho-mqtt==2.1.0
python-dotenv
2 changes: 1 addition & 1 deletion python/destinations/influxdb_3/requirements.txt
@@ -1,2 +1,2 @@
quixstreams[influxdb3]==3.16.1
quixstreams[influxdb3]==3.23.1
python-dotenv
2 changes: 1 addition & 1 deletion python/destinations/mongodb/main.py
@@ -125,4 +125,4 @@ def document_matcher_env_parser() -> Optional[Callable[[SinkItem], MongoQueryFil
# --------------- Run Application ---------------

if __name__ == "__main__":
app.run(sdf)
app.run()
2 changes: 1 addition & 1 deletion python/destinations/mongodb/requirements.txt
@@ -1,2 +1,2 @@
quixstreams[mongodb]==3.14.1
quixstreams[mongodb]==3.23.1
python-dotenv
2 changes: 1 addition & 1 deletion python/destinations/postgres/main.py
@@ -55,4 +55,4 @@ def _as_iterable(env_var) -> list[str]:
sdf.sink(postgres_sink)

if __name__ == "__main__":
app.run(sdf)
app.run()
2 changes: 1 addition & 1 deletion python/destinations/postgres/requirements.txt
@@ -1,2 +1,2 @@
quixstreams[postgresql]==3.20.0
quixstreams[postgresql]==3.23.1
python-dotenv