Skip to content

Commit 0a12a58

Browse files
authored
fix(low-code): Fix declarative low-code state migration in SubstreamPartitionRouter (#267)
1 parent b1824c6 commit 0a12a58

File tree

4 files changed

+115
-32
lines changed

4 files changed

+115
-32
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def close_partition(self, partition: Partition) -> None:
147147
< cursor.state[self.cursor_field.cursor_field_key]
148148
):
149149
self._new_global_cursor = copy.deepcopy(cursor.state)
150-
self._emit_state_message()
150+
self._emit_state_message()
151151

152152
def ensure_at_least_one_state_emitted(self) -> None:
153153
"""
@@ -192,7 +192,8 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St
192192
self._global_cursor,
193193
self._lookback_window if self._global_cursor else 0,
194194
)
195-
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
195+
with self._lock:
196+
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
196197
self._semaphore_per_partition[self._to_partition_key(partition.partition)] = (
197198
threading.Semaphore(0)
198199
)
@@ -210,16 +211,38 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St
210211

211212
def _ensure_partition_limit(self) -> None:
212213
"""
213-
Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped.
214+
Ensure the maximum number of partitions does not exceed the predefined limit.
215+
216+
Steps:
217+
1. Attempt to remove partitions that are marked as finished in `_finished_partitions`.
218+
These partitions are considered processed and safe to delete.
219+
2. If the limit is still exceeded and no finished partitions are available for removal,
220+
remove the oldest partition unconditionally. We expect failed partitions to be removed.
221+
222+
Logging:
223+
- Logs a warning each time a partition is removed, indicating whether it was finished
224+
or removed due to being the oldest.
214225
"""
215-
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
216-
self._over_limit += 1
217-
oldest_partition = self._cursor_per_partition.popitem(last=False)[
218-
0
219-
] # Remove the oldest partition
220-
logger.warning(
221-
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
222-
)
226+
with self._lock:
227+
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
228+
# Try removing finished partitions first
229+
for partition_key in list(self._cursor_per_partition.keys()):
230+
if partition_key in self._finished_partitions:
231+
oldest_partition = self._cursor_per_partition.pop(
232+
partition_key
233+
) # Remove the oldest partition
234+
logger.warning(
235+
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
236+
)
237+
break
238+
else:
239+
# If no finished partitions can be removed, fall back to removing the oldest partition
240+
oldest_partition = self._cursor_per_partition.popitem(last=False)[
241+
1
242+
] # Remove the oldest partition
243+
logger.warning(
244+
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
245+
)
223246

224247
def _set_initial_state(self, stream_state: StreamState) -> None:
225248
"""
@@ -264,7 +287,10 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
264287
if not stream_state:
265288
return
266289

267-
if self._PERPARTITION_STATE_KEY not in stream_state:
290+
if (
291+
self._PERPARTITION_STATE_KEY not in stream_state
292+
and self._GLOBAL_STATE_KEY not in stream_state
293+
):
268294
# We assume that `stream_state` is in a global format that can be applied to all partitions.
269295
# Example: {"global_state_format_key": "global_state_format_value"}
270296
self._global_cursor = deepcopy(stream_state)
@@ -273,7 +299,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
273299
else:
274300
self._lookback_window = int(stream_state.get("lookback_window", 0))
275301

276-
for state in stream_state[self._PERPARTITION_STATE_KEY]:
302+
for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
277303
self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
278304
self._create_cursor(state["cursor"])
279305
)

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -295,28 +295,58 @@ def set_initial_state(self, stream_state: StreamState) -> None:
295295
return
296296

297297
if not parent_state and incremental_dependency:
298-
# Attempt to retrieve child state
299-
substream_state_values = list(stream_state.values())
300-
substream_state = substream_state_values[0] if substream_state_values else {}
301-
# Filter out per partition state. Because we pass the state to the parent stream in the format {cursor_field: substream_state}
302-
if isinstance(substream_state, (list, dict)):
303-
substream_state = {}
304-
305-
parent_state = {}
306-
307-
# Copy child state to parent streams with incremental dependencies
308-
if substream_state:
309-
for parent_config in self.parent_stream_configs:
310-
if parent_config.incremental_dependency:
311-
parent_state[parent_config.stream.name] = {
312-
parent_config.stream.cursor_field: substream_state
313-
}
298+
# Migrate child state to parent state format
299+
parent_state = self._migrate_child_state_to_parent_state(stream_state)
314300

