Skip to content

Commit 19fa296

Browse files
committed
chore: Introducing Synchronizer protocol & streaming implementation
1 parent 0fd0afd commit 19fa296

File tree

5 files changed

+986
-25
lines changed

5 files changed

+986
-25
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,77 @@
11
"""
22
This module houses FDv2 types and implementations of synchronizers and
33
initializers for the datasystem.
4+
5+
All types and implementations in this module are considered internal
6+
and are not part of the public API of the LaunchDarkly Python SDK.
7+
They are subject to change without notice and should not be used directly
8+
by users of the SDK.
9+
10+
You have been warned.
411
"""
512

13+
from abc import abstractmethod
14+
from dataclasses import dataclass
15+
from typing import Iterable, Mapping, Optional, Protocol, Tuple
16+
17+
from ldclient.impl.datasystem.protocolv2 import ChangeSet, Selector
18+
from ldclient.impl.util import _Result
19+
from ldclient.interfaces import DataSourceErrorInfo, DataSourceState
20+
21+
PollingResult = _Result[Tuple[ChangeSet, Mapping], str]
22+
23+
24+
class PollingRequester(Protocol): # pylint: disable=too-few-public-methods
25+
"""
26+
PollingRequester allows PollingDataSource to delegate fetching data to
27+
another component.
28+
29+
This is useful for testing the PollingDataSource without needing to set up
30+
a test HTTP server.
31+
"""
32+
33+
@abstractmethod
34+
def fetch(self, selector: Optional[Selector]) -> PollingResult:
35+
"""
36+
Fetches the data for the given selector.
37+
Returns a Result containing a tuple of ChangeSet and any request headers,
38+
or an error if the data could not be retrieved.
39+
"""
40+
raise NotImplementedError
41+
42+
43+
@dataclass(frozen=True)
44+
class Update:
45+
"""
46+
Update represents the results of a synchronizer's ongoing sync
47+
method.
48+
"""
49+
50+
state: DataSourceState
51+
change_set: Optional[ChangeSet] = None
52+
error: Optional[DataSourceErrorInfo] = None
53+
revert_to_fdv1: bool = False
54+
environment_id: Optional[str] = None
55+
56+
57+
class Synchronizer(Protocol): # pylint: disable=too-few-public-methods
58+
"""
59+
Synchronizer represents a component capable of synchronizing data from an external
60+
data source, such as a streaming or polling API.
61+
62+
It is responsible for yielding Update objects that represent the current state
63+
of the data source, including any changes that have occurred since the last
64+
synchronization.
65+
"""
66+
67+
@abstractmethod
68+
def sync(self) -> Iterable[Update]:
69+
"""
70+
sync should begin the synchronization process for the data source, yielding
71+
Update objects until the connection is closed or an unrecoverable error
72+
occurs.
73+
"""
74+
raise NotImplementedError
75+
76+
677
__all__: list[str] = []

ldclient/impl/datasourcev2/polling.py

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
"""
2-
Default implementation of the polling synchronizer and initializer.
2+
This module contains the implementations of a polling synchronizer and
3+
initializer, along with any required supporting classes and protocols.
34
"""
45

56
import json
6-
from abc import abstractmethod
77
from collections import namedtuple
8-
from collections.abc import Mapping
9-
from typing import Optional, Protocol, Tuple
8+
from typing import Iterable, Optional
109
from urllib import parse
1110

1211
import urllib3
1312

13+
from ldclient.impl.datasourcev2 import PollingRequester, PollingResult, Update
1414
from ldclient.impl.datasystem.protocolv2 import (
1515
Basis,
1616
ChangeSet,
@@ -38,27 +38,6 @@
3838

3939
POLLING_ENDPOINT = "/sdk/poll"
4040

41-
PollingResult = _Result[Tuple[ChangeSet, Mapping], str]
42-
43-
44-
class PollingRequester(Protocol): # pylint: disable=too-few-public-methods
45-
"""
46-
PollingRequester allows PollingDataSource to delegate fetching data to
47-
another component.
48-
49-
This is useful for testing the PollingDataSource without needing to set up
50-
a test HTTP server.
51-
"""
52-
53-
@abstractmethod
54-
def fetch(self, selector: Optional[Selector]) -> PollingResult:
55-
"""
56-
Fetches the data for the given selector.
57-
Returns a Result containing a tuple of ChangeSet and any request headers,
58-
or an error if the data could not be retrieved.
59-
"""
60-
raise NotImplementedError
61-
6241

6342
CacheEntry = namedtuple("CacheEntry", ["data", "etag"])
6443

0 commit comments

Comments
 (0)