Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sample Mode Alpha #11247

Merged
merged 20 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1faa8b7
Add `--sample` flag to `run` command
QMalcolm Jan 24, 2025
2394402
Remove no longer needed `if` statement around EventTimeFilter creatio…
QMalcolm Jan 24, 2025
25fb1e5
Get sample mode working with `--event-time-start/end`
QMalcolm Jan 24, 2025
6f68797
Begin using `--sample-window` for sample mode instead of `--event-tim…
QMalcolm Jan 24, 2025
6bcb328
Move `SampleWindow` class to `sample_window.py` in `event_time` submo…
QMalcolm Jan 27, 2025
d6f113f
Create an `offset_timestamp` separate from MicrobatchBuilder
QMalcolm Jan 28, 2025
c547617
Add `types-python-dateutil` to dev requirements
QMalcolm Jan 28, 2025
cf5bfd6
Begin supporting microbatch models in sample mode
QMalcolm Jan 28, 2025
a8cbec2
Move parsing logic of `SampleWindowType` to `SampleWindow`
QMalcolm Jan 29, 2025
fb64b00
Allow for specificaion of "specific" sample windows
QMalcolm Jan 29, 2025
10674f7
Fix tests of `BaseResolver.resolve_event_time_filter` for sample mode…
QMalcolm Jan 29, 2025
7e86080
Add `--no-sample` as it's necessary for retry
QMalcolm Jan 29, 2025
2942861
Add guards to accessing of `sample` and `sample_window`
QMalcolm Jan 29, 2025
1811754
Gate sample mode functionality via env var `DBT_EXPERIMENTAL_SAMPLE_M…
QMalcolm Jan 31, 2025
719b550
Add sample mode tests for incremental models
QMalcolm Feb 2, 2025
01ae64f
Add changie doc for sample mode initial implementation
QMalcolm Feb 2, 2025
5c51a6f
Fixup sample mode functional tests
QMalcolm Feb 3, 2025
3c5539b
Ensure microbatch creates correct number of batches when sample mode …
QMalcolm Feb 3, 2025
6d001d7
Correct comment in SampleWindow post serialization method
QMalcolm Feb 3, 2025
1ef8b3f
Hide CLI sample mode options
QMalcolm Feb 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20250202-140054.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Initial implementation of sample mode
time: 2025-02-02T14:00:54.074209-06:00
custom:
Author: QMalcolm
Issue: 11227 11230 11231 11248 11252 11254 11258
3 changes: 3 additions & 0 deletions core/dbt/artifacts/resources/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,6 @@ class BatchSize(StrEnum):
day = "day"
month = "month"
year = "year"

def plural(self) -> str:
return str(self) + "s"
2 changes: 2 additions & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ def parse(ctx, **kwargs):
@p.empty
@p.event_time_start
@p.event_time_end
@p.sample
@p.sample_window
@p.select
@p.selector
@p.target_path
Expand Down
33 changes: 32 additions & 1 deletion core/dbt/cli/option_types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from click import Choice, ParamType
from typing import Optional

import pytz
from click import Choice, Context, Parameter, ParamType

from dbt.config.utils import normalize_warn_error_options, parse_cli_yaml_string
from dbt.event_time.sample_window import SampleWindow
from dbt.events import ALL_EVENT_NAMES
from dbt.exceptions import OptionNotYamlDictError, ValidationError
from dbt_common.exceptions import DbtValidationError
Expand Down Expand Up @@ -88,3 +92,30 @@
super().convert(value, param, ctx)

return value


class SampleWindowType(ParamType):
name = "SAMPLE_WINDOW"

def convert(
self, value, param: Optional[Parameter], ctx: Optional[Context]
) -> Optional[SampleWindow]:
if value is None:
return None

Check warning on line 104 in core/dbt/cli/option_types.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/cli/option_types.py#L104

Added line #L104 was not covered by tests

if isinstance(value, str):
try:
# Try and identify if it's a "dict" or a "str"
if value.lstrip()[0] == "{":
param_option_name: str = param.opts[0] if param.opts else param.name # type: ignore
parsed_dict = parse_cli_yaml_string(value, param_option_name.strip("-"))
sample_window = SampleWindow.from_dict(parsed_dict)
sample_window.start = sample_window.start.replace(tzinfo=pytz.UTC)
sample_window.end = sample_window.end.replace(tzinfo=pytz.UTC)
return sample_window
else:
return SampleWindow.from_relative_string(value)
except Exception as e:
self.fail(e.__str__(), param, ctx)
else:
self.fail(f"Cannot load SAMPLE_WINDOW from type {type(value)}", param, ctx)

