Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add RedisPydanticStream backend #150

Closed
wants to merge 4 commits into from

Conversation

random-things
Copy link

Provides a new backend--RedisPydanticStreamBackend--derived from RedisStreamBackend. Instead of a message: str being passed to publish, it takes a Pydantic BaseModel. On the other side, it deserializes the object back into a Pydantic model. Now that I'm reviewing this, I think it'd be beneficial to add the following to publish:

    async def publish(self: typing.Self, channel: str, message: BaseModel) -> None:
        """Publish a message to a channel."""
        msg_type: str = message.__class__.__name__
+
+        if msg_type not in self._module_cache:
+            self._module_cache[msg_type] = message.__class__
+
        message_json: str = message.model_dump_json()
        await self._producer.xadd(channel, {"msg_type": msg_type, "message": message_json})

But it would only be useful in rare cases (dynamically created models or models at lower-than-module scope) or specifically in the test case, dropping the import and class declaration into the test itself.

@alex-oleshkevich
Copy link
Member

We do not accept any new backends. Please make it as a gist or separate package and link it in the readme.
#118

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants