Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,14 @@ test.env
.claude
helper_scripts
tmp

# ignore secrets, virtual environments and typical python compilation artifacts
secrets.toml
# ignore environment files and basic python artifacts
.env
**/__pycache__/
**/*.py[cod]
**/*$py.class
# ignore duckdb
*.duckdb
*.wal
46 changes: 23 additions & 23 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@ Copyright 2022-2025 ScaleVector
Licensed under the Apache License, Version 2.0

The following directories contain code from dlt-hub:
- omniload/src/airtable/
- omniload/src/asana_source/
- omniload/src/chess/
- omniload/src/facebook_ads/
- omniload/src/filesystem/
- omniload/src/freshdesk/
- omniload/src/github/
- omniload/src/google_ads/
- omniload/src/google_analytics/
- omniload/src/google_sheets/
- omniload/src/hubspot/
- omniload/src/jira_source/
- omniload/src/kafka/
- omniload/src/kinesis/
- omniload/src/mongodb/
- omniload/src/notion/
- omniload/src/personio/
- omniload/src/pipedrive/
- omniload/src/salesforce/
- omniload/src/shopify/
- omniload/src/slack/
- omniload/src/stripe_analytics/
- omniload/src/zendesk/
- omniload/source/airtable/
- omniload/source/asana/
- omniload/source/chess/
- omniload/source/facebook_ads/
- omniload/source/filesystem/
- omniload/source/freshdesk/
- omniload/source/github/
- omniload/source/google_ads/
- omniload/source/google_analytics/
- omniload/source/google_sheets/
- omniload/source/hubspot/
- omniload/source/jira/
- omniload/source/kafka/
- omniload/source/kinesis/
- omniload/source/mongodb/
- omniload/source/notion/
- omniload/source/personio/
- omniload/source/pipedrive/
- omniload/source/salesforce/
- omniload/source/shopify/
- omniload/source/slack/
- omniload/source/stripe/
- omniload/source/zendesk/

The Apache License 2.0 can be found in the LICENSE.Apache-2.0 file.
2 changes: 2 additions & 0 deletions docs/backlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

## Iteration +2

- Add Helm chart, and improve Cloud deployment docs & resources
https://images.minimus.io/
- Refurbish `example-uris` subcommand

## Iteration +3
Expand Down
4 changes: 4 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## in progress

- Maintenance: Refactored module namespace. If you are using omniload
as a library, this introduces many breaking changes, because import paths
have moved (for example, `omniload.src.errors` is now `omniload.error`).
However, the new layout is much more ergonomic.

## 2026/06/24 v0.3.0

- Feature: Added embeddable `run_ingest()` Python API. Thanks, @hampsterx.
Expand Down
4 changes: 2 additions & 2 deletions docs/supported-sources/google_analytics.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ custom:<dimensions>:<metrics>

```sh
omniload ingest \
--source-uri "googleanalytics://?credentials_path="omniload/src/g_analytics.json&property_id=id123" \
--source-uri "googleanalytics://?credentials_path=/path/to/service/account.json&property_id=id123" \
--source-table "custom:date:activeUsers" \
--dest-uri "duckdb:///analytics.duckdb" \
--dest-table "dest.custom"
Expand Down Expand Up @@ -79,7 +79,7 @@ If no minute_ranges are specified, the system defaults to retrieving data from t

```sh
omniload ingest \
--source-uri "googleanalytics://?credentials_path="omniload/src/g_analytics.json&property_id=id123" \
--source-uri "googleanalytics://?credentials_path=/path/to/service/account.json&property_id=id123" \
--source-table "realtime:streamId:activeUsers:0-4,10-29" \
--dest-uri "duckdb:///analytics.duckdb" \
--dest-table "dest.realtime"
Expand Down
2 changes: 1 addition & 1 deletion omniload/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
SqlReflectionLevel,
run_ingest,
)
from omniload.src.errors import IngestJobError, ValidationError
from omniload.error import IngestJobError, ValidationError

__appname__ = "omniload"

Expand Down
23 changes: 12 additions & 11 deletions omniload/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from enum import Enum
from typing import TYPE_CHECKING

from omniload.src.errors import IngestJobError, ValidationError
from omniload.error import IngestJobError, ValidationError

