Sample Mode Alpha (#11247)
* Add `--sample` flag to `run` command

* Remove no longer needed `if` statement around EventTimeFilter creation for microbatch models

Upon the initial implementation of microbatch models, the `start` for a batch was _optional_.
However, in c3d87b8 it became guaranteed. Thus the `if` statement guarding against a missing
`start`/`end` for microbatch models no longer did anything and was safe to remove.

* Get sample mode working with `--event-time-start/end`

This is temporary, as a POC. In the end, sample mode can't depend on the
`--event-time-start/end` arguments and will need its own CLI arg / project
config, something like `--sample-window`. The issue with using `--event-time-start/end`
is that if people set those in their project configs, their microbatch models would
_always_ run with those values, even outside of sample mode. Despite that, this is a
useful checkpoint even though it will go away.

* Begin using `--sample-window` for sample mode instead of `--event-time-start/end`

Using `--event-time-start/end` for sample mode was conflicting with microbatch models
when _not_ running in sample mode. We will have to do _slightly_ more work to plumb
this new way of specifying sample time to microbatch models.

* Move `SampleWindow` class to `sample_window.py` in `event_time` submodule

This is mostly symbolic. We are going to be adding some utilities for "event_time"-type
things, which will all live in the `event_time` submodule. Additionally, we plan
to refactor `/incremental/materializations/microbatch.py` into the submodule as well.

* Create an `offset_timestamp` separate from MicrobatchBuilder

The `MicrobatchBuilder.offset_timestamp` _truncates_ the timestamp before
offsetting it. We don't want that; we want to offset the "raw" timestamp.
We could have renamed the MicrobatchBuilder function to
`truncate_and_offset_timestamp` and split the offset logic into a separate
abstract function. However, the offset logic in the MicrobatchBuilder context
depends on the truncation. We might later be able to refactor the MicrobatchBuilder
function by truncating _after_ offsetting instead of before. But that is out of
scope for this initial work, and we should revisit it later.
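
To make the distinction concrete, here is a small illustrative sketch (not dbt's actual code) of offsetting a "raw" timestamp with `relativedelta` versus truncating it first, the way `MicrobatchBuilder.offset_timestamp` does for a `day` batch size:

```python
from datetime import datetime

from dateutil.relativedelta import relativedelta

ts = datetime(2024, 9, 17, 16, 6, 0)

# Raw offset, as the new standalone helper behaves: the time of day survives.
raw = ts + relativedelta(days=-1)  # 2024-09-16 16:06:00

# Truncate-then-offset, as MicrobatchBuilder does for a `day` batch: the
# time-of-day information is discarded before the delta is applied.
truncated = ts.replace(hour=0, minute=0, second=0, microsecond=0) + relativedelta(days=-1)
# 2024-09-16 00:00:00

# `relativedelta` also clamps month-end dates, which plain timedelta math can't do.
month_end = datetime(2024, 1, 31, 16, 6, 0) + relativedelta(months=1)  # 2024-02-29 16:06:00
```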

* Add `types-python-dateutil` to dev requirements

The previous commit began using a submodule of the third-party `python-dateutil`
library. We weren't previously using this library, and thus didn't
need the type stubs for it. Now that we do use it, we need the
type stubs during development.

* Begin supporting microbatch models in sample mode

* Move parsing logic of `SampleWindowType` to `SampleWindow`

* Allow for specification of "specific" sample windows

In most cases people will want to set "relative" sample windows, i.e.
"3 days" to sample the last three days. However, there are some cases
where people will want "specific" sample windows covering some chunk of
historic time, i.e. `{'start': '2024-01-01', 'end': '2024-01-31'}`.
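
The two shapes can be sketched as follows. This is a hypothetical stand-in for illustration only; dbt's real parsing lives in `SampleWindow.from_relative_string` and `SampleWindowType.convert`, and this sketch only handles hour/day grains (months and years need `relativedelta`):

```python
import ast
from datetime import datetime, timedelta, timezone


def parse_sample_window(value: str):
    """Parse either a "specific" dict-style window or a "relative" window."""
    if value.lstrip().startswith("{"):
        # "Specific" window: an explicit start/end dict for historic time.
        raw = ast.literal_eval(value)
        start = datetime.fromisoformat(raw["start"]).replace(tzinfo=timezone.utc)
        end = datetime.fromisoformat(raw["end"]).replace(tzinfo=timezone.utc)
    else:
        # "Relative" window: "<count> <grain>", e.g. "3 days" = the last three days.
        count, grain = value.split(" ")
        unit = {"hour": "hours", "day": "days"}[grain.rstrip("s")]
        end = datetime.now(tz=timezone.utc)
        start = end - timedelta(**{unit: int(count)})
    return start, end
```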

* Fix tests of `BaseResolver.resolve_event_time_filter` for sample mode changes

* Add `--no-sample` as it's necessary for retry

* Add guards to accessing of `sample` and `sample_window`

This was necessary because these flags aren't _always_ available. I had expected
to need this after putting the `sample` flag behind an environment
variable (which I haven't done yet). However, we needed to add the guards
sooner because the `render` logic is called multiple times throughout the
dbt process, and early on the flags aren't available.
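
The guard pattern described here is plain `getattr` with a default; a minimal sketch, where the `Args` class is a hypothetical stand-in for dbt's parsed-flags object:

```python
class Args:
    """Hypothetical stand-in for dbt's flags/args object early in the process,
    before `sample` and `sample_window` have been populated."""
    pass


args = Args()

# Defensive access: returns the default instead of raising AttributeError
# when the flag hasn't been set yet.
sample = getattr(args, "sample", False)
sample_window = getattr(args, "sample_window", None)
```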

* Gate sample mode functionality via env var `DBT_EXPERIMENTAL_SAMPLE_MODE`

At this point sample mode is _alpha_ and should not be depended upon. To make
this crystal clear we've gated the functionality behind an environment variable.
We'll likely remove this gate in the coming month.

* Add sample mode tests for incremental models

* Add changie doc for sample mode initial implementation

* Fixup sample mode functional tests

I had updated `later_input_model.sql` to be easier to test with. However,
I didn't correspondingly update the initial `input_model.sql` to match.

* Ensure microbatch creates correct number of batches when sample mode env var isn't present

Previously microbatch was creating the _right_ number of batches when:
1. sample mode _wasn't_ being used
2. sample mode _was_ being used AND the env var was present

Unfortunately sample mode _wasn't_ creating the right number of batches when:
3. sample mode _was_ being used AND the env var _wasn't_ present.

In case (3), sample mode shouldn't be run. Unfortunately, we weren't gating sample
mode by the environment variable during batch creation. This led to a situation
where batch creation used sample mode but the rendering of refs _didn't_,
leaving things in an in-between state. This commit fixes that issue.

Additionally, of note: we currently have duplicate sample-mode gating logic in
batch creation as well as in the rendering of refs. We should probably consolidate
this logic into a single importable function, so that any future change to how
sample mode is gated is easier to implement.
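
A sketch of what that consolidation could look like. The env var name `DBT_EXPERIMENTAL_SAMPLE_MODE` is the real one used by this commit; the helper function itself is hypothetical:

```python
import os


def sample_mode_enabled(args) -> bool:
    """Single predicate for "is sample mode active?": the experimental env var
    must be set AND the --sample flag AND a --sample-window must be provided."""
    return bool(
        os.environ.get("DBT_EXPERIMENTAL_SAMPLE_MODE")
        and getattr(args, "sample", False)
        and getattr(args, "sample_window", None)
    )
```

Both batch creation and ref rendering could then import this one function instead of duplicating the three-part check.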

* Correct comment in SampleWindow post serialization method

* Hide CLI sample mode options

We are doing this _temporarily_ while sample mode as a feature is in
alpha/beta and locked behind an environment variable. When we remove the
environment variable we should also unhide these.
QMalcolm authored Feb 4, 2025
1 parent fdabe95 commit 5f873da
Showing 15 changed files with 1,040 additions and 22 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20250202-140054.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Initial implementation of sample mode
time: 2025-02-02T14:00:54.074209-06:00
custom:
  Author: QMalcolm
  Issue: 11227 11230 11231 11248 11252 11254 11258
3 changes: 3 additions & 0 deletions core/dbt/artifacts/resources/types.py
@@ -75,3 +75,6 @@ class BatchSize(StrEnum):
    day = "day"
    month = "month"
    year = "year"

    def plural(self) -> str:
        return str(self) + "s"
2 changes: 2 additions & 0 deletions core/dbt/cli/main.py
@@ -555,6 +555,8 @@ def parse(ctx, **kwargs):
@p.empty
@p.event_time_start
@p.event_time_end
@p.sample
@p.sample_window
@p.select
@p.selector
@p.target_path
33 changes: 32 additions & 1 deletion core/dbt/cli/option_types.py
@@ -1,6 +1,10 @@
from click import Choice, ParamType
from typing import Optional

import pytz
from click import Choice, Context, Parameter, ParamType

from dbt.config.utils import normalize_warn_error_options, parse_cli_yaml_string
from dbt.event_time.sample_window import SampleWindow
from dbt.events import ALL_EVENT_NAMES
from dbt.exceptions import OptionNotYamlDictError, ValidationError
from dbt_common.exceptions import DbtValidationError
@@ -88,3 +92,30 @@ def convert(self, value, param, ctx):
        super().convert(value, param, ctx)

        return value


class SampleWindowType(ParamType):
    name = "SAMPLE_WINDOW"

    def convert(
        self, value, param: Optional[Parameter], ctx: Optional[Context]
    ) -> Optional[SampleWindow]:
        if value is None:
            return None

        if isinstance(value, str):
            try:
                # Try and identify if it's a "dict" or a "str"
                if value.lstrip()[0] == "{":
                    param_option_name: str = param.opts[0] if param.opts else param.name  # type: ignore
                    parsed_dict = parse_cli_yaml_string(value, param_option_name.strip("-"))
                    sample_window = SampleWindow.from_dict(parsed_dict)
                    sample_window.start = sample_window.start.replace(tzinfo=pytz.UTC)
                    sample_window.end = sample_window.end.replace(tzinfo=pytz.UTC)
                    return sample_window
                else:
                    return SampleWindow.from_relative_string(value)
            except Exception as e:
                self.fail(e.__str__(), param, ctx)
        else:
            self.fail(f"Cannot load SAMPLE_WINDOW from type {type(value)}", param, ctx)
26 changes: 25 additions & 1 deletion core/dbt/cli/params.py
@@ -2,7 +2,13 @@

import click

from dbt.cli.option_types import YAML, ChoiceTuple, Package, WarnErrorOptionsType
from dbt.cli.option_types import (
    YAML,
    ChoiceTuple,
    Package,
    SampleWindowType,
    WarnErrorOptionsType,
)
from dbt.cli.options import MultiOption
from dbt.cli.resolvers import default_profiles_dir, default_project_dir
from dbt.version import get_version_information
@@ -518,6 +524,24 @@
    default=(),
)

sample = click.option(
    "--sample/--no-sample",
    envvar="DBT_SAMPLE",
    help="Run in sample mode, creating only samples of models where possible",
    default=False,
    is_flag=True,
    hidden=True,  # TODO: Unhide
)

sample_window = click.option(
    "--sample-window",
    envvar="DBT_SAMPLE_WINDOW",
    help="The time window to use with sample mode. Example: '3 days'.",
    default=None,
    type=SampleWindowType(),
    hidden=True,  # TODO: Unhide
)

# `--select` and `--models` are analogous for most commands except `dbt list` for legacy reasons.
# Most CLI arguments should use the combined `select` option that aliases `--models` to `--select`.
# However, if you need to split out these separators (like `dbt ls`), use the `models` and `raw_select` options instead.
56 changes: 47 additions & 9 deletions core/dbt/context/providers.py
Expand Up @@ -237,23 +237,61 @@ def resolve_limit(self) -> Optional[int]:

    def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
        event_time_filter = None
        sample_mode = bool(
            os.environ.get("DBT_EXPERIMENTAL_SAMPLE_MODE")
            and getattr(self.config.args, "sample", False)
            and getattr(self.config.args, "sample_window", None)
        )

        # TODO The number of branches here is getting rough. We should consider ways to simplify
        # what is going on to make it easier to maintain

        # Only do event time filtering if the base node has the necessary event time configs
        if (
            (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
            and target.config.event_time
            and isinstance(self.model, ModelNode)
            and self.model.config.materialized == "incremental"
            and self.model.config.incremental_strategy == "microbatch"
            and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
            and self.model.batch is not None
        ):
            start = self.model.batch.event_time_start
            end = self.model.batch.event_time_end

            if start is not None or end is not None:
            # Handling of microbatch models
            if (
                self.model.config.materialized == "incremental"
                and self.model.config.incremental_strategy == "microbatch"
                and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
                and self.model.batch is not None
            ):
                # Sample mode microbatch models
                if sample_mode:
                    start = (
                        self.config.args.sample_window.start
                        if self.config.args.sample_window.start > self.model.batch.event_time_start
                        else self.model.batch.event_time_start
                    )
                    end = (
                        self.config.args.sample_window.end
                        if self.config.args.sample_window.end < self.model.batch.event_time_end
                        else self.model.batch.event_time_end
                    )
                    event_time_filter = EventTimeFilter(
                        field_name=target.config.event_time,
                        start=start,
                        end=end,
                    )

                # Regular microbatch models
                else:
                    event_time_filter = EventTimeFilter(
                        field_name=target.config.event_time,
                        start=self.model.batch.event_time_start,
                        end=self.model.batch.event_time_end,
                    )

            # Sample mode _non_ microbatch models
            elif sample_mode:
                event_time_filter = EventTimeFilter(
                    field_name=target.config.event_time,
                    start=start,
                    end=end,
                    start=self.config.args.sample_window.start,
                    end=self.config.args.sample_window.end,
                )

        return event_time_filter
40 changes: 40 additions & 0 deletions core/dbt/event_time/event_time.py
@@ -0,0 +1,40 @@
from datetime import datetime

from dateutil.relativedelta import relativedelta

from dbt.artifacts.resources.types import BatchSize
from dbt_common.exceptions import DbtRuntimeError


def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
    """Offsets the passed in timestamp based on the batch_size and offset.

    Note: THIS IS DIFFERENT FROM MicrobatchBuilder.offset_timestamp. That function first
    _truncates_ the timestamp and then does delta addition/subtraction from there. This
    function _doesn't_ truncate the timestamp and uses `relativedelta` for specific edge
    case handling (months, years), which may produce different results than the delta math
    done in `MicrobatchBuilder.offset_timestamp`.

    Examples:
        2024-09-17 16:06:00 + BatchSize.hour  -1 -> 2024-09-17 15:06:00
        2024-09-17 16:06:00 + BatchSize.hour  +1 -> 2024-09-17 17:06:00
        2024-09-17 16:06:00 + BatchSize.day   -1 -> 2024-09-16 16:06:00
        2024-09-17 16:06:00 + BatchSize.day   +1 -> 2024-09-18 16:06:00
        2024-09-17 16:06:00 + BatchSize.month -1 -> 2024-08-17 16:06:00
        2024-09-17 16:06:00 + BatchSize.month +1 -> 2024-10-17 16:06:00
        2024-09-17 16:06:00 + BatchSize.year  -1 -> 2023-09-17 16:06:00
        2024-09-17 16:06:00 + BatchSize.year  +1 -> 2025-09-17 16:06:00
        2024-01-31 16:06:00 + BatchSize.month +1 -> 2024-02-29 16:06:00
        2024-02-29 16:06:00 + BatchSize.year  +1 -> 2025-02-28 16:06:00
    """

    if batch_size == BatchSize.hour:
        return timestamp + relativedelta(hours=offset)
    elif batch_size == BatchSize.day:
        return timestamp + relativedelta(days=offset)
    elif batch_size == BatchSize.month:
        return timestamp + relativedelta(months=offset)
    elif batch_size == BatchSize.year:
        return timestamp + relativedelta(years=offset)
    else:
        raise DbtRuntimeError(f"Unhandled batch_size '{batch_size}'")
60 changes: 60 additions & 0 deletions core/dbt/event_time/sample_window.py
@@ -0,0 +1,60 @@
from __future__ import annotations

from datetime import datetime

import pytz
from attr import dataclass

from dbt.artifacts.resources.types import BatchSize
from dbt.event_time.event_time import offset_timestamp
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.exceptions import DbtRuntimeError


@dataclass
class SampleWindow(dbtClassMixin):
    start: datetime
    end: datetime

    def __post_serialize__(self, data, context):
        # This is insane, but necessary, I apologize. Mashumaro handles the
        # dictification of this class via a compile-time generated `to_dict`
        # method based off of the _typing_ of the class. By default `datetime`
        # types are converted to strings. We don't want that, we want them to
        # stay datetimes.
        # Note: This is safe because the `SampleWindow` isn't part of the artifact
        # and thus doesn't get written out.
        new_data = super().__post_serialize__(data, context)
        new_data["start"] = self.start
        new_data["end"] = self.end
        return new_data

    @classmethod
    def from_relative_string(cls, relative_string: str) -> SampleWindow:
        end = datetime.now(tz=pytz.UTC)

        relative_window = relative_string.split(" ")
        if len(relative_window) != 2:
            raise DbtRuntimeError(
                f"Cannot load SAMPLE_WINDOW from '{relative_string}'. Must be of form 'DAYS_INT GRAIN_SIZE'."
            )

        try:
            lookback = int(relative_window[0])
        except Exception:
            raise DbtRuntimeError(f"Unable to convert '{relative_window[0]}' to an integer.")

        try:
            batch_size_string = relative_window[1].lower().rstrip("s")
            batch_size = BatchSize[batch_size_string]
        except Exception:
            grains = [size.value for size in BatchSize]
            grain_plurals = [BatchSize.plural(size) for size in BatchSize]
            valid_grains = grains + grain_plurals
            raise DbtRuntimeError(
                f"Invalid grain size '{relative_window[1]}'. Must be one of {valid_grains}."
            )

        start = offset_timestamp(timestamp=end, batch_size=batch_size, offset=-1 * lookback)

        return cls(start=start, end=end)
17 changes: 15 additions & 2 deletions core/dbt/task/run.py
@@ -1,4 +1,5 @@
import functools
import os
import threading
import time
from copy import deepcopy
@@ -556,11 +557,23 @@ def _execute_microbatch_materialization(
        context: Dict[str, Any],
        materialization_macro: MacroProtocol,
    ) -> RunResult:
        # TODO: This method has gotten a little large. It may be time to break it up into more manageable parts.
        event_time_start = getattr(self.config.args, "EVENT_TIME_START", None)
        event_time_end = getattr(self.config.args, "EVENT_TIME_END", None)

        if (
            os.environ.get("DBT_EXPERIMENTAL_SAMPLE_MODE")
            and getattr(self.config.args, "SAMPLE", None)
            and getattr(self.config.args, "SAMPLE_WINDOW", None)
        ):
            event_time_start = self.config.args.sample_window.start
            event_time_end = self.config.args.sample_window.end

        microbatch_builder = MicrobatchBuilder(
            model=model,
            is_incremental=self._is_incremental(model),
            event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
            event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
            event_time_start=event_time_start,
            event_time_end=event_time_end,
            default_end_time=self.config.invoked_at,
        )

1 change: 1 addition & 0 deletions dev-requirements.txt
@@ -34,6 +34,7 @@ types-PyYAML
types-Jinja2
types-mock
types-protobuf>=5.0,<6.0
types-python-dateutil
types-pytz
types-requests
types-setuptools