Skip to content

Commit

Permalink
do not use json dump but max_commands_per_call
Browse files Browse the repository at this point in the history
  • Loading branch information
PythonFZ committed Nov 7, 2024
1 parent 2758a29 commit 5e34aee
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def test_set_none(client, request):
@pytest.mark.parametrize("client", ["znsclient", "znsclient_w_redis"])
def test_set_large_message(client, request, caplog):
c = request.getfixturevalue(client)
pipeline = c.pipeline(max_message_size=3000)
pipeline = c.pipeline(max_commands_per_call=75)
for _ in range(100):
pipeline.set("foo", "bar")

Expand Down
31 changes: 19 additions & 12 deletions znsocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,20 @@ def hmset(self, name, mapping):

@dataclasses.dataclass
class Pipeline:
"""A pipeline of Redis commands to be executed as a batch on the server.
Arguments
---------
client : Client
The client to send the pipeline to.
max_commands_per_call : int
The maximum number of commands to send in a single call to the server.
Decrease this number for large commands to avoid hitting the message size limit.
Increase it for small commands to reduce latency.
"""
client: Client
max_message_size: t.Optional[int] = 10 * 1024 * 1024
pipeline: list = dataclasses.field(default_factory=list)
max_commands_per_call: int = 1_000_000
pipeline: list = dataclasses.field(default_factory=list, init=False)

def _add_to_pipeline(self, command, *args, **kwargs):
"""Generic handler to add Redis commands to the pipeline."""
Expand Down Expand Up @@ -161,16 +172,12 @@ def execute(self):
results = []
for idx, entry in enumerate(self.pipeline):
message.append(entry)
if self.max_message_size is not None:
msg_size = json.dumps(message).__sizeof__()
if msg_size > self.max_message_size:
warnings.warn(
f"Message size '{msg_size}' is greater than"
f" '{self.max_message_size = }'. Sending message"
f" at index {idx} and continuing."
)
results.extend(self._send_message(message))
message = []
if len(message) >= self.max_commands_per_call:
warnings.warn(
f"splitting message at index {idx} due to max_message_chunk",
)
results.extend(self._send_message(message))
message = []
if message:
results.extend(self._send_message(message))

Expand Down

0 comments on commit 5e34aee

Please sign in to comment.