Skip to content

Commit a4a292e

Browse files
committed
working coord client plus not working lock/ conenction problemsg
1 parent 96c0ff1 commit a4a292e

File tree

5 files changed

+228
-20
lines changed

5 files changed

+228
-20
lines changed

ydb/coordination/client.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import ydb
2+
from ydb import operation
3+
4+
from ydb._grpc.v5.ydb_coordination_v1_pb2_grpc import CoordinationServiceStub
5+
from ydb._grpc.v5.protos import ydb_coordination_pb2
6+
from ydb.coordination.coordination_lock import CoordinationLock
7+
from ydb.coordination.сoordination_session import CoordinationSession
8+
9+
10+
class CoordinationClient:
11+
def __init__(self, driver: "ydb.Driver"):
12+
self._driver = driver
13+
14+
def session(self):
15+
return CoordinationSession(self._driver)
16+
17+
18+
def create_node(self, path: str, config=None, operation_params=None):
19+
request = ydb_coordination_pb2.CreateNodeRequest(
20+
path=path,
21+
config=config,
22+
operation_params=operation_params,
23+
)
24+
return self._driver(
25+
request,
26+
CoordinationServiceStub,
27+
"CreateNode",
28+
operation.Operation,
29+
)
30+
31+
def describe_node(self, path: str, operation_params=None):
32+
request = ydb_coordination_pb2.DescribeNodeRequest(
33+
path=path,
34+
operation_params=operation_params,
35+
)
36+
return self._driver(
37+
request,
38+
CoordinationServiceStub,
39+
"DescribeNode",
40+
operation.Operation,
41+
)
42+
43+
def delete_node(self, path: str, operation_params=None):
44+
request = ydb_coordination_pb2.DropNodeRequest(
45+
path=path,
46+
operation_params=operation_params,
47+
)
48+
return self._driver(
49+
request,
50+
CoordinationServiceStub,
51+
"DropNode",
52+
operation.Operation,
53+
)
54+
55+
def lock(self, path: str, timeout: int = 5000, count: int = 1):
56+
return CoordinationLock(self.session(), path, timeout, count)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from ydb.coordination.сoordination_session import CoordinationSession
2+
3+
4+
class CoordinationLock:
5+
def __init__(self, session: CoordinationSession, path: str, timeout: int = 5000, count: int = 1):
6+
self._session = session
7+
self._path = path
8+
self._timeout = timeout
9+
self._count = count
10+
11+
def acquire(self):
12+
self._session.acquire_semaphore(self._path, self._count, self._timeout)
13+
14+
def release(self):
15+
self._session.release_semaphore(self._path)
16+
17+
def __enter__(self):
18+
self.acquire()
19+
return self
20+
21+
def __exit__(self, exc_type, exc_val, exc_tb):
22+
self.release()

ydb/coordination/exceptions.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2+
class CoordinationError(Exception):
3+
"""Базовое исключение для всех ошибок координации."""
4+
5+
6+
class NodeAlreadyExists(CoordinationError):
7+
"""Узел координации уже существует."""
8+
9+
10+
class NodeNotFound(CoordinationError):
11+
"""Узел координации не найден."""
12+
13+
14+
class NodeLocked(CoordinationError):
15+
"""Узел координации уже захвачен."""
16+
17+
18+
class NodeTimeout(CoordinationError):
19+
"""Истекло время ожидания при захвате узла."""
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import pytest
2+
import ydb
3+
from ydb.coordination.client import CoordinationClient
4+
import time
5+
6+
7+
8+
@pytest.mark.integration
9+
def test_coordination_client_local():
10+
driver_config = ydb.DriverConfig(
11+
endpoint="grpc://localhost:2136",
12+
database="/local",
13+
)
14+
15+
with ydb.Driver(driver_config) as driver:
16+
for _ in range(10):
17+
try:
18+
driver.wait(timeout=5)
19+
break
20+
except Exception:
21+
time.sleep(1)
22+
23+
24+
scheme = ydb.SchemeClient(driver)
25+
base_path = "/local/coordination"
26+
27+
try:
28+
scheme.describe_path(base_path)
29+
except ydb.issues.SchemeError:
30+
scheme.make_directory(base_path)
31+
desc = scheme.describe_path(base_path)
32+
assert desc is not None, f"Directory {base_path} was not created"
33+
34+
35+
node_path = f"{base_path}/test_node"
36+
37+
client = CoordinationClient(driver)
38+
39+
40+
create_future = client.create_node(path=node_path)
41+
assert create_future is not None
42+
43+
44+
res_desc = client.describe_node(path=node_path)
45+
assert res_desc is not None
46+
47+
48+
res_delete = client.delete_node(path=node_path)
49+
assert res_delete is not None
50+
51+
@pytest.mark.integration
52+
def test_coordination_lock_lifecycle():
53+
driver_config = ydb.DriverConfig(
54+
endpoint="grpc://localhost:2136",
55+
database="/local",
56+
)
57+
58+
with ydb.Driver(driver_config) as driver:
59+
for _ in range(10):
60+
try:
61+
driver.wait(timeout=5)
62+
break
63+
except Exception:
64+
time.sleep(1)
65+
66+
scheme = driver.scheme_client
67+
base_path = "/local/coordination"
68+
try:
69+
scheme.describe_path(base_path)
70+
except ydb.SchemeError:
71+
scheme.make_directory(base_path)
72+
desc = scheme.describe_path(base_path)
73+
assert desc is not None, f"Directory {base_path} was not created"
74+
75+
# создаём client
76+
client = CoordinationClient(driver)
77+
78+
lock_path = f"{base_path}/test_lock"
79+
80+
81+
with client.lock(lock_path, timeout=2000, count=1) as lock:
82+
assert lock._session_id is not None, "Lock должен иметь session_id после acquire"
83+
84+
85+
sem_state = client.describe_node(lock_path)
86+
assert sem_state is not None, "Семафор должен существовать после acquire"
87+
88+
89+
assert lock._session_id is None, "Lock должен быть освобождён после выхода из with"
90+
91+
92+
sem_state_after = client.describe_node(lock_path)
93+
assert sem_state_after is not None, "Семафор всё ещё существует"
Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,59 @@
11
import time
22

