Commit 58a89d6

brianjlai and tolik0 authored
fix(property chunking): Switch the ordering of page iteration and property chunking to process chunks first instead of pages first (#487)
Co-authored-by: Anatolii Yatsuk <[email protected]>
1 parent 0e132c5 commit 58a89d6

File tree

4 files changed: +144 −105 lines

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py
unit_tests/sources/declarative/retrievers/test_simple_retriever.py


airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py

Lines changed: 2 additions & 0 deletions
@@ -52,8 +52,10 @@ def get_request_property_chunks(
         chunk_size = 0
         for property_field in property_fields:
             # If property_limit_type is not defined, we default to property_count which is just an incrementing count
+            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
             property_field_size = (
                 len(property_field)
+                + 1  # The +1 represents the extra character for the delimiter in between properties
                 if self.property_limit_type == PropertyLimitType.characters
                 else 1
             )
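
The +1 makes character-based sizing budget one extra character per property for the delimiter that will join them in the request. A minimal standalone sketch of the sizing rule, assuming a one-character comma-style delimiter; `chunk_by_characters`, `properties`, and `limit` are hypothetical names for illustration, not the CDK's API:

from typing import List

def chunk_by_characters(properties: List[str], limit: int) -> List[List[str]]:
    chunks: List[List[str]] = []
    current: List[str] = []
    size = 0
    for prop in properties:
        # +1 accounts for the delimiter separating this property from the next
        prop_size = len(prop) + 1
        if size + prop_size > limit and current:
            chunks.append(current)
            current, size = [], 0
        current.append(prop)
        size += prop_size
    if current:
        chunks.append(current)
    return chunks

# ["laurie", "jaclyn", "kaitlin"] with limit 12: each name costs 7 or 8 characters
# (length + 1 delimiter), so no two fit together and every property chunks alone,
# matching the new unit test case added below.
print(chunk_by_characters(["laurie", "jaclyn", "kaitlin"], 12))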

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 106 additions & 101 deletions
@@ -10,6 +10,7 @@
 from typing import (
     Any,
     Callable,
+    Dict,
     Iterable,
     List,
     Mapping,
@@ -367,14 +368,65 @@ def _read_pages(
             {"next_page_token": initial_token} if initial_token is not None else None
         )
         while not pagination_complete:
-            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
+            property_chunks: List[List[str]] = (
+                list(
+                    self.additional_query_properties.get_request_property_chunks(
+                        stream_slice=stream_slice
+                    )
+                )
+                if self.additional_query_properties
+                else [
+                    []
+                ]  # A single empty property chunk represents the case where property chunking is not configured
+            )

+            merged_records: MutableMapping[str, Any] = defaultdict(dict)
             last_page_size = 0
             last_record: Optional[Record] = None
-            for record in records_generator_fn(response):
-                last_page_size += 1
-                last_record = record
-                yield record
+            response: Optional[requests.Response] = None
+            for properties in property_chunks:
+                if len(properties) > 0:
+                    stream_slice = StreamSlice(
+                        partition=stream_slice.partition or {},
+                        cursor_slice=stream_slice.cursor_slice or {},
+                        extra_fields={"query_properties": properties},
+                    )
+
+                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
+                for current_record in records_generator_fn(response):
+                    if (
+                        current_record
+                        and self.additional_query_properties
+                        and self.additional_query_properties.property_chunking
+                    ):
+                        merge_key = (
+                            self.additional_query_properties.property_chunking.get_merge_key(
+                                current_record
+                            )
+                        )
+                        if merge_key:
+                            _deep_merge(merged_records[merge_key], current_record)
+                        else:
+                            # We should still emit records even if the record did not have a merge key
+                            last_page_size += 1
+                            last_record = current_record
+                            yield current_record
+                    else:
+                        last_page_size += 1
+                        last_record = current_record
+                        yield current_record
+
+            if (
+                self.additional_query_properties
+                and self.additional_query_properties.property_chunking
+            ):
+                for merged_record in merged_records.values():
+                    record = Record(
+                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
+                    )
+                    last_page_size += 1
+                    last_record = record
+                    yield record

             if not response:
                 pagination_complete = True
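
Structurally, this hunk inverts the two loops: pagination becomes the outer loop and property chunks the inner one, so every chunk is fetched for the current page token before the retriever advances to the next page, with partial records merged by merge key as they arrive. A runnable toy model of the ordering change, assuming the "before" shape from the commit title; all names below are illustrative stand-ins, not CDK API:

from typing import Iterator, List

property_chunks: List[List[str]] = [["first_name", "last_name"], ["nonary", "bracelet"]]
page_tokens = [None, "page_2"]  # two pages per chunked request, for illustration

def fetch_page(chunk: List[str], token: object) -> str:
    return f"records for properties={chunk} page={token}"

# Before (#487): chunks outer, pages inner; chunk 1's pages all load before chunk 2 starts
def read_chunks_outer() -> Iterator[str]:
    for chunk in property_chunks:
        for token in page_tokens:
            yield fetch_page(chunk, token)

# After (#487): pages outer, chunks inner; each page token is fetched once per chunk,
# so the partial records for a page can be merged before moving on
def read_pages_outer() -> Iterator[str]:
    for token in page_tokens:
        for chunk in property_chunks:
            yield fetch_page(chunk, token)

print(list(read_pages_outer()))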
@@ -449,110 +501,43 @@ def read_records(
         :param stream_slice: The stream slice to read data for
         :return: The records read from the API source
         """
-
-        property_chunks = (
-            list(
-                self.additional_query_properties.get_request_property_chunks(
-                    stream_slice=stream_slice
-                )
-            )
-            if self.additional_query_properties
-            else []
-        )
-        records_without_merge_key = []
-        merged_records: MutableMapping[str, Any] = defaultdict(dict)
-
         _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
+
         most_recent_record_from_slice = None
+        record_generator = partial(
+            self._parse_records,
+            stream_slice=stream_slice,
+            stream_state=self.state or {},
+            records_schema=records_schema,
+        )

-        if self.additional_query_properties:
-            for properties in property_chunks:
-                _slice = StreamSlice(
-                    partition=_slice.partition or {},
-                    cursor_slice=_slice.cursor_slice or {},
-                    extra_fields={"query_properties": properties},
-                )  # None-check
-
-                record_generator = partial(
-                    self._parse_records,
-                    stream_slice=_slice,
-                    stream_state=self.state or {},
-                    records_schema=records_schema,
-                )
+        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
+            stream_state = self.state

-            for stream_data in self._read_pages(record_generator, self.state, _slice):
-                current_record = self._extract_record(stream_data, _slice)
-                if self.cursor and current_record:
-                    self.cursor.observe(_slice, current_record)
+            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
+            # fetch more records. The platform deletes stream state for full refresh streams before starting a
+            # new job, so we don't need to worry about this value existing for the initial attempt
+            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
+                return

-                # Latest record read, not necessarily within slice boundaries.
-                # TODO Remove once all custom components implement `observe` method.
-                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                most_recent_record_from_slice = self._get_most_recent_record(
-                    most_recent_record_from_slice, current_record, _slice
-                )
+            yield from self._read_single_page(record_generator, stream_state, _slice)
+        else:
+            for stream_data in self._read_pages(record_generator, self.state, _slice):
+                current_record = self._extract_record(stream_data, _slice)
+                if self.cursor and current_record:
+                    self.cursor.observe(_slice, current_record)
+
+                # Latest record read, not necessarily within slice boundaries.
+                # TODO Remove once all custom components implement `observe` method.
+                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
+                most_recent_record_from_slice = self._get_most_recent_record(
+                    most_recent_record_from_slice, current_record, _slice
+                )
+                yield stream_data

-                if current_record and self.additional_query_properties.property_chunking:
-                    merge_key = (
-                        self.additional_query_properties.property_chunking.get_merge_key(
-                            current_record
-                        )
-                    )
-                    if merge_key:
-                        merged_records[merge_key].update(current_record)
-                    else:
-                        # We should still emit records even if the record did not have a merge key
-                        records_without_merge_key.append(current_record)
-                else:
-                    yield stream_data
             if self.cursor:
                 self.cursor.close_slice(_slice, most_recent_record_from_slice)
-
-            if len(merged_records) > 0:
-                yield from [
-                    Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice)
-                    for merged_record in merged_records.values()
-                ]
-            if len(records_without_merge_key) > 0:
-                yield from records_without_merge_key
-        else:
-            _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
-
-            most_recent_record_from_slice = None
-            record_generator = partial(
-                self._parse_records,
-                stream_slice=stream_slice,
-                stream_state=self.state or {},
-                records_schema=records_schema,
-            )
-
-            if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
-                stream_state = self.state
-
-                # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
-                # fetch more records. The platform deletes stream state for full refresh streams before starting a
-                # new job, so we don't need to worry about this value existing for the initial attempt
-                if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
-                    return
-
-                yield from self._read_single_page(record_generator, stream_state, _slice)
-            else:
-                for stream_data in self._read_pages(record_generator, self.state, _slice):
-                    current_record = self._extract_record(stream_data, _slice)
-                    if self.cursor and current_record:
-                        self.cursor.observe(_slice, current_record)
-
-                    # Latest record read, not necessarily within slice boundaries.
-                    # TODO Remove once all custom components implement `observe` method.
-                    # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                    most_recent_record_from_slice = self._get_most_recent_record(
-                        most_recent_record_from_slice, current_record, _slice
-                    )
-                    yield stream_data
-
-                if self.cursor:
-                    self.cursor.close_slice(_slice, most_recent_record_from_slice)
-        return
+        return

     def _get_most_recent_record(
         self,
@@ -639,6 +624,26 @@ def _to_partition_key(to_serialize: Any) -> str:
     return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)


+def _deep_merge(
+    target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]]
+) -> None:
+    """
+    Recursively merge two dictionaries, combining nested dictionaries instead of overwriting them.
+
+    :param target: The dictionary to merge into (modified in place)
+    :param source: The dictionary to merge from
+    """
+    for key, value in source.items():
+        if (
+            key in target
+            and isinstance(target[key], MutableMapping)
+            and isinstance(value, MutableMapping)
+        ):
+            _deep_merge(target[key], value)
+        else:
+            target[key] = value
+
+
 @dataclass
 class SimpleRetrieverTestReadDecorator(SimpleRetriever):
     """
unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py

Lines changed: 9 additions & 1 deletion
@@ -43,10 +43,18 @@
             ["kate", "laurie", "jaclyn"],
             None,
             PropertyLimitType.characters,
-            10,
+            15,
             [["kate", "laurie"], ["jaclyn"]],
             id="test_property_chunking_limit_characters",
         ),
+        pytest.param(
+            ["laurie", "jaclyn", "kaitlin"],
+            None,
+            PropertyLimitType.characters,
+            12,
+            [["laurie"], ["jaclyn"], ["kaitlin"]],
+            id="test_property_chunking_includes_extra_delimiter",
+        ),
         pytest.param(
             [],
             None,
unit_tests/sources/declarative/retrievers/test_simple_retriever.py

Lines changed: 27 additions & 3 deletions
@@ -1148,6 +1148,14 @@ def test_simple_retriever_with_additional_query_properties():
                 "last_name": "hongou",
                 "nonary": "second",
                 "bracelet": "1",
+                "dict_field": {
+                    "key1": "value1",
+                    "key2": "value2",
+                    "affiliation": {
+                        "company": "cradle",
+                        "industry": "pharmaceutical",
+                    },
+                },
             },
             associated_slice=None,
             stream_name=stream_name,
@@ -1170,6 +1178,7 @@ def test_simple_retriever_with_additional_query_properties():
                 "last_name": "kurashiki",
                 "nonary": "second",
                 "bracelet": "6",
+                "allies": ["aoi_kurashiki"],
             },
             associated_slice=None,
             stream_name=stream_name,
@@ -1216,7 +1225,12 @@ def test_simple_retriever_with_additional_query_properties():
     record_selector.select_records.side_effect = [
         [
             Record(
-                data={"id": "a", "first_name": "gentarou", "last_name": "hongou"},
+                data={
+                    "id": "a",
+                    "first_name": "gentarou",
+                    "last_name": "hongou",
+                    "dict_field": {"key1": "value1", "affiliation": {"company": "cradle"}},
+                },
                 associated_slice=None,
                 stream_name=stream_name,
             ),
@@ -1226,7 +1240,12 @@ def test_simple_retriever_with_additional_query_properties():
                 stream_name=stream_name,
             ),
             Record(
-                data={"id": "c", "first_name": "akane", "last_name": "kurashiki"},
+                data={
+                    "id": "c",
+                    "first_name": "akane",
+                    "last_name": "kurashiki",
+                    "allies": ["aoi_kurashiki"],
+                },
                 associated_slice=None,
                 stream_name=stream_name,
             ),
@@ -1263,7 +1282,12 @@ def test_simple_retriever_with_additional_query_properties():
                 stream_name=stream_name,
             ),
             Record(
-                data={"id": "a", "nonary": "second", "bracelet": "1"},
+                data={
+                    "id": "a",
+                    "nonary": "second",
+                    "bracelet": "1",
+                    "dict_field": {"key2": "value2", "affiliation": {"industry": "pharmaceutical"}},
+                },
                 associated_slice=None,
                 stream_name=stream_name,
             ),