315301
# Set state for each parent stream with an incremental dependency
316302
for parent_config in self.parent_stream_configs:
317303
if parent_config.incremental_dependency:
318304
parent_config.stream.state = parent_state.get(parent_config.stream.name, {})
319305

306+
def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
307+
"""
308+
Migrate the child stream state to the parent stream's state format.
309+
310+
This method converts the global or child state into a format compatible with parent
311+
streams. The migration occurs only for parent streams with incremental dependencies.
312+
The method filters out per-partition states and retains only the global state in the
313+
format `{cursor_field: cursor_value}`.
314+
315+
Args:
316+
stream_state (StreamState): The state to migrate. Expected formats include:
317+
- {"updated_at": "2023-05-27T00:00:00Z"}
318+
- {"states": [...] } (ignored during migration)
319+
320+
Returns:
321+
StreamState: A migrated state for parent streams in the format:
322+
{
323+
"parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
324+
}
325+
326+
Example:
327+
Input: {"updated_at": "2023-05-27T00:00:00Z"}
328+
Output: {
329+
"parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
330+
}
331+
"""
332+
substream_state_values = list(stream_state.values())
333+
substream_state = substream_state_values[0] if substream_state_values else {}
334+
335+
# Ignore per-partition states or invalid formats
336+
if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1:
337+
return {}
338+
339+
# Copy child state to parent streams with incremental dependencies
340+
parent_state = {}
341+
if substream_state:
342+
for parent_config in self.parent_stream_configs:
343+
if parent_config.incremental_dependency:
344+
parent_state[parent_config.stream.name] = {
345+
parent_config.stream.cursor_field: substream_state
346+
}
347+
348+
return parent_state
349+
320350
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
321351
"""
322352
Get the state of the parent streams.

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,7 +1171,7 @@ def test_incremental_parent_state(
11711171

11721172

11731173
@pytest.mark.parametrize(
1174-
"test_name, manifest, mock_requests, expected_records, initial_state, expected_state",
1174+
"test_name, manifest, mock_requests, expected_records, expected_state",
11751175
[
11761176
(
11771177
"test_incremental_parent_state",
@@ -1326,8 +1326,6 @@ def test_incremental_parent_state(
13261326
"id": 300,
13271327
},
13281328
],
1329-
# Initial state
1330-
{"created_at": PARTITION_SYNC_START_TIME},
13311329
# Expected state
13321330
{
13331331
"state": {"created_at": VOTE_100_CREATED_AT},
@@ -1384,6 +1382,25 @@ def test_incremental_parent_state(
13841382
),
13851383
],
13861384
)
1385+
@pytest.mark.parametrize(
1386+
"initial_state",
1387+
[
1388+
{"created_at": PARTITION_SYNC_START_TIME},
1389+
{
1390+
"state": {"created_at": PARTITION_SYNC_START_TIME},
1391+
"lookback_window": 0,
1392+
"use_global_cursor": True,
1393+
"parent_state": {
1394+
"post_comments": {
1395+
"state": {"updated_at": PARTITION_SYNC_START_TIME},
1396+
"parent_state": {"posts": {"updated_at": PARTITION_SYNC_START_TIME}},
1397+
"lookback_window": 0,
1398+
}
1399+
},
1400+
},
1401+
],
1402+
ids=["legacy_python_format", "low_code_global_format"],
1403+
)
13871404
def test_incremental_parent_state_migration(
13881405
test_name, manifest, mock_requests, expected_records, initial_state, expected_state
13891406
):

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,13 +464,23 @@ def test_substream_partition_router_invalid_parent_record_type():
464464
},
465465
{"parent_stream_cursor": "2023-05-27T00:00:00Z"},
466466
),
467+
# Case 6: Declarative global cursor state, no migration expected
468+
(
469+
{
470+
"looback_window": 1,
471+
"use_global_cursor": True,
472+
"state": {"updated": "2023-05-27T00:00:00Z"},
473+
},
474+
{},
475+
),
467476
],
468477
ids=[
469478
"empty_initial_state",
470479
"initial_state_no_parent_legacy_state",
471480
"initial_state_no_parent_global_state",
472481
"initial_state_no_parent_per_partition_state",
473482
"initial_state_with_parent_state",
483+
"initial_state_no_parent_global_state_declarative",
474484
],
475485
)
476486
def test_set_initial_state(initial_state, expected_parent_state):

0 commit comments

Comments
 (0)