Skip to content

Commit

Permalink
Add some dev-facing normalization docs (#13780)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jun 15, 2022
1 parent 330d32e commit 897522c
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 57 deletions.
1 change: 1 addition & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output
264 changes: 214 additions & 50 deletions airbyte-integrations/bases/base-normalization/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ def writer():
line = input_data.readline()
if not line:
break
process.stdin.write(line)
if not line.startswith(b"#"):
process.stdin.write(line)
process.stdin.close()

thread = threading.Thread(target=writer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991100, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 8.12, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637991200, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 9.23, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}

# Note that some of the IDs are inserted and then deleted; this should be reflected as a single row in the SCD model with _airbyte_active_row set to 0.
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":1,"name":"mazda","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}}
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":2,"name":"toyata","_ab_cdc_updated_at":1623849130549,"_ab_cdc_lsn":26971624,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}}
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":"bmw","_ab_cdc_updated_at":1623849314535,"_ab_cdc_lsn":26974776,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}}
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":"vw","_ab_cdc_updated_at":1623849314663,"_ab_cdc_lsn":26975264,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}}
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":4,"name":null,"_ab_cdc_updated_at":1623849314791,"_ab_cdc_lsn":26975440,"_ab_cdc_deleted_at":1623849314791},"emitted_at":1623860160}}
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":"opel","_ab_cdc_updated_at":1623850868109,"_ab_cdc_lsn":27009440,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}}
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":7,"name":"lotus","_ab_cdc_updated_at":1623850868237,"_ab_cdc_lsn":27010048,"_ab_cdc_deleted_at":null},"emitted_at":1623861660}}
# messages_incremental.txt has a dedup_cdc_excluded record with emitted_at=1623860160, i.e. older than this record. If you delete/modify this record, make sure to maintain that relationship.
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":6,"name":null,"_ab_cdc_updated_at":1623850868371,"_ab_cdc_lsn":27010232,"_ab_cdc_deleted_at":1623850868371},"emitted_at":1623861660}}

{"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":1,"name":"mazda","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_log_pos": 33274,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
# Some records are duplicated from messages.txt - this mimics our "at-least-once" delivery policy.

# Other records "go back in time", i.e. are new data but have an older emitted_at timestamp than some of the those duplicated records.
# (I think?) This mimics an interruption to normalization, such that some records were normalized but others were not.

# These first records are old data.
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
# These records are new data.
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602650012000, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 6.39, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}

# These first records are old data.
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990800, "data": { "id": 2, "currency": "EUR", "date": "", "timestamp_col": "", "NZD": 2.43, "HKD@spéçiäl & characters": 5.4, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602637990900, "data": { "id": 3, "currency": "GBP", "NZD": 3.14, "HKD@spéçiäl & characters": 9.2, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
# These records are new data.
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650000000, "data": { "id": 2, "currency": "EUR", "NZD": 3.89, "HKD@spéçiäl & characters": 14.05, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650010000, "data": { "id": 4, "currency": "HKD", "NZD": 1.19, "HKD@spéçiäl & characters": 0.01, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650011000, "data": { "id": 1, "currency": "USD", "date": "2020-10-14", "timestamp_col": "2020-10-14T00:00:00.000-00", "NZD": 1.14, "HKD@spéçiäl & characters": 9.5, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}
{"type": "RECORD", "record": {"stream": "dedup_exchange_rate", "emitted_at": 1602650012000, "data": { "id": 5, "currency": "USD", "NZD": 0.01, "HKD@spéçiäl & characters": 6.39, "HKD_special___characters": "column name collision?", "column`_'with\"_quotes":"ma\"z`d'a"}}}

# All of these records are new data.
# This record has an _older_ emitted_at than the latest dedup_cdc_excluded record in messages.txt
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":"vw","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623849314663,"_ab_cdc_lsn":26975264,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}}
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":5,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623900000000,"_ab_cdc_lsn":28010252,"_ab_cdc_deleted_at":1623900000000},"emitted_at":1623900000000}}
# Previously we had a bug where we only respected deletions from the most recent _airbyte_emitted_at. This message tests that ID 5 is still correctly deleted (i.e. marked with _airbyte_active_row = 0).
# This record is also deleted in messages_schema_change.txt.
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":8,"name":"ford","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1624000000000,"_ab_cdc_lsn":29010252,"_ab_cdc_deleted_at":null},"emitted_at":1624000000000}}

