Skip to content

Commit 8802eb4

Browse files
committed
chore: Initial implementation of FDv2 types and polling initializer
1 parent 24e74b0 commit 8802eb4

File tree

8 files changed

+1317
-24
lines changed

8 files changed

+1317
-24
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""
2+
This module houses FDv2 types and implementations of synchronizers and
3+
initializers for the datasystem.
4+
"""
5+
6+
__all__: list[str] = []

ldclient/impl/datasourcev2/polling.py

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
"""
2+
Default implementation of the polling synchronizer and initializer.
3+
"""
4+
5+
import json
6+
from abc import abstractmethod
7+
from collections import namedtuple
8+
from collections.abc import Mapping
9+
from typing import Optional, Protocol, Tuple
10+
from urllib import parse
11+
12+
import urllib3
13+
14+
from ldclient.impl.datasystem.protocolv2 import (
15+
Basis,
16+
ChangeSet,
17+
ChangeSetBuilder,
18+
DeleteObject,
19+
EventName,
20+
IntentCode,
21+
PutObject,
22+
Selector,
23+
ServerIntent,
24+
)
25+
from ldclient.impl.http import _http_factory
26+
from ldclient.impl.repeating_task import RepeatingTask
27+
from ldclient.impl.util import (
28+
Result,
29+
UnsuccessfulResponseException,
30+
_Fail,
31+
_headers,
32+
_Result,
33+
_Success,
34+
http_error_message,
35+
is_http_error_recoverable,
36+
log,
37+
)
38+
39+
POLLING_ENDPOINT = "/sdk/poll"
40+
41+
PollingResult = _Result[Tuple[ChangeSet, Mapping], str]
42+
43+
44+
class PollingRequester(Protocol): # pylint: disable=too-few-public-methods
45+
"""
46+
PollingRequester allows PollingDataSource to delegate fetching data to
47+
another component.
48+
49+
This is useful for testing the PollingDataSource without needing to set up
50+
a test HTTP server.
51+
"""
52+
53+
@abstractmethod
54+
def fetch(self, selector: Optional[Selector]) -> PollingResult:
55+
"""
56+
Fetches the data for the given selector.
57+
Returns a Result containing a tuple of ChangeSet and any request headers,
58+
or an error if the data could not be retrieved.
59+
"""
60+
raise NotImplementedError
61+
62+
63+
CacheEntry = namedtuple("CacheEntry", ["data", "etag"])
64+
65+
66+
class PollingDataSource:
67+
"""
68+
PollingDataSource is a data source that can retrieve information from
69+
LaunchDarkly either as an Initializer or as a Synchronizer.
70+
"""
71+
72+
def __init__(
73+
self,
74+
poll_interval: float,
75+
requester: PollingRequester,
76+
):
77+
self._requester = requester
78+
self._task = RepeatingTask(
79+
"ldclient.datasource.polling", poll_interval, 0, self._poll
80+
)
81+
82+
def name(self) -> str:
83+
"""Returns the name of the initializer."""
84+
return "PollingDataSourceV2"
85+
86+
def fetch(self) -> Result: # Result[Basis]:
87+
"""
88+
Fetch returns a Basis, or an error if the Basis could not be retrieved.
89+
"""
90+
return self._poll()
91+
92+
# TODO(fdv2): This will need to be converted into a synchronizer at some point.
93+
# def start(self):
94+
# log.info(
95+
# "Starting PollingUpdateProcessor with request interval: "
96+
# + str(self._config.poll_interval)
97+
# )
98+
# self._task.start()
99+
100+
def _poll(self) -> Result: # Result[Basis]:
101+
try:
102+
# TODO(fdv2): Need to pass the selector through
103+
result = self._requester.fetch(None)
104+
105+
if isinstance(result, _Fail):
106+
if isinstance(result.exception, UnsuccessfulResponseException):
107+
status_code = result.exception.status
108+
http_error_message_result = http_error_message(
109+
status_code, "polling request"
110+
)
111+
if is_http_error_recoverable(status_code):
112+
log.warning(http_error_message_result)
113+
114+
return Result.fail(http_error_message_result, result.exception)
115+
116+
return Result.fail(
117+
result.error or "Failed to request payload", result.exception
118+
)
119+
120+
(change_set, headers) = result.value
121+
122+
env_id = headers.get("X-LD-EnvID")
123+
if not isinstance(env_id, str):
124+
env_id = None
125+
126+
basis = Basis(
127+
change_set=change_set,
128+
persist=change_set.selector is not None,
129+
environment_id=env_id,
130+
)
131+
132+
return Result.success(basis)
133+
except Exception as e:
134+
msg = f"Error: Exception encountered when updating flags. {e}"
135+
log.exception(msg)
136+
137+
return Result.fail(msg, e)
138+
139+
140+
# pylint: disable=too-few-public-methods
141+
class Urllib3PollingRequester:
142+
"""
143+
Urllib3PollingRequester is a PollingRequester that uses urllib3 to make HTTP requests.
144+
"""
145+
146+
def __init__(self, config):
147+
self._etag = None
148+
self._http = _http_factory(config).create_pool_manager(1, config.base_uri)
149+
self._config = config
150+
self._poll_uri = config.base_uri + POLLING_ENDPOINT
151+
152+
def fetch(self, selector: Optional[Selector]) -> PollingResult:
153+
"""
154+
Fetches the data for the given selector.
155+
Returns a Result containing a tuple of ChangeSet and any request headers,
156+
or an error if the data could not be retrieved.
157+
"""
158+
query_params = {}
159+
if self._config.payload_filter_key is not None:
160+
query_params["filter"] = self._config.payload_filter_key
161+
162+
if selector is not None:
163+
query_params["selector"] = selector.state
164+
165+
if len(query_params) > 0:
166+
filter_query = parse.urlencode(query_params)
167+
self._poll_uri += f"?{filter_query}"
168+
169+
uri = self._poll_uri
170+
hdrs = _headers(self._config)
171+
hdrs["Accept-Encoding"] = "gzip"
172+
173+
if self._etag is not None:
174+
hdrs["If-None-Match"] = self._etag
175+
176+
response = self._http.request(
177+
"GET",
178+
uri,
179+
headers=hdrs,
180+
timeout=urllib3.Timeout(
181+
connect=self._config.http.connect_timeout,
182+
read=self._config.http.read_timeout,
183+
),
184+
retries=1,
185+
)
186+
187+
if response.status >= 400:
188+
return _Fail(
189+
f"HTTP error {response}", UnsuccessfulResponseException(response.status)
190+
)
191+
192+
headers = response.headers
193+
194+
if response.status == 304:
195+
return _Success(value=(ChangeSetBuilder.no_changes(), headers))
196+
197+
data = json.loads(response.data.decode("UTF-8"))
198+
etag = headers.get("ETag")
199+
200+
if etag is not None:
201+
self._etag = etag
202+
203+
log.debug(
204+
"%s response status:[%d] ETag:[%s]",
205+
uri,
206+
response.status,
207+
etag,
208+
)
209+
210+
changeset_result = polling_payload_to_changeset(data)
211+
if isinstance(changeset_result, _Success):
212+
return _Success(value=(changeset_result.value, headers))
213+
214+
return _Fail(
215+
error=changeset_result.error,
216+
exception=changeset_result.exception,
217+
)
218+
219+
220+
# pylint: disable=too-many-branches,too-many-return-statements
221+
def polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]:
222+
"""
223+
Converts a polling payload into a ChangeSet.
224+
"""
225+
if "events" not in data or not isinstance(data["events"], list):
226+
return _Fail(error="Invalid payload: 'events' key is missing or not a list")
227+
228+
builder = ChangeSetBuilder()
229+
230+
for event in data["events"]:
231+
if not isinstance(event, dict):
232+
return _Fail(error="Invalid payload: 'events' must be a list of objects")
233+
234+
for event in data["events"]:
235+
if event["event"] == EventName.SERVER_INTENT:
236+
try:
237+
server_intent = ServerIntent.from_dict(event["data"])
238+
except ValueError as err:
239+
return _Fail(error="Invalid JSON in server intent", exception=err)
240+
241+
if server_intent.payload.code == IntentCode.TRANSFER_NONE:
242+
return _Success(ChangeSetBuilder.no_changes())
243+
244+
builder.start(server_intent.payload.code)
245+
elif event["event"] == EventName.PUT_OBJECT:
246+
try:
247+
put = PutObject.from_dict(event["data"])
248+
except ValueError as err:
249+
return _Fail(error="Invalid JSON in put object", exception=err)
250+
251+
builder.add_put(put.kind, put.key, put.version, put.object)
252+
elif event["event"] == EventName.DELETE_OBJECT:
253+
try:
254+
delete_object = DeleteObject.from_dict(event["data"])
255+
except ValueError as err:
256+
return _Fail(error="Invalid JSON in delete object", exception=err)
257+
258+
builder.add_delete(
259+
delete_object.kind, delete_object.key, delete_object.version
260+
)
261+
elif event["event"] == EventName.PAYLOAD_TRANSFERRED:
262+
try:
263+
selector = Selector.from_dict(event["data"])
264+
changeset = builder.finish(selector)
265+
266+
return _Success(value=changeset)
267+
except ValueError as err:
268+
return _Fail(
269+
error="Invalid JSON in payload transferred object", exception=err
270+
)
271+
272+
return _Fail(error="didn't receive any known protocol events in polling payload")

ldclient/impl/datasystem/__init__.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""
2+
This package contains the generic interfaces used for the data system (v1 and
3+
v2), as well as types for v1 and v2 specific protocols.
4+
"""
5+
6+
from abc import abstractmethod
7+
from typing import Protocol
8+
9+
from ldclient.impl.util import Result
10+
11+
12+
class Synchronizer(Protocol):
13+
"""
14+
Represents a component capable of obtaining a Basis and subsequent delta
15+
updates asynchronously.
16+
"""
17+
18+
@abstractmethod
19+
def name(self) -> str:
20+
"""Returns the name of the initializer."""
21+
raise NotImplementedError
22+
23+
# TODO(fdv2): Need sync method
24+
25+
def close(self):
26+
"""
27+
Close the synchronizer, releasing any resources it holds.
28+
"""
29+
30+
31+
class Initializer(Protocol):
32+
"""
33+
Represents a component capable of obtaining a Basis via a synchronous call.
34+
"""
35+
36+
@abstractmethod
37+
def name(self) -> str:
38+
"""Returns the name of the initializer."""
39+
raise NotImplementedError
40+
41+
@abstractmethod
42+
def fetch(self) -> Result:
43+
"""
44+
Fetch returns a Basis, or an error if the Basis could not be retrieved.
45+
"""
46+
raise NotImplementedError
47+
48+
49+
__all__: list[str] = ["Synchronizer", "Initializer"]

0 commit comments

Comments
 (0)