Skip to content

Commit 463c30d

Browse files
authored
[feat] PineconeGrpcFuture implements concurrent.futures.Future (#410)
## Problem `GRPCIndex` has long had limited and poorly documented support for async operations via the futures interface of the [`grpc`](https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.future) library. I've recently been trying to implement `query_namespaces` using these futures, and discovered that unfortunately the grpc futures implementation is not compatible with the `concurrent.futures` package in the standard library. This makes them pretty much useless for anything at all complicated because the grpc library doesn't provide any utils for synchronization or waiting. ## Solution A class called `PineconeGrpcFuture` was added in the past as a minimal wrapper around the [future](https://grpc.github.io/grpc/python/grpc.html#future-interfaces) that is emitted by `grpc`. These futures objects are used to represent asynchronous computation, and allow you to regisiter callbacks with `add_done_callback`. This is similar to calling `then()` on a javascript promise. The original purpose of our `PineconeGrpcFuture` wrapper class seems to have been to implement some basic (very basic) error mapping, but for this diff I decided to extend the class to implement the `concurrent.futures.Future` interface. This allows the instances of `PineconeGrpcFuture` to be used with `concurrent.futures.as_completed` and `concurrent.futures.wait` utilities, which makes them dramatically more ergonomic to deal with. Unfortunately the grpc future is not compatible with the `concurrent.future` package out of the box. For the unit tests of `PineconeGrpcFuture`, I had to make heavy use of mocking because all the various grpc classes are tightly coupled and can't be simply setup without performing actual network calls. This doesn't give me huge confidence it's actually working as expected, so as a sanity check I added some additional integration test coverage for `upsert`, `fetch`, and `delete` using `concurrent.futures.wait`. ## Type of Change - [x] Bug fix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [x] This change requires a documentation update - [ ] Infrastructure change (CI configs, etc) - [ ] Non-code change (docs, etc) - [ ] None of the above: (explain here) ## Test Plan Added unit and integration tests
1 parent 36373a1 commit 463c30d

File tree

9 files changed

+885
-32
lines changed

9 files changed

+885
-32
lines changed

pinecone/grpc/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,24 @@
4747
from .index_grpc import GRPCIndex
4848
from .pinecone import PineconeGRPC
4949
from .config import GRPCClientConfig
50+
from .future import PineconeGrpcFuture
5051

5152
from pinecone.core.grpc.protos.vector_service_pb2 import (
5253
Vector as GRPCVector,
5354
SparseValues as GRPCSparseValues,
5455
Vector,
5556
SparseValues,
57+
DeleteResponse as GRPCDeleteResponse,
5658
)
5759

5860
__all__ = [
5961
"GRPCIndex",
6062
"PineconeGRPC",
63+
"GRPCDeleteResponse",
6164
"GRPCClientConfig",
6265
"GRPCVector",
6366
"GRPCSparseValues",
6467
"Vector",
6568
"SparseValues",
69+
"PineconeGrpcFuture",
6670
]

pinecone/grpc/future.py

Lines changed: 73 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,86 @@
1-
from grpc._channel import _MultiThreadedRendezvous
1+
from concurrent.futures import Future as ConcurrentFuture
2+
from typing import Optional
3+
from grpc import Future as GrpcFuture, RpcError
24
from pinecone.exceptions.exceptions import PineconeException
35

46

5-
class PineconeGrpcFuture:
6-
def __init__(self, delegate):
7-
self._delegate = delegate
7+
class PineconeGrpcFuture(ConcurrentFuture):
8+
def __init__(
9+
self, grpc_future: GrpcFuture, timeout: Optional[int] = None, result_transformer=None
10+
):
11+
super().__init__()
12+
self._grpc_future = grpc_future
13+
self._result_transformer = result_transformer
14+
if timeout is not None:
15+
self._default_timeout = timeout # seconds
16+
else:
17+
self._default_timeout = 5 # seconds
818

9-
def cancel(self):
10-
return self._delegate.cancel()
19+
# Sync initial state, in case the gRPC future is already done
20+
self._sync_state(self._grpc_future)
1121

12-
def cancelled(self):
13-
return self._delegate.cancelled()
22+
# Add callback to subscribe to updates from the gRPC future
23+
self._grpc_future.add_done_callback(self._sync_state)
1424

15-
def running(self):
16-
return self._delegate.running()
25+
@property
26+
def grpc_future(self):
27+
return self._grpc_future
1728

18-
def done(self):
19-
return self._delegate.done()
29+
def _sync_state(self, grpc_future):
30+
if self.done():
31+
return
2032

21-
def add_done_callback(self, fun):
22-
return self._delegate.add_done_callback(fun)
33+
if grpc_future.cancelled():
34+
self.cancel()
35+
elif grpc_future.exception(timeout=self._default_timeout):
36+
self.set_exception(grpc_future.exception())
37+
elif grpc_future.done():
38+
try:
39+
result = grpc_future.result(timeout=self._default_timeout)
40+
self.set_result(result)
41+
except Exception as e:
42+
self.set_exception(e)
43+
elif grpc_future.running():
44+
self.set_running_or_notify_cancel()
2345

24-
def result(self, timeout=None):
25-
try:
26-
return self._delegate.result(timeout=timeout)
27-
except _MultiThreadedRendezvous as e:
28-
raise PineconeException(e._state.debug_error_string) from e
46+
def set_result(self, result):
47+
if self._result_transformer:
48+
result = self._result_transformer(result)
49+
return super().set_result(result)
50+
51+
def cancel(self):
52+
self._grpc_future.cancel()
53+
return super().cancel()
2954

3055
def exception(self, timeout=None):
31-
return self._delegate.exception(timeout=timeout)
56+
exception = super().exception(timeout=self._timeout(timeout))
57+
if isinstance(exception, RpcError):
58+
return self._wrap_rpc_exception(exception)
59+
return exception
3260

3361
def traceback(self, timeout=None):
34-
return self._delegate.traceback(timeout=timeout)
62+
# This is not part of the ConcurrentFuture interface, but keeping it for
63+
# backward compatibility
64+
return self._grpc_future.traceback(timeout=self._timeout(timeout))
65+
66+
def result(self, timeout=None):
67+
try:
68+
return super().result(timeout=self._timeout(timeout))
69+
except RpcError as e:
70+
raise self._wrap_rpc_exception(e) from e
71+
72+
def _timeout(self, timeout: Optional[int] = None) -> int:
73+
if timeout is not None:
74+
return timeout
75+
else:
76+
return self._default_timeout
77+
78+
def _wrap_rpc_exception(self, e):
79+
if e._state and e._state.debug_error_string:
80+
return PineconeException(e._state.debug_error_string)
81+
else:
82+
return PineconeException("Unknown GRPC error")
83+
84+
def __del__(self):
85+
self._grpc_future.cancel()
86+
self = None # release the reference to the grpc future

pinecone/grpc/index_grpc.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,12 @@ def delete(
282282
return self.runner.run(self.stub.Delete, request, timeout=timeout)
283283

284284
def fetch(
285-
self, ids: Optional[List[str]], namespace: Optional[str] = None, **kwargs
286-
) -> FetchResponse:
285+
self,
286+
ids: Optional[List[str]],
287+
namespace: Optional[str] = None,
288+
async_req: Optional[bool] = False,
289+
**kwargs,
290+
) -> Union[FetchResponse, PineconeGrpcFuture]:
287291
"""
288292
The fetch operation looks up and returns vectors, by ID, from a single namespace.
289293
The returned vectors include the vector data and/or metadata.
@@ -304,9 +308,13 @@ def fetch(
304308
args_dict = self._parse_non_empty_args([("namespace", namespace)])
305309

306310
request = FetchRequest(ids=ids, **args_dict, **kwargs)
307-
response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
308-
json_response = json_format.MessageToDict(response)
309-
return parse_fetch_response(json_response)
311+
312+
if async_req:
313+
future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
314+
return PineconeGrpcFuture(future, result_transformer=parse_fetch_response)
315+
else:
316+
response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
317+
return parse_fetch_response(response)
310318

311319
def query(
312320
self,

pinecone/grpc/utils.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
from typing import Optional
2+
from google.protobuf import json_format
3+
from google.protobuf.message import Message
4+
25
import uuid
36

47
from pinecone.core.openapi.data.models import (
@@ -35,10 +38,12 @@ def parse_sparse_values(sparse_values: dict):
3538
)
3639

3740

38-
def parse_fetch_response(response: dict):
41+
def parse_fetch_response(response: Message):
42+
json_response = json_format.MessageToDict(response)
43+
3944
vd = {}
40-
vectors = response.get("vectors", {})
41-
namespace = response.get("namespace", "")
45+
vectors = json_response.get("vectors", {})
46+
namespace = json_response.get("namespace", "")
4247

4348
for id, vec in vectors.items():
4449
vd[id] = _Vector(
@@ -52,7 +57,7 @@ def parse_fetch_response(response: dict):
5257
return FetchResponse(
5358
vectors=vd,
5459
namespace=namespace,
55-
usage=parse_usage(response.get("usage", {})),
60+
usage=parse_usage(json_response.get("usage", {})),
5661
_check_type=False,
5762
)
5863

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import os
2+
import pytest
3+
from pinecone import Vector
4+
from ..helpers import poll_stats_for_namespace, random_string
5+
6+
if os.environ.get("USE_GRPC") == "true":
7+
from pinecone.grpc import GRPCDeleteResponse
8+
9+
10+
class TestDeleteFuture:
11+
@pytest.mark.skipif(
12+
os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
13+
)
14+
def test_delete_future(self, idx):
15+
namespace = random_string(10)
16+
17+
idx.upsert(
18+
vectors=[
19+
Vector(id="id1", values=[0.1, 0.2]),
20+
Vector(id="id2", values=[0.1, 0.2]),
21+
Vector(id="id3", values=[0.1, 0.2]),
22+
],
23+
namespace=namespace,
24+
)
25+
poll_stats_for_namespace(idx, namespace, 3)
26+
27+
delete_one = idx.delete(ids=["id1"], namespace=namespace, async_req=True)
28+
delete_namespace = idx.delete(namespace=namespace, delete_all=True, async_req=True)
29+
30+
from concurrent.futures import as_completed
31+
32+
for future in as_completed([delete_one, delete_namespace], timeout=10):
33+
resp = future.result()
34+
assert isinstance(resp, GRPCDeleteResponse)
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import os
2+
import pytest
3+
4+
if os.environ.get("USE_GRPC") == "true":
5+
from pinecone.grpc import PineconeGrpcFuture
6+
7+
8+
@pytest.mark.skipif(
9+
os.getenv("USE_GRPC") != "true", reason="PineconeGrpcFutures only returned from grpc client"
10+
)
11+
class TestFetchFuture:
12+
def setup_method(self):
13+
self.expected_dimension = 2
14+
15+
def test_fetch_multiple_by_id(self, idx, namespace):
16+
target_namespace = namespace
17+
18+
results = idx.fetch(ids=["1", "2", "4"], namespace=target_namespace, async_req=True)
19+
assert isinstance(results, PineconeGrpcFuture)
20+
21+
from concurrent.futures import wait, FIRST_COMPLETED
22+
23+
done, _ = wait([results], return_when=FIRST_COMPLETED)
24+
25+
results = done.pop().result()
26+
assert results.usage is not None
27+
assert results.usage["read_units"] is not None
28+
assert results.usage["read_units"] > 0
29+
30+
assert results.namespace == target_namespace
31+
assert len(results.vectors) == 3
32+
assert results.vectors["1"].id == "1"
33+
assert results.vectors["2"].id == "2"
34+
# Metadata included, if set
35+
assert results.vectors["1"].metadata is None
36+
assert results.vectors["2"].metadata is None
37+
assert results.vectors["4"].metadata is not None
38+
assert results.vectors["4"].metadata["genre"] == "action"
39+
assert results.vectors["4"].metadata["runtime"] == 120
40+
# Values included
41+
assert results.vectors["1"].values is not None
42+
assert len(results.vectors["1"].values) == self.expected_dimension
43+
44+
def test_fetch_single_by_id(self, idx, namespace):
45+
target_namespace = namespace
46+
47+
future = idx.fetch(ids=["1"], namespace=target_namespace, async_req=True)
48+
49+
from concurrent.futures import wait, FIRST_COMPLETED
50+
51+
done, _ = wait([future], return_when=FIRST_COMPLETED)
52+
results = done.pop().result()
53+
54+
assert results.namespace == target_namespace
55+
assert len(results.vectors) == 1
56+
assert results.vectors["1"].id == "1"
57+
assert results.vectors["1"].metadata is None
58+
assert results.vectors["1"].values is not None
59+
assert len(results.vectors["1"].values) == self.expected_dimension
60+
61+
def test_fetch_nonexistent_id(self, idx, namespace):
62+
target_namespace = namespace
63+
64+
# Fetch id that is missing
65+
future = idx.fetch(ids=["100"], namespace=target_namespace, async_req=True)
66+
67+
from concurrent.futures import wait, FIRST_COMPLETED
68+
69+
done, _ = wait([future], return_when=FIRST_COMPLETED)
70+
results = done.pop().result()
71+
72+
assert results.namespace == target_namespace
73+
assert len(results.vectors) == 0
74+
75+
def test_fetch_nonexistent_namespace(self, idx):
76+
target_namespace = "nonexistent-namespace"
77+
78+
# Fetch from namespace with no vectors
79+
future = idx.fetch(ids=["1"], namespace=target_namespace, async_req=True)
80+
81+
from concurrent.futures import wait, FIRST_COMPLETED
82+
83+
done, _ = wait([future], return_when=FIRST_COMPLETED)
84+
results = done.pop().result()
85+
86+
assert results.namespace == target_namespace
87+
assert len(results.vectors) == 0
88+
89+
def test_fetch_unspecified_namespace(self, idx):
90+
# Fetch without specifying namespace gives default namespace results
91+
future = idx.fetch(ids=["1", "4"], async_req=True)
92+
93+
from concurrent.futures import wait, FIRST_COMPLETED
94+
95+
done, _ = wait([future], return_when=FIRST_COMPLETED)
96+
results = done.pop().result()
97+
98+
assert results.namespace == ""
99+
assert results.vectors["1"].id == "1"
100+
assert results.vectors["1"].values is not None
101+
assert results.vectors["4"].metadata is not None

0 commit comments

Comments
 (0)