# All of these records are old data.
{"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":1,"name":"mazda","_ab_cdc_updated_at":1623849130530,"_ab_cdc_lsn":26971624,"_ab_cdc_log_pos": 33274,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}}
{"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":2,"name":"toyata","_ab_cdc_updated_at":1623849130549,"_ab_cdc_lsn":26971624,"_ab_cdc_log_pos": 33275,"_ab_cdc_deleted_at":null},"emitted_at":1623859926}}
{"type":"RECORD","record":{"stream":"pos_dedup_cdcx","data":{"id":2,"name":"bmw","_ab_cdc_updated_at":1623849314535,"_ab_cdc_lsn":26974776,"_ab_cdc_log_pos": 33278,"_ab_cdc_deleted_at":null},"emitted_at":1623860160}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
{"type":"RECORD","record":{"stream":"renamed_dedup_cdc_excluded","data":{"id":9,"name":"opel","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623950868109,"_ab_cdc_lsn":28009440,"_ab_cdc_deleted_at":null},"emitted_at":1623961660}}
{"type":"RECORD","record":{"stream":"renamed_dedup_cdc_excluded","data":{"id":9,"name":null,"column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1623950868371,"_ab_cdc_lsn":28010232,"_ab_cdc_deleted_at":1623950868371},"emitted_at":1623961660}}

# This message tests the ability to delete a record which was inserted in a previous sync. See messages_incremental.txt for how it was inserted.
{"type":"RECORD","record":{"stream":"dedup_cdc_excluded","data":{"id":8,"name":"ford","column`_'with\"_quotes":"ma\"z`d'a","_ab_cdc_updated_at":1625000000000,"_ab_cdc_lsn":29020252,"_ab_cdc_deleted_at":1625000000000},"emitted_at":1625000000000}}
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,13 @@ def safe_cast_to_string(definition: Dict, column_name: str, destination_type: De
return col

def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> Any:
"""
This model pulls data from the ID-hashing model and appends it to a log of record updates. When inserting an update to a record, it also
checks whether that record had a previously-existing row in the SCD model; if it does, then that previous row's end_at column is set to
the new update's start_at.
See the docs for more details: https://docs.airbyte.com/understanding-airbyte/basic-normalization#normalization-metadata-columns
"""
cursor_field = self.get_cursor_field(column_names)
order_null = f"is null asc,\n {cursor_field} desc"
if self.destination_type.value == DestinationType.ORACLE.value:
Expand Down Expand Up @@ -1026,6 +1033,10 @@ def get_primary_key_from_path(self, column_names: Dict[str, Tuple[str, str]], pa
raise ValueError(f"No path specified for stream {self.stream_name}")

def generate_final_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]], unique_key: str = "") -> Any:
"""
This is the table that the user actually wants. In addition to the columns that the source outputs, it has some additional metadata columns;
see the basic normalization docs for an explanation: https://docs.airbyte.com/understanding-airbyte/basic-normalization#normalization-metadata-columns
"""
template = Template(
"""
-- Final base SQL model
Expand Down
12 changes: 6 additions & 6 deletions docs/understanding-airbyte/basic-normalization.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ You'll notice that some metadata are added to keep track of important informatio

Additional metadata columns can be added on some tables depending on the usage:
- On the Slowly Changing Dimension (SCD) tables:
- `_airbyte_start_at`: equivalent to the cursor column defined on the table, denotes when the row was first seen
- `_airbyte_end_at`: denotes until when the row was seen with these particular values. If this column is not NULL, then the record has been updated and is no longer the most up to date one. If NULL, then the row is the latest version for the record.
- `_airbyte_active_row`: denotes if the row for the record is the latest version or not.
- `_airbyte_unique_key_scd`: hash of primary keys + cursors used to de-duplicate the scd table.
- On de-duplicated (and SCD) tables:
- `_airbyte_unique_key`: hash of primary keys used to de-duplicate the final table.
- `_airbyte_start_at`: equivalent to the cursor column defined on the table, denotes when the row was first seen
- `_airbyte_end_at`: denotes until when the row was seen with these particular values. If this column is not NULL, then the record has been updated and is no longer the most up to date one. If NULL, then the row is the latest version for the record.
- `_airbyte_active_row`: denotes if the row for the record is the latest version or not.
- `_airbyte_unique_key_scd`: hash of primary keys + cursors used to de-duplicate the scd table.
- On de-duplicated (and SCD) tables:
- `_airbyte_unique_key`: hash of primary keys used to de-duplicate the final table.

The [normalization rules](basic-normalization.md#Rules) are _not_ configurable. They are designed to pick a reasonable set of defaults to hit the 80/20 rule of data normalization. We respect that normalization is a detail-oriented problem and that with a fixed set of rules, we cannot normalize your data in such a way that covers all use cases. If this feature does not meet your normalization needs, we always put the full json blob in destination as well, so that you can parse that object however best meets your use case. We will be adding more advanced normalization functionality shortly. Airbyte is focused on the EL of ELT. If you need a really featureful tool for the transformations then, we suggest trying out dbt.

Expand Down

0 comments on commit 897522c

Please sign in to comment.