Check warning on line 121 in core/dbt/cli/option_types.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/cli/option_types.py#L121

Added line #L121 was not covered by tests
24 changes: 23 additions & 1 deletion core/dbt/cli/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

import click

from dbt.cli.option_types import YAML, ChoiceTuple, Package, WarnErrorOptionsType
from dbt.cli.option_types import (
YAML,
ChoiceTuple,
Package,
SampleWindowType,
WarnErrorOptionsType,
)
from dbt.cli.options import MultiOption
from dbt.cli.resolvers import default_profiles_dir, default_project_dir
from dbt.version import get_version_information
Expand Down Expand Up @@ -518,6 +524,22 @@
default=(),
)

sample = click.option(
"--sample/--no-sample",
envvar="DBT_SAMPLE",
help="Run in sample mode, creating only samples of models where possible",
default=False,
is_flag=True,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hide, and later unhide. I.e. hidden=True

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarification, we want to hide these while we're in alpha / gated behind an environment variable. Once we remove the environment variable, we'll unhide these

)

sample_window = click.option(
"--sample-window",
envvar="DBT_SAMPLE_WINDOW",
help="The time window to use with sample mode. Example: '3 days'.",
default=None,
type=SampleWindowType(),
)

# `--select` and `--models` are analogous for most commands except `dbt list` for legacy reasons.
# Most CLI arguments should use the combined `select` option that aliases `--models` to `--select`.
# However, if you need to split out these separators (like `dbt ls`), use the `models` and `raw_select` options instead.
Expand Down
56 changes: 47 additions & 9 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,23 +237,61 @@ def resolve_limit(self) -> Optional[int]:

def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
event_time_filter = None
sample_mode = bool(
os.environ.get("DBT_EXPERIMENTAL_SAMPLE_MODE")
and getattr(self.config.args, "sample", False)
and getattr(self.config.args, "sample_window", None)
)

# TODO The number of branches here is getting rough. We should consider ways to simplify
# what is going on to make it easier to maintain

# Only do event time filtering if the base node has the necessary event time configs
if (
(isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
and target.config.event_time
and isinstance(self.model, ModelNode)
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
and self.model.batch is not None
):
start = self.model.batch.event_time_start
end = self.model.batch.event_time_end

if start is not None or end is not None:
# Handling of microbatch models
if (
self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
and self.model.batch is not None
):
# Sample mode microbatch models
if sample_mode:
start = (
self.config.args.sample_window.start
if self.config.args.sample_window.start > self.model.batch.event_time_start
else self.model.batch.event_time_start
)
end = (
self.config.args.sample_window.end
if self.config.args.sample_window.end < self.model.batch.event_time_end
else self.model.batch.event_time_end
)
event_time_filter = EventTimeFilter(
field_name=target.config.event_time,
start=start,
end=end,
)

# Regular microbatch models
else:
event_time_filter = EventTimeFilter(
field_name=target.config.event_time,
start=self.model.batch.event_time_start,
end=self.model.batch.event_time_end,
)

# Sample mode _non_ microbatch models
elif sample_mode:
event_time_filter = EventTimeFilter(
field_name=target.config.event_time,
start=start,
end=end,
start=self.config.args.sample_window.start,
end=self.config.args.sample_window.end,
)

return event_time_filter
Expand Down
40 changes: 40 additions & 0 deletions core/dbt/event_time/event_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from datetime import datetime

from dateutil.relativedelta import relativedelta

from dbt.artifacts.resources.types import BatchSize
from dbt_common.exceptions import DbtRuntimeError


def offset_timestamp(timestamp=datetime, batch_size=BatchSize, offset=int) -> datetime:
"""Offsets the passed in timestamp based on the batch_size and offset.

Note: THIS IS DIFFERENT FROM MicrobatchBuilder.offset_timestamp. That function first
`truncates` the timestamp, and then does delta addition subtraction from there. This
function _doesn't_ truncate the timestamp and uses `relativedelta` for specific edge
case handling (months, years), which may produce different results than the delta math
done in `MicrobatchBuilder.offset_timestamp`

Examples
2024-09-17 16:06:00 + Batchsize.hour -1 -> 2024-09-17 15:06:00
2024-09-17 16:06:00 + Batchsize.hour +1 -> 2024-09-17 17:06:00
2024-09-17 16:06:00 + Batchsize.day -1 -> 2024-09-16 16:06:00
2024-09-17 16:06:00 + Batchsize.day +1 -> 2024-09-18 16:06:00
2024-09-17 16:06:00 + Batchsize.month -1 -> 2024-08-17 16:06:00
2024-09-17 16:06:00 + Batchsize.month +1 -> 2024-10-17 16:06:00
2024-09-17 16:06:00 + Batchsize.year -1 -> 2023-09-17 16:06:00
2024-09-17 16:06:00 + Batchsize.year +1 -> 2025-09-17 16:06:00
2024-01-31 16:06:00 + Batchsize.month +1 -> 2024-02-29 16:06:00
2024-02-29 16:06:00 + Batchsize.year +1 -> 2025-02-28 16:06:00
"""

if batch_size == BatchSize.hour:
return timestamp + relativedelta(hours=offset)
elif batch_size == BatchSize.day:
return timestamp + relativedelta(days=offset)
elif batch_size == BatchSize.month:
return timestamp + relativedelta(months=offset)
elif batch_size == BatchSize.year:
return timestamp + relativedelta(years=offset)
else:
raise DbtRuntimeError(f"Unhandled batch_size '{batch_size}'")

Check warning on line 40 in core/dbt/event_time/event_time.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/event_time/event_time.py#L40

Added line #L40 was not covered by tests
60 changes: 60 additions & 0 deletions core/dbt/event_time/sample_window.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

from datetime import datetime

import pytz
from attr import dataclass

from dbt.artifacts.resources.types import BatchSize
from dbt.event_time.event_time import offset_timestamp
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.exceptions import DbtRuntimeError


@dataclass
class SampleWindow(dbtClassMixin):
start: datetime
end: datetime

def __post_serialize__(self, data, context):
# This is insane, but necessary, I apologize. Mashumaro handles the
# dictification of this class via a compile time generated `to_dict`
# method based off of the _typing_ of th class. By default `datetime`
# types are converted to strings. We don't want that, we want them to
# stay datetimes.
# Note: This is safe because the `BatchContext` isn't part of the artifact
# and thus doesn't get written out.
new_data = super().__post_serialize__(data, context)
new_data["start"] = self.start
new_data["end"] = self.end
return new_data

@classmethod
def from_relative_string(cls, relative_string: str) -> SampleWindow:
end = datetime.now(tz=pytz.UTC)

relative_window = relative_string.split(" ")
if len(relative_window) != 2:
raise DbtRuntimeError(
f"Cannot load SAMPLE_WINDOW from '{relative_string}'. Must be of form 'DAYS_INT GRAIN_SIZE'."
)

try:
lookback = int(relative_window[0])
except Exception:
raise DbtRuntimeError(f"Unable to convert '{relative_window[0]}' to an integer.")

try:
batch_size_string = relative_window[1].lower().rstrip("s")
batch_size = BatchSize[batch_size_string]
except Exception:
grains = [size.value for size in BatchSize]
grain_plurals = [BatchSize.plural(size) for size in BatchSize]
valid_grains = grains + grain_plurals
raise DbtRuntimeError(
f"Invalid grain size '{relative_window[1]}'. Must be one of {valid_grains}."
)

start = offset_timestamp(timestamp=end, batch_size=batch_size, offset=-1 * lookback)

return cls(start=start, end=end)
17 changes: 15 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import functools
import os
import threading
import time
from copy import deepcopy
Expand Down Expand Up @@ -556,11 +557,23 @@ def _execute_microbatch_materialization(
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
# TODO: This method has gotten a little large. It may be time to break it up into more manageable parts.
event_time_start = getattr(self.config.args, "EVENT_TIME_START", None)
event_time_end = getattr(self.config.args, "EVENT_TIME_END", None)

if (
os.environ.get("DBT_EXPERIMENTAL_SAMPLE_MODE")
and getattr(self.config.args, "SAMPLE", None)
and getattr(self.config.args, "SAMPLE_WINDOW", None)
):
event_time_start = self.config.args.sample_window.start
event_time_end = self.config.args.sample_window.end

microbatch_builder = MicrobatchBuilder(
model=model,
is_incremental=self._is_incremental(model),
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
event_time_start=event_time_start,
event_time_end=event_time_end,
default_end_time=self.config.invoked_at,
)

Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ types-PyYAML
types-Jinja2
types-mock
types-protobuf>=5.0,<6.0
types-python-dateutil
types-pytz
types-requests
types-setuptools
Expand Down
Loading
Loading