Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MQTT Sink #659

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open

Add MQTT Sink #659

wants to merge 5 commits into from

Conversation

SteveRosam
Copy link
Contributor

@SteveRosam SteveRosam commented Nov 27, 2024

Added MQTT connector
Added tests
Updated pyproject.toml
Updated tests/requirement.txt

Resolves #658

@daniil-quix daniil-quix added the connector Issues updating Sinks or Sources label Nov 28, 2024
Comment on lines +24 to +32
mqtt_client_id: str,
mqtt_server: str,
mqtt_port: int,
mqtt_topic_root: str,
mqtt_username: str = None,
mqtt_password: str = None,
mqtt_version: str = "3.1.1",
tls_enabled: bool = True,
qos: int = 1,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
mqtt_client_id: str,
mqtt_server: str,
mqtt_port: int,
mqtt_topic_root: str,
mqtt_username: str = None,
mqtt_password: str = None,
mqtt_version: str = "3.1.1",
tls_enabled: bool = True,
qos: int = 1,
client_id: str,
server: str,
port: int,
topic_root: str,
username: str = None,
password: str = None,
version: str = "3.1.1",
tls_enabled: bool = True,
qos: int = 1,

Copy link

@devova devova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SteveRosam I also need this sink and willing to contribute to your PR if you don't mind

if isinstance(data, bytes):
data = data.decode("utf-8") # Decode bytes to string using utf-8

json_data = json.dumps(data)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some sinks has concept of

value_serializer: Callable[[Any], str] = json.dumps,
key_serializer: Callable[[Any], str] = bytes.decode,

in its constructors. That is very handy, specially when you are dealing with non json data. I don't see a use case for key_serializer, but value_serializer is definitely something nice to have.

Comment on lines +131 to +133
self.mqtt_topic_root + "/" + message_key_string,
payload=json_data,
qos=self.qos,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to docs, it would be great to have a control over retain and properties arguments.
Ideally the constructor should accept either constant or callbacks

retain: bool | Callable[[Any], bool] = False,
properties: paho.mqtt.properties.Properties | Callable[[Any], paho.mqtt.properties.Properties] | None = None,

"utf-8"
) # Convert to string using utf-8 encoding
# publish to MQTT
self.mqtt_client.publish(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add sync primitive here. Client.publish returns MQTTMessageInfo which has
wait_for_publish()

Suggested change
self.mqtt_client.publish(
self.message_infos.add(self.mqtt_client.publish(...))
def flush(self, topic: str, partition: str):
for message_info in self.message_infos:
message_info.wait_for_publish()

see quix-streams sink flush docs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
connector Issues updating Sinks or Sources
Projects
Status: No status
Development

Successfully merging this pull request may close these issues.

New Sink: MQTT
4 participants