3+
import ydb
34
from ydb._grpc.v5.protos.ydb_coordination_pb2 import SessionRequest
45
from ydb._grpc.v5.ydb_coordination_v1_pb2_grpc import CoordinationServiceStub
6+
from ydb._utilities import SyncResponseIterator
57

68

79
class CoordinationSession:
810
def __init__(self, driver: "ydb.Driver"):
911
self._driver = driver
10-
11-
def acquire_semaphore(self, name: str, count: int = 1, timeout: int = 5000):
12-
req = SessionRequest(
12+
self._session_id = None
13+
14+
def _ensure_session(self):
15+
if self._session_id is None:
16+
req = SessionRequest(session_start=SessionRequest.SessionStart())
17+
stream_it = self._driver(
18+
req,
19+
CoordinationServiceStub,
20+
"Session",
21+
)
22+
iterator = SyncResponseIterator(stream_it, lambda resp: resp)
23+
first_resp = next(iterator)
24+
self._session_id = first_resp.session_started.session_id
25+
return self._session_id
26+
27+
def acquire_semaphore(self, path: str, count: int = 1, timeout_millis: int = 5000):
28+
session_id = self._ensure_session()
29+
acquire_req = SessionRequest(
1330
acquire_semaphore=SessionRequest.AcquireSemaphore(
14-
name=name,
31+
name=path,
1532
count=count,
16-
timeout_millis=timeout,
33+
timeout_millis=timeout_millis,
1734
req_id=int(time.time() * 1000),
1835
data=b"",
1936
ephemeral=True,
2037
),
21-
session_start=SessionRequest.SessionStart()
38+
session_start=SessionRequest.SessionStart(session_id=session_id)
2239
)
23-
24-
res_iter = self._driver(req, CoordinationServiceStub, "Session")
25-
res = next(res_iter)
26-
acquire_result = getattr(res, "acquire_semaphore_result", None)
27-
28-
if not acquire_result or not acquire_result.acquired:
29-
raise RuntimeError(f"Failed to acquire semaphore {name}")
30-
31-
return res.session_started.session_id
32-
33-
def release_semaphore(self, name: str, session_id: int):
34-
req = SessionRequest(
40+
stream_it = self._driver(acquire_req, CoordinationServiceStub, "Session")
41+
iterator = SyncResponseIterator(stream_it, lambda resp: resp)
42+
resp = next(iterator)
43+
result = getattr(resp, "acquire_semaphore_result", None)
44+
if not result or not result.acquired:
45+
raise RuntimeError(f"Failed to acquire semaphore {path}")
46+
47+
def release_semaphore(self, path: str):
48+
if self._session_id is None:
49+
return
50+
release_req = SessionRequest(
3551
release_semaphore=SessionRequest.ReleaseSemaphore(
36-
name=name,
52+
name=path,
3753
req_id=int(time.time() * 1000),
3854
),
3955
session_stop=SessionRequest.SessionStop()
4056
)
41-
self._driver(req, CoordinationServiceStub, "Session")
57+
stream_it = self._driver(release_req, CoordinationServiceStub, "Session")
58+
SyncResponseIterator(stream_it, lambda resp: resp)
59+
self._session_id = None

0 commit comments

Comments
 (0)