if TYPE_CHECKING:
from dlt.common.pipeline import LoadInfo
Expand Down Expand Up @@ -145,18 +145,19 @@ def run_ingest(
from dlt.common.schema.typing import TColumnSchema
from dlt.pipeline.exceptions import PipelineStepFailed

import omniload.src.partition as partition
import omniload.src.resource as resource
from omniload.src.collector.spinner import SpinnerCollector
from omniload.src.destinations import AthenaDestination, ClickhouseDestination
from omniload.src.factory import SourceDestinationFactory
from omniload.src.filters import (
import omniload.core.resource as resource
from omniload.codec import hint
from omniload.codec.filter import (
cast_set_to_list,
cast_spanner_types,
create_masking_filter,
handle_mysql_empty_dates,
)
from omniload.src.sources import MongoDbSource
from omniload.core.factory import SourceDestinationFactory
from omniload.source.mongodb.api import MongoDbSource
from omniload.target.athena import AthenaDestination
from omniload.target.clickhouse import ClickhouseDestination
from omniload.util.spinner import SpinnerCollector

incremental_strategy = _coerce(incremental_strategy, IncrementalStrategy)
progress = _coerce(progress, Progress)
Expand Down Expand Up @@ -428,7 +429,7 @@ def parse_columns(columns: list[str]) -> dict:
# https://github.com/dlt-hub/dlt/issues/2248
# TODO(turtledev): only apply for write dispositions that actually cause an exception.
# TODO(turtledev): make batch size configurable
import omniload.src.arrow as arrow
import omniload.source.arrow.adapter as arrow

resource.for_each(dlt_source, lambda x: x.add_map(arrow.as_list))

Expand All @@ -440,7 +441,7 @@ def parse_columns(columns: list[str]) -> dict:
resource.for_each(dlt_source, lambda x: x.add_limit(yield_limit))

if isinstance(source, MongoDbSource):
from omniload.src.resource import TypeHintMap
from omniload.core.resource import TypeHintMap

resource.for_each(dlt_source, lambda x: x.add_map(TypeHintMap().type_hint_map))

Expand All @@ -451,7 +452,7 @@ def col_h(x):
resource.for_each(dlt_source, col_h)

if isinstance(destination, AthenaDestination) and partition_by:
partition.apply_athena_hints(dlt_source, partition_by, column_hints)
hint.apply_athena_hints(dlt_source, partition_by, column_hints)

if isinstance(destination, ClickhouseDestination):
from dlt.destinations.adapters import clickhouse_adapter
Expand Down
2 changes: 1 addition & 1 deletion omniload/src/filters.py → omniload/codec/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def excluder(table: Table):


def create_masking_filter(mask_configs: list[str]):
from omniload.src.masking import create_masking_mapper
from omniload.codec.masking import create_masking_mapper

if not mask_configs:
return lambda x: x
Expand Down
2 changes: 1 addition & 1 deletion omniload/src/partition.py → omniload/codec/hint.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dlt.common.schema.typing import TColumnSchema
from dlt.sources import DltResource, DltSource

import omniload.src.resource as resource
import omniload.core.resource as resource


def apply_athena_hints(
Expand Down
File renamed without changes.
48 changes: 48 additions & 0 deletions omniload/core/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Dict, Type
from urllib.parse import urlparse

from omniload.core.model import DestinationProtocol, SourceProtocol
from omniload.core.registry import SQL_SOURCE_SCHEMES, destinations, sources
from omniload.core.router import SqlSourceRouter


def parse_scheme_from_uri(uri: str) -> str:
    """Return the scheme portion of ``uri``.

    ``urlparse`` is consulted first. When it reports no scheme (it does so
    for scheme names it does not accept, e.g. ones containing an
    underscore), the text in front of the first ``"://"`` is used instead.

    Args:
        uri: The URI to inspect, e.g. ``"duckdb:///analytics.duckdb"``.

    Returns:
        The non-empty scheme string.

    Raises:
        ValueError: If no scheme can be determined, including the case where
            the URI starts with ``"://"`` and the scheme would be empty.
    """
    scheme = urlparse(uri).scheme
    if scheme:
        return scheme

    # Fallback for schemes that urlparse rejects. An empty prefix is not a
    # usable scheme, so it is treated the same as a missing one.
    prefix, separator, _ = uri.partition("://")
    if separator and prefix:
        return prefix

    raise ValueError(f"Could not parse scheme from uri: {uri}")


class SourceDestinationFactory:
    """Pick source and destination implementations from a pair of URIs.

    The scheme of each URI is extracted when the factory is constructed and
    is later used to look up the implementation class in the ``sources`` and
    ``destinations`` registries.
    """

    source_scheme: str
    destination_scheme: str
    # Registries mapping a URI scheme to the class that handles it.
    sources: Dict[str, Type[SourceProtocol]] = sources
    destinations: Dict[str, Type[DestinationProtocol]] = destinations

    def __init__(self, source_uri: str, destination_uri: str):
        """Store both URIs together with their parsed schemes.

        Raises:
            ValueError: Propagated from ``parse_scheme_from_uri`` when a
                scheme cannot be determined.
        """
        self.source_uri = source_uri
        self.source_scheme = parse_scheme_from_uri(source_uri)

        self.destination_uri = destination_uri
        self.destination_scheme = parse_scheme_from_uri(destination_uri)

    def get_source(self) -> SourceProtocol:
        """Instantiate the source matching ``self.source_scheme``.

        Schemes listed in ``SQL_SOURCE_SCHEMES`` are checked first and are
        served by ``SqlSourceRouter``; anything else is looked up in the
        ``sources`` registry.

        Raises:
            ValueError: If the scheme is in neither place.
        """
        scheme = self.source_scheme
        if scheme in SQL_SOURCE_SCHEMES:
            return SqlSourceRouter()
        if scheme not in self.sources:
            raise ValueError(f"Unsupported source scheme: {scheme}")
        return self.sources[scheme]()

    def get_destination(self) -> DestinationProtocol:
        """Instantiate the destination matching ``self.destination_scheme``.

        Raises:
            ValueError: If the scheme is not in the ``destinations`` registry.
        """
        scheme = self.destination_scheme
        if scheme not in self.destinations:
            raise ValueError(f"Unsupported destination scheme: {scheme}")
        return self.destinations[scheme]()
37 changes: 37 additions & 0 deletions omniload/core/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from dataclasses import dataclass
from typing import Protocol

from dlt.common.destination import Destination


class SourceProtocol(Protocol):
    """Structural interface for source implementations.

    Any class providing these two methods satisfies the protocol; no
    inheritance is required.
    """

    def dlt_source(self, uri: str, table: str, **kwargs):
        """Produce the source object for ``table`` at ``uri``.

        NOTE(review): the return type and the accepted keyword arguments are
        not declared here; confirm against concrete implementations.
        """
        ...

    def handles_incrementality(self) -> bool:
        """Report a boolean flag about incremental loading.

        NOTE(review): presumably ``True`` means the source manages
        incremental loading itself; verify against callers.
        """
        ...


class DestinationProtocol(Protocol):
    """Structural interface for destination implementations.

    Any class providing these three methods satisfies the protocol; no
    inheritance is required.
    """

    def dlt_dest(self, uri: str, **kwargs) -> Destination:
        """Return the dlt ``Destination`` for ``uri``."""
        ...

    def dlt_run_params(self, uri: str, table: str, **kwargs):
        """Return run parameters for loading into ``table`` at ``uri``.

        NOTE(review): the shape of the returned value is not declared here;
        confirm against concrete implementations.
        """
        ...

    def post_load(self) -> None:
        """Hook without arguments or return value.

        NOTE(review): presumably invoked once a load has finished; verify
        against the caller.
        """
        ...


@dataclass
class TableDefinition:
    """A table reference split into its two components."""

    # Portion in front of the first dot of a "<schema>.<table>" string.
    dataset: str
    # Remainder after the first dot.
    table: str


def table_string_to_dataclass(table: str) -> TableDefinition:
    """Convert a ``<schema>.<table>`` string into a ``TableDefinition``.

    Only the first dot separates the two parts, so any further dots stay in
    the table component.

    Raises:
        ValueError: If ``table`` contains no dot at all.
    """
    dataset, dot, name = table.partition(".")
    if not dot:
        raise ValueError("Table name must be in the format <schema>.<table>")

    return TableDefinition(dataset=dataset, table=name)
Loading
Loading