Skip to content

Commit eb35610

Browse files
authored
chore: Add early support for FDv2-based test data source (#350)
1 parent 61b71b2 commit eb35610

File tree

4 files changed

+1359
-0
lines changed

4 files changed

+1359
-0
lines changed

ldclient/impl/integrations/test_datav2/__init__.py

Whitespace-only changes.
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
import threading
2+
from queue import Empty, Queue
3+
from typing import Generator
4+
5+
from ldclient.impl.datasystem import BasisResult, Update
6+
from ldclient.impl.datasystem.protocolv2 import (
7+
Basis,
8+
ChangeSetBuilder,
9+
IntentCode,
10+
ObjectKind,
11+
Selector
12+
)
13+
from ldclient.impl.util import _Fail, _Success, current_time_millis
14+
from ldclient.interfaces import (
15+
DataSourceErrorInfo,
16+
DataSourceErrorKind,
17+
DataSourceState
18+
)
19+
20+
21+
class _TestDataSourceV2:
22+
"""
23+
Internal implementation of both Initializer and Synchronizer protocols for TestDataV2.
24+
25+
This component bridges the test data management in TestDataV2 with the FDv2 protocol
26+
interfaces. Each instance implements both Initializer and Synchronizer protocols
27+
and receives change notifications for dynamic updates.
28+
"""
29+
30+
def __init__(self, test_data):
31+
self._test_data = test_data
32+
self._closed = False
33+
self._update_queue = Queue()
34+
self._lock = threading.Lock()
35+
36+
# Always register for change notifications
37+
self._test_data._add_instance(self)
38+
39+
# Locking strategy:
40+
# The threading.Lock instance (_lock) ensures thread safety for shared resources:
41+
# - Used in `fetch` and `close` to prevent concurrent modification of `_closed`.
42+
# - Added to `upsert_flag` to address potential race conditions.
43+
# - The `sync` method relies on Queue's thread-safe properties for updates.
44+
45+
def fetch(self) -> BasisResult:
46+
"""
47+
Implementation of the Initializer.fetch method.
48+
49+
Returns the current test data as a Basis for initial data loading.
50+
"""
51+
try:
52+
with self._lock:
53+
if self._closed:
54+
return _Fail("TestDataV2 source has been closed")
55+
56+
# Get all current flags from test data
57+
init_data = self._test_data._make_init_data()
58+
version = self._test_data._get_version()
59+
60+
# Build a full transfer changeset
61+
builder = ChangeSetBuilder()
62+
builder.start(IntentCode.TRANSFER_FULL)
63+
64+
# Add all flags to the changeset
65+
for key, flag_data in init_data.items():
66+
builder.add_put(
67+
ObjectKind.FLAG,
68+
key,
69+
flag_data.get('version', 1),
70+
flag_data
71+
)
72+
73+
# Create selector for this version
74+
selector = Selector.new_selector(str(version), version)
75+
change_set = builder.finish(selector)
76+
77+
basis = Basis(
78+
change_set=change_set,
79+
persist=False,
80+
environment_id=None
81+
)
82+
83+
return _Success(basis)
84+
85+
except Exception as e:
86+
return _Fail(f"Error fetching test data: {str(e)}")
87+
88+
def sync(self) -> Generator[Update, None, None]:
89+
"""
90+
Implementation of the Synchronizer.sync method.
91+
92+
Yields updates as test data changes occur.
93+
"""
94+
95+
# First yield initial data
96+
initial_result = self.fetch()
97+
if isinstance(initial_result, _Fail):
98+
yield Update(
99+
state=DataSourceState.OFF,
100+
error=DataSourceErrorInfo(
101+
kind=DataSourceErrorKind.STORE_ERROR,
102+
status_code=0,
103+
time=current_time_millis(),
104+
message=initial_result.error
105+
)
106+
)
107+
return
108+
109+
# Yield the initial successful state
110+
yield Update(
111+
state=DataSourceState.VALID,
112+
change_set=initial_result.value.change_set
113+
)
114+
115+
# Continue yielding updates as they arrive
116+
while not self._closed:
117+
try:
118+
# Wait for updates with a timeout to allow checking closed status
119+
try:
120+
update = self._update_queue.get(timeout=1.0)
121+
except Empty:
122+
continue
123+
124+
if update is None: # Sentinel value for shutdown
125+
break
126+
127+
yield update
128+
129+
except Exception as e:
130+
yield Update(
131+
state=DataSourceState.OFF,
132+
error=DataSourceErrorInfo(
133+
kind=DataSourceErrorKind.UNKNOWN,
134+
status_code=0,
135+
time=current_time_millis(),
136+
message=f"Error in test data synchronizer: {str(e)}"
137+
)
138+
)
139+
break
140+
141+
def close(self):
142+
"""Close the data source and clean up resources."""
143+
with self._lock:
144+
if self._closed:
145+
return
146+
self._closed = True
147+
148+
self._test_data._closed_instance(self)
149+
# Signal shutdown to sync generator
150+
self._update_queue.put(None)
151+
152+
def upsert_flag(self, flag_data: dict):
153+
"""
154+
Called by TestDataV2 when a flag is updated.
155+
156+
This method converts the flag update into an FDv2 changeset and
157+
queues it for delivery through the sync() generator.
158+
"""
159+
with self._lock:
160+
if self._closed:
161+
return
162+
163+
try:
164+
version = self._test_data._get_version()
165+
166+
# Build a changes transfer changeset
167+
builder = ChangeSetBuilder()
168+
builder.start(IntentCode.TRANSFER_CHANGES)
169+
170+
# Add the updated flag
171+
builder.add_put(
172+
ObjectKind.FLAG,
173+
flag_data['key'],
174+
flag_data.get('version', 1),
175+
flag_data
176+
)
177+
178+
# Create selector for this version
179+
selector = Selector.new_selector(str(version), version)
180+
change_set = builder.finish(selector)
181+
182+
# Queue the update
183+
update = Update(
184+
state=DataSourceState.VALID,
185+
change_set=change_set
186+
)
187+
188+
self._update_queue.put(update)
189+
190+
except Exception as e:
191+
# Queue an error update
192+
error_update = Update(
193+
state=DataSourceState.OFF,
194+
error=DataSourceErrorInfo(
195+
kind=DataSourceErrorKind.STORE_ERROR,
196+
status_code=0,
197+
time=current_time_millis(),
198+
message=f"Error processing flag update: {str(e)}"
199+
)
200+
)
201+
self._update_queue.put(error_update)

0 commit comments

Comments
 (0)