Skip to content

Commit 0e132c5

Browse files
maxi297octavia-squidington-iii
and
octavia-squidington-iii
authored
fix: Properly setup global substream cursor based on manifest (#490)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 24cbc51 commit 0e132c5

File tree

3 files changed

+54
-3
lines changed

3 files changed

+54
-3
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from airbyte_cdk.sources.declarative.extractors.record_filter import (
2020
ClientSideIncrementalRecordFilterDecorator,
2121
)
22-
from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
22+
from airbyte_cdk.sources.declarative.incremental import (
23+
ConcurrentPerPartitionCursor,
24+
GlobalSubstreamCursor,
25+
)
2326
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
2427
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
2528
PerPartitionWithGlobalCursor,
@@ -361,7 +364,8 @@ def _group_streams(
361364
== DatetimeBasedCursorModel.__name__
362365
and hasattr(declarative_stream.retriever, "stream_slicer")
363366
and isinstance(
364-
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
367+
declarative_stream.retriever.stream_slicer,
368+
(GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
365369
)
366370
):
367371
stream_state = self._connector_state_manager.get_stream_state(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1439,7 +1439,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
14391439
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
14401440

14411441
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
1442-
use_global_cursor = isinstance(partition_router, GroupingPartitionRouter)
1442+
use_global_cursor = isinstance(
1443+
partition_router, GroupingPartitionRouter
1444+
) or component_definition.get("global_substream_cursor", False)
14431445

14441446
# Return the concurrent cursor and state converter
14451447
return ConcurrentPerPartitionCursor(

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3449,3 +3449,48 @@ def test_semaphore_cleanup():
34493449
assert '{"id":"2"}' not in cursor._semaphore_per_partition
34503450
assert len(cursor._partition_parent_state_map) == 0 # All parent states should be popped
34513451
assert cursor._parent_state == {"parent": {"state": "state2"}} # Last parent state
3452+
3453+
3454+
def test_given_global_state_when_read_then_state_is_not_per_partition() -> None:
3455+
manifest = deepcopy(SUBSTREAM_MANIFEST)
3456+
manifest["definitions"]["post_comments_stream"]["incremental_sync"][
3457+
"global_substream_cursor"
3458+
] = True
3459+
manifest["streams"].remove({"$ref": "#/definitions/post_comment_votes_stream"})
3460+
record = {
3461+
"id": 9,
3462+
"post_id": 1,
3463+
"updated_at": COMMENT_10_UPDATED_AT,
3464+
}
3465+
mock_requests = [
3466+
(
3467+
f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}",
3468+
{
3469+
"posts": [
3470+
{"id": 1, "updated_at": POST_1_UPDATED_AT},
3471+
],
3472+
},
3473+
),
3474+
# Fetch the first page of comments for post 1
3475+
(
3476+
"https://api.example.com/community/posts/1/comments?per_page=100",
3477+
{
3478+
"comments": [record],
3479+
},
3480+
),
3481+
]
3482+
3483+
run_mocked_test(
3484+
mock_requests,
3485+
manifest,
3486+
CONFIG,
3487+
"post_comments",
3488+
{},
3489+
[record],
3490+
{
3491+
"lookback_window": 1,
3492+
"parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}},
3493+
"state": {"updated_at": "2024-01-25T00:00:00Z"},
3494+
"use_global_cursor": True, # ensures that it is running the Concurrent CDK version as this is not populated in the declarative implementation
3495+
}, # this state does have per partition which would be under `states`
3496+
)

0 commit comments

Comments
 (0)