Skip to content

Commit

Permalink
Support Pipelines (#63)
Browse files Browse the repository at this point in the history
* pipeline draft

* intermediate results

* wrap all

* fix list tests

* fix JS client

* fix all tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* make logging optional and try longer for health check

* go back to old timeout setup (container was not compatible)

* bugfix (kwargs/args)

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* list extend using pipelines

* allow to specify max message size

* clean up handle errors

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix typo

* bump version

* test and fix self-trigger

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* bump JS version as well

* review

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
PythonFZ and pre-commit-ci[bot] authored Nov 7, 2024
1 parent 9f327f6 commit 5307bda
Show file tree
Hide file tree
Showing 10 changed files with 524 additions and 357 deletions.
60 changes: 33 additions & 27 deletions js/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ export class Client {

lLen(key) {
return new Promise((resolve, reject) => {
this._socket.emit("llen", { name: key }, (data) => {
this._socket.emit("llen", [[key], {}], (data) => {
// Check if there is an error or invalid response and reject if necessary
resolve(data);
resolve(data["data"]);
});
});
}

lIndex(key, index) {
return new Promise((resolve, reject) => {
this._socket.emit("lindex", { name: key, index: index }, (data) => {
resolve(data || null);
this._socket.emit("lindex", [[key, index],{}], (data) => {
resolve(data["data"] || null);
});
});
}
Expand All @@ -62,7 +62,7 @@ export class Client {
return new Promise((resolve, reject) => {
this._socket.emit(
"lset",
{ name: key, index: index, value: value },
[[key, index, value], {}],
(data) => {
resolve("OK"); // TODO
},
Expand All @@ -74,7 +74,7 @@ export class Client {
return new Promise((resolve, reject) => {
this._socket.emit(
"lrem",
{ name: key, count: count, value: value },
[[key, count, value], {}],
(data) => {
resolve("OK"); // TODO
},
Expand All @@ -84,32 +84,38 @@ export class Client {

lRange(key, start, end) {
return new Promise((resolve, reject) => {
this._socket.emit("lrange", { name: key, start: start, end: end -1 }, (data) => {
resolve(data);
this._socket.emit(
"lrange",
[[key, start, end - 1], {}],
(data) => {
resolve(data["data"]);
});
});
}

rPush(key, value) {
return new Promise((resolve, reject) => {
this._socket.emit("rpush", { name: key, value: value }, (data) => {
this._socket.emit(
"rpush",
[[key, value], {}],
(data) => {
resolve("OK"); // TODO
});
});
}

lPush(key, value) {
return new Promise((resolve, reject) => {
this._socket.emit("lpush", { name: key, value: value }, (data) => {
this._socket.emit("lpush", [[key, value], {}], (data) => {
resolve("OK"); // TODO
});
});
}

hGet(key, field) {
return new Promise((resolve, reject) => {
this._socket.emit("hget", { name: key, key: field }, (data) => {
resolve(data || null);
this._socket.emit("hget", [[key, field], {}], (data) => {
resolve(data["data"] || null);
});
});
}
Expand All @@ -118,7 +124,7 @@ export class Client {
return new Promise((resolve, reject) => {
this._socket.emit(
"hset",
{ name: key, mapping: { [field]: value } },
[[], { name: key, mapping: { [field]: value } }],
(data) => {
resolve("OK"); // TODO
},
Expand All @@ -128,32 +134,32 @@ export class Client {

hMSet(key, mapping) {
return new Promise((resolve, reject) => {
this._socket.emit("hset", { name: key, mapping: mapping }, (data) => {
this._socket.emit("hset", [[key], {mapping: mapping}], (data) => {
resolve("OK"); // TODO
});
});
}

hDel(key, field) {
return new Promise((resolve, reject) => {
this._socket.emit("hdel", { name: key, key: field }, (data) => {
this._socket.emit("hdel", [[key, field], {}], (data) => {
resolve("OK"); // TODO
});
});
}

del(key) {
return new Promise((resolve, reject) => {
this._socket.emit("delete", { name: key }, (data) => {
this._socket.emit("delete", [[key],{}], (data) => {
resolve("OK"); // TODO
});
});
}

hExists(key, field) {
return new Promise((resolve, reject) => {
this._socket.emit("hexists", { name: key, key: field }, (data) => {
if (data === 1) {
this._socket.emit("hexists", [[key, field], {}], (data) => {
if (data["data"] === 1) {
resolve(true);
} else {
resolve(false);
Expand All @@ -164,39 +170,39 @@ export class Client {

hLen(key) {
return new Promise((resolve, reject) => {
this._socket.emit("hlen", { name: key }, (data) => {
resolve(data);
this._socket.emit("hlen", [[key],{}], (data) => {
resolve(data["data"]);
});
});
}

hKeys(key) {
return new Promise((resolve, reject) => {
this._socket.emit("hkeys", { name: key }, (data) => {
resolve(data);
this._socket.emit("hkeys", [[key],{}], (data) => {
resolve(data["data"]);
});
});
}

hVals(key) {
return new Promise((resolve, reject) => {
this._socket.emit("hvals", { name: key }, (data) => {
resolve(data);
this._socket.emit("hvals", [[key], {}], (data) => {
resolve(data["data"]);
});
});
}

hGetAll(key) {
return new Promise((resolve, reject) => {
this._socket.emit("hgetall", { name: key }, (data) => {
resolve(data);
this._socket.emit("hgetall", [[key], {}], (data) => {
resolve(data["data"]);
});
});
}

flushAll() {
return new Promise((resolve, reject) => {
this._socket.emit("flushall", {}, (data) => {
this._socket.emit("flushall", [[], {}], (data) => {
resolve("OK"); // TODO
});
});
Expand Down
2 changes: 2 additions & 0 deletions js_tests/native.list.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ test("native_list_slice", async () => {
await lst.push("item1");
await lst.push("item2");
await lst.push("item3");
// expect the full list
expect(await lst.slice(0, 3)).toEqual(["item1", "item2", "item3"]);

// Slice the list
let sliced = await lst.slice(0, 2);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "znsocket",
"version": "0.2.4",
"version": "0.2.5",
"description": "JS interface for the python znsocket package",
"main": "js/index.js",
"types": "js/index.d.ts",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "znsocket"
version = "0.2.4"
version = "0.2.5"
description = "Python implementation of a Redis-compatible API using websockets."
authors = ["Fabian Zills <[email protected]>"]
license = "Apache-2.0"
Expand Down
44 changes: 40 additions & 4 deletions tests/test_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,8 @@ def test_list_refresh_delitem(client, request, znsclient):
lst2.on_refresh(mock)
lst.extend([1, 2, 3])
znsclient.sio.sleep(0.01)
# assert mock called 3 times
assert mock.call_count == 3
# extend sends all at once
assert mock.call_count == 1
mock.reset_mock()

assert len(lst) == 3
Expand All @@ -423,8 +423,8 @@ def test_list_refresh_setitem(client, request, znsclient):
lst2.on_refresh(mock)
lst.extend([1, 2, 3])
znsclient.sio.sleep(0.01)
# assert mock called 3 times
assert mock.call_count == 3
# extend sends all at once
assert mock.call_count == 1
mock.reset_mock()

assert len(lst) == 3
Expand Down Expand Up @@ -504,3 +504,39 @@ def test_list_delete_empty(client, request):

with pytest.raises(IndexError):
del lst[0]


@pytest.mark.parametrize("client", ["znsclient", "znsclient_w_redis", "redisclient"])
def test_list_refresh_extend(client, request, znsclient):
r = request.getfixturevalue(client)
lst = znsocket.List(r=r, key="list:test", socket=znsclient)
lst2 = znsocket.List(
r=r, key="list:test", socket=znsocket.Client.from_url(znsclient.address)
)
mock = MagicMock()
lst2.on_refresh(mock)
assert len(lst) == 0
lst.extend([1, 2, 3])
znsclient.sio.sleep(0.01)
assert len(lst) == 3
mock.assert_called_once_with({"start": 0, "stop": None})

mock.reset_mock()
lst.extend([4, 5, 6])
znsclient.sio.sleep(0.01)
assert len(lst) == 6
mock.assert_called_once_with({"start": 3, "stop": None})


@pytest.mark.parametrize("client", ["znsclient", "znsclient_w_redis", "redisclient"])
def test_list_refresh_extend_self_trigger(client, request, znsclient):
r = request.getfixturevalue(client)
lst = znsocket.List(r=r, key="list:test", socket=znsclient)

mock = MagicMock()
lst.on_refresh(mock)
assert len(lst) == 0
lst.extend([1, 2, 3])
znsclient.sio.sleep(0.01)
assert len(lst) == 3
mock.assert_not_called()
Loading

0 comments on commit 5307bda

Please sign in to comment.