Skip to content

fix: (PoC) (do not merge) (CDK) (Manifest) - Migrate manifest fields #463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 26 commits into from
Closed
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d4ffa27
add migrations to the manifest
bazarnov Apr 8, 2025
6593490
Merge remote-tracking branch 'origin/main' into baz/cdk/manifest-migr…
bazarnov Apr 8, 2025
a73d162
add stack of migrations
bazarnov Apr 9, 2025
b358c02
add deprecation warnings
bazarnov Apr 10, 2025
1a53161
Merge remote-tracking branch 'origin/main' into baz/cdk/manifest-migr…
bazarnov Apr 10, 2025
a0bc96e
updated the migrations approach
bazarnov Apr 11, 2025
424ddbc
formatted, removed old implementation
bazarnov Apr 11, 2025
475ea83
removed custom non-related exception
bazarnov Apr 11, 2025
fd5d696
removed unused conftest
bazarnov Apr 11, 2025
403a41f
formatted
bazarnov Apr 11, 2025
b43c079
Merge remote-tracking branch 'origin/main' into baz/cdk/manifest-migr…
bazarnov Apr 14, 2025
046c308
cleaned up
bazarnov Apr 14, 2025
54bf364
emit deprecation warnings in Connector Builder
bazarnov Apr 14, 2025
5bf675e
Merge remote-tracking branch 'origin/main' into baz/cdk/manifest-migr…
bazarnov Apr 14, 2025
9c87e5e
Merge remote-tracking branch 'origin/main' into baz/cdk/manifest-migr…
bazarnov Apr 15, 2025
b2a49d3
added version checks for manifest migrations. Added auto-import for a…
bazarnov Apr 15, 2025
3630289
Merge remote-tracking branch 'origin/main' into baz/cdk/manifest-migr…
bazarnov Apr 15, 2025
7fcb29d
formatted
bazarnov Apr 15, 2025
b912e82
updated
bazarnov Apr 15, 2025
4085c8e
changed the naming structure for the migrations
bazarnov Apr 15, 2025
a3679a3
add README.md
bazarnov Apr 15, 2025
7b16aab
add __requires_migration: bool flag to manifest config
bazarnov Apr 15, 2025
d145cde
Merge remote-tracking branch 'origin/main' into baz/cdk/manifest-migr…
bazarnov Apr 15, 2025
4a051c2
dummy change line
bazarnov Apr 16, 2025
6814f08
updated
bazarnov Apr 22, 2025
cf5cf5f
Merge remote-tracking branch 'origin/main' into baz/cdk/manifest-migr…
bazarnov Apr 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
@@ -56,12 +56,23 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits:
return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams)


def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Check if the manifest requires migration.

    The decision is read from the `__should_migrate` key of the config; when the
    key is absent, no migration is requested. The stored value is coerced with
    `bool()` so that the function always returns a real boolean, as its return
    annotation promises, even if the config carries a non-bool value.

    :param config: The config to check
    :return: True if the manifest requires migration, False otherwise
    """
    return bool(config.get("__should_migrate", False))


def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
manifest = config["__injected_declarative_manifest"]
return ManifestDeclarativeSource(
config=config,
emit_connector_builder_messages=True,
source_config=manifest,
migrate_manifest=should_migrate_manifest(config),
component_factory=ModelToComponentFactory(
emit_connector_builder_messages=True,
limit_pages_fetched_per_slice=limits.max_pages_per_slice,
25 changes: 23 additions & 2 deletions airbyte_cdk/connector_builder/test_reader/reader.py
Original file line number Diff line number Diff line change
@@ -112,11 +112,16 @@ def run_test_read(
record_limit = self._check_record_limit(record_limit)
# The connector builder currently only supports reading from a single stream at a time
stream = source.streams(config)[0]

# get any deprecation warnings during the component creation
deprecation_warnings: List[AirbyteLogMessage] = source.deprecation_warnings()

schema_inferrer = SchemaInferrer(
self._pk_to_nested_and_composite_field(stream.primary_key),
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
)
datetime_format_inferrer = DatetimeFormatInferrer()

message_group = get_message_groups(
self._read_stream(source, config, configured_catalog, state),
schema_inferrer,
@@ -125,7 +130,7 @@ def run_test_read(
)

slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
message_group
message_group, deprecation_warnings
)
schema, log_messages = self._get_infered_schema(
configured_catalog, schema_inferrer, log_messages
@@ -238,7 +243,11 @@ def _check_record_limit(self, record_limit: Optional[int] = None) -> int:

return record_limit

def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
def _categorise_groups(
self,
message_groups: MESSAGE_GROUPS,
deprecation_warnings: Optional[List[Any]] = None,
) -> GROUPED_MESSAGES:
"""
Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update.

@@ -269,6 +278,7 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
auxiliary_requests = []
latest_config_update: Optional[AirbyteControlMessage] = None

# process the message groups first
for message_group in message_groups:
match message_group:
case AirbyteLogMessage():
@@ -298,6 +308,17 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
case _:
raise ValueError(f"Unknown message group type: {type(message_group)}")

# process deprecation warnings, if present
if deprecation_warnings is not None:
for deprecation in deprecation_warnings:
match deprecation:
case AirbyteLogMessage():
log_messages.append(
LogMessage(message=deprecation.message, level=deprecation.level.value)
)
case _:
raise ValueError(f"Unknown message group type: {type(deprecation)}")

return slices, log_messages, auxiliary_requests, latest_config_update

def _get_infered_schema(
29 changes: 26 additions & 3 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
@@ -1863,14 +1863,16 @@ definitions:
type: object
required:
- type
- url_base
properties:
type:
type: string
enum: [HttpRequester]
url_base:
deprecated: true
deprecation_message: "Use `url` field instead."
sharable: true
title: API Base URL
description: Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
description: Deprecated, use the `url` instead. Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
@@ -1886,9 +1888,30 @@ definitions:
- "{{ config['base_url'] or 'https://app.posthog.com'}}/api"
- "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups"
- "https://example.com/api/v1/resource/{{ next_page_token['id'] }}"
url:
sharable: true
title: API URL
description: The URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
- next_page_token
- stream_interval
- stream_partition
- stream_slice
- creation_response
- polling_response
- download_target
examples:
- "https://connect.squareup.com/v2"
- "{{ config['url'] or 'https://app.posthog.com'}}/api"
- "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups"
- "https://example.com/api/v1/resource/{{ next_page_token['id'] }}"
path:
deprecated: true
deprecation_message: "Use `url` field instead."
title: URL Path
description: Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
description: Deprecated, use the `url` instead. Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
11 changes: 10 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_source.py
Original file line number Diff line number Diff line change
@@ -4,8 +4,11 @@

import logging
from abc import abstractmethod
from typing import Any, Mapping, Tuple
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.models import (
AirbyteLogMessage,
)
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker

@@ -34,3 +37,9 @@ def check_connection(
The error object will be cast to string to display the problem to the user.
"""
return self.connection_checker.check_connection(self, logger, config)

def deprecation_warnings(self) -> List[AirbyteLogMessage]:
    """
    Provide the deprecation warnings collected for this source.

    This implementation has nothing to report and always returns a new, empty list.
    """
    no_warnings: List[AirbyteLogMessage] = []
    return no_warnings
24 changes: 20 additions & 4 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@

from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteLogMessage,
AirbyteMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
@@ -26,6 +27,9 @@
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.migrations.manifest.migration_handler import (
ManifestMigrationHandler,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DeclarativeStream as DeclarativeStreamModel,
)
@@ -68,16 +72,19 @@ def __init__(
debug: bool = False,
emit_connector_builder_messages: bool = False,
component_factory: Optional[ModelToComponentFactory] = None,
):
migrate_manifest: Optional[bool] = False,
) -> None:
"""
Args:
config: The provided config dict.
source_config: The manifest of low-code components that describe the source connector.
debug: True if debug mode is enabled.
emit_connector_builder_messages: True if messages should be emitted to the connector builder.
component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
debug: bool True if debug mode is enabled.
emit_connector_builder_messages: Optional[bool] True if messages should be emitted to the connector builder.
component_factory: Optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
migrate_manifest: Optional[bool] if the manifest should be migrated to pick up the latest declarative component schema changes at runtime.
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")

# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
manifest = dict(source_config)
if "type" not in manifest:
@@ -90,6 +97,12 @@ def __init__(
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
"", resolved_source_config, {}
)

if migrate_manifest:
propagated_source_config = ManifestMigrationHandler(
propagated_source_config
).apply_migrations()

self._source_config = propagated_source_config
self._debug = debug
self._emit_connector_builder_messages = emit_connector_builder_messages
@@ -123,6 +136,9 @@ def dynamic_streams(self) -> List[Dict[str, Any]]:
manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
)

def deprecation_warnings(self) -> List[AirbyteLogMessage]:
    """
    Return the model deprecations reported by `self._constructor`.

    Falls back to an empty list when `get_model_deprecations()` returns a falsy value.
    """
    reported = self._constructor.get_model_deprecations()
    if reported:
        return reported
    return []

@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
66 changes: 66 additions & 0 deletions airbyte_cdk/sources/declarative/migrations/manifest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Manifest Migrations

This directory contains the logic and registry for manifest migrations in the Airbyte CDK. Migrations are used to update or transform manifest components to newer formats or schemas as the CDK evolves.

## Adding a New Migration

1. **Create a Migration File:**
- Add a new Python file in the `migrations/` subdirectory.
- Name the file using the pattern: `<description>_v<major>_<minor>_<patch>__<order>.py`.
- Example: `http_requester_url_base_to_url_v6_45_2__0.py`
- The `<order>` integer is used to determine the order of migrations for the same version.

2. **Define the Migration Class:**
- The migration class must inherit from `ManifestMigration`.
- Name the class using the pattern: `V_<major>_<minor>_<patch>_ManifestMigration_<Description>`.
- Example: `V_6_45_2_ManifestMigration_HttpRequesterUrlBaseToUrl`
- Implement the following methods:
- `should_migrate(self, manifest: ManifestType) -> bool`: Return `True` if the migration should be applied to the given manifest.
- `migrate(self, manifest: ManifestType) -> None`: Perform the migration in-place.

3. **Migration Versioning:**
- The migration version is extracted from the class name and used to determine applicability.
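- Only manifests with a version less than or equal to the migration version will be migrated. For example, a migration versioned `6.45.2` is applied to a manifest at version `6.45.2` or lower, and is skipped for a manifest at version `6.46.0`.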

4. **Component Type:**
- Use the `TYPE_TAG` constant to check the component type in your migration logic.

5. **Examples:**
- See `migrations/http_requester_url_base_to_url_v6_45_2__0.py` and `migrations/http_requester_path_to_url_v6_45_2__1.py` for reference implementations.

## Migration Registry

- All migration classes in the `migrations/` folder are automatically discovered and registered in `migrations_registry.py`.
- Migrations are applied in order, determined by the `<order>` suffix in the filename.

## Testing

- Ensure your migration is covered by unit tests.
- Tests should verify both `should_migrate` and `migrate` behaviors.

## Example Migration Skeleton

```python
from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType

class V_1_2_3_ManifestMigration_Example(ManifestMigration):
    # Component type this migration targets, and the key rename it performs.
    component_type = "ExampleComponent"
    original_key = "old_key"
    replacement_key = "new_key"

    def should_migrate(self, manifest: ManifestType) -> bool:
        """Return True when the component has the targeted type and still carries the old key."""
        if manifest[TYPE_TAG] != self.component_type:
            return False
        return self.original_key in manifest

    def migrate(self, manifest: ManifestType) -> None:
        """Copy the value from the old key to the new key, then drop the old key (in place)."""
        legacy_value = manifest[self.original_key]
        manifest[self.replacement_key] = legacy_value
        manifest.pop(self.original_key, None)
```

## Additional Notes

- Do not modify the migration registry manually; it will pick up all valid migration classes automatically.
- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`.

---

For more details, see the docstrings in `manifest_migration.py` and the examples in the `migrations/` folder.
Empty file.
12 changes: 12 additions & 0 deletions airbyte_cdk/sources/declarative/migrations/manifest/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


class ManifestMigrationException(Exception):
    """
    Error signalling that migrating the manifest failed.

    The message passed by the caller is prefixed with
    "Failed to migrate the manifest: " before being handed to `Exception`.
    """

    def __init__(self, message: str) -> None:
        prefixed_message = f"Failed to migrate the manifest: {message}"
        super().__init__(prefixed_message)
Loading