Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import fnmatch
import json
import logging
from pathlib import Path
Expand Down Expand Up @@ -366,6 +367,9 @@ def add_processor(
) -> Self:
"""Add a processor to the current Data Designer configuration.

If a processor with the same name already exists, it is replaced (upsert),
making notebook cells safely re-runnable.

You can either provide a processor config object directly, or provide a processor type and
additional keyword arguments to construct the processor config object.

Expand All @@ -385,15 +389,49 @@ def add_processor(
)
processor_config = get_processor_config_from_kwargs(processor_type=processor_type, **kwargs)

self._remove_processor_by_name(processor_config.name)

# Checks elsewhere fail if DropColumnsProcessor drops a column but it is not marked for drop
if processor_config.processor_type == ProcessorType.DROP_COLUMNS:
for column in processor_config.column_names:
if column in self._column_configs:
self._column_configs[column].drop = True
for col in self._resolve_drop_column_names(processor_config.column_names):
self._column_configs[col].drop = True

self._processor_configs.append(processor_config)
return self

def _remove_processor_by_name(self, name: str) -> None:
    """Drop the processor registered under ``name``, reverting its side-effects.

    For a DROP_COLUMNS processor, every column it marked with ``drop = True``
    is un-marked again, unless some other DROP_COLUMNS processor still targets
    that column. No-op when no processor with that name exists.
    """
    target = next((p for p in self._processor_configs if p.name == name), None)
    if target is None:
        return
    if target.processor_type == ProcessorType.DROP_COLUMNS:
        # Columns that remain dropped via the surviving DROP_COLUMNS processors.
        still_dropped: set[str] = set()
        for other in self._processor_configs:
            if other.name == name or other.processor_type != ProcessorType.DROP_COLUMNS:
                continue
            still_dropped.update(self._resolve_drop_column_names(other.column_names))
        for col in self._resolve_drop_column_names(target.column_names):
            if col not in still_dropped:
                self._column_configs[col].drop = False
    self._processor_configs.remove(target)

def _resolve_drop_column_names(self, column_names: list[str]) -> list[str]:
"""Resolve column names, expanding glob patterns against known column configs."""
seen: set[str] = set()
resolved = []
for name in column_names:
if "*" in name:
for match in fnmatch.filter(self._column_configs.keys(), name):
if match not in seen:
seen.add(match)
resolved.append(match)
elif name in self._column_configs and name not in seen:
seen.add(name)
resolved.append(name)
return resolved

def add_profiler(self, profiler_config: ColumnProfilerConfigT) -> Self:
"""Add a profiler to the current Data Designer configuration.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
InvalidConfigError,
)
from data_designer.config.models import ChatCompletionInferenceParams, ModelConfig
from data_designer.config.processors import DropColumnsProcessorConfig, SchemaTransformProcessorConfig
from data_designer.config.sampler_constraints import ColumnInequalityConstraint, ScalarInequalityConstraint
from data_designer.config.sampler_params import SamplerType, UUIDSamplerParams
from data_designer.config.seed import SamplingStrategy
Expand Down Expand Up @@ -889,6 +890,96 @@ def test_cannot_write_config_with_dataframe_seed(stub_model_configs):
assert "DataFrame seed dataset" in str(excinfo.value)


class TestAddProcessorIdempotent:
    """add_processor is an upsert: re-adding a name replaces the earlier processor."""

    @staticmethod
    def _add_uuid_column(builder, name):
        # A trivial sampler column gives the drop-column bookkeeping something to target.
        builder.add_column(SamplerColumnConfig(name=name, sampler_type="uuid", params=UUIDSamplerParams()))

    def test_add_processor_replaces_existing_by_name(self, stub_empty_builder):
        builder = stub_empty_builder
        self._add_uuid_column(builder, "col_a")
        self._add_uuid_column(builder, "col_b")

        builder.add_processor(DropColumnsProcessorConfig(name="cleanup", column_names=["col_a"]))
        assert builder.get_column_config("col_a").drop is True

        builder.add_processor(DropColumnsProcessorConfig(name="cleanup", column_names=["col_b"]))
        processors = builder.get_processor_configs()
        assert len(processors) == 1
        assert processors[0].column_names == ["col_b"]
        # Replacing the processor reverts col_a's drop flag and marks col_b instead.
        assert builder.get_column_config("col_a").drop is False
        assert builder.get_column_config("col_b").drop is True

    def test_add_processor_different_names_appends(self, stub_empty_builder):
        builder = stub_empty_builder
        self._add_uuid_column(builder, "col_a")
        builder.add_processor(DropColumnsProcessorConfig(name="cleanup_1", column_names=["col_a"]))
        builder.add_processor(SchemaTransformProcessorConfig(name="transform_1", template={"x": "{{ col_a }}"}))
        # Distinct names never replace each other.
        assert len(builder.get_processor_configs()) == 2

    def test_add_processor_replaces_non_drop_processor(self, stub_empty_builder):
        builder = stub_empty_builder
        builder.add_processor(SchemaTransformProcessorConfig(name="transform", template={"x": "old"}))
        builder.add_processor(SchemaTransformProcessorConfig(name="transform", template={"x": "new"}))
        processors = builder.get_processor_configs()
        assert len(processors) == 1
        assert processors[0].template == {"x": "new"}

    def test_add_processor_glob_marks_matching_columns_as_drop(self, stub_empty_builder):
        builder = stub_empty_builder
        for column_name in ("col_a", "col_b", "other"):
            self._add_uuid_column(builder, column_name)

        builder.add_processor(DropColumnsProcessorConfig(name="cleanup", column_names=["col_*"]))
        assert builder.get_column_config("col_a").drop is True
        assert builder.get_column_config("col_b").drop is True
        # "other" does not match the glob, so it keeps drop=False.
        assert builder.get_column_config("other").drop is False

    def test_add_processor_glob_revert_on_replace(self, stub_empty_builder):
        builder = stub_empty_builder
        self._add_uuid_column(builder, "col_a")
        self._add_uuid_column(builder, "col_b")

        builder.add_processor(DropColumnsProcessorConfig(name="cleanup", column_names=["col_*"]))
        assert builder.get_column_config("col_a").drop is True

        builder.add_processor(DropColumnsProcessorConfig(name="cleanup", column_names=["col_b"]))
        # col_a was only dropped via the glob, so the replacement un-marks it.
        assert builder.get_column_config("col_a").drop is False
        assert builder.get_column_config("col_b").drop is True

    def test_replace_preserves_drop_from_other_processor(self, stub_empty_builder):
        builder = stub_empty_builder
        self._add_uuid_column(builder, "col_a")
        builder.add_processor(DropColumnsProcessorConfig(name="drop1", column_names=["col_a"]))
        builder.add_processor(DropColumnsProcessorConfig(name="drop2", column_names=["col_a"]))
        assert builder.get_column_config("col_a").drop is True

        builder.add_processor(DropColumnsProcessorConfig(name="drop1", column_names=[]))
        # drop2 still targets col_a, so its drop flag must survive the replacement.
        assert builder.get_column_config("col_a").drop is True
        assert len(builder.get_processor_configs()) == 2


class TestToolConfigDuplicateValidation:
"""Tests for duplicate tool name validation at config build time."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import logging
from fnmatch import fnmatch
from typing import TYPE_CHECKING

from data_designer.config.processors import DropColumnsProcessorConfig
Expand All @@ -20,24 +21,34 @@
class DropColumnsProcessor(Processor[DropColumnsProcessorConfig]):
"""Drops specified columns from the dataset after each batch."""

def _resolve_columns(self, available: pd.Index) -> list[str]:
"""Expand column_names entries (including glob patterns) against available columns."""
seen: set[str] = set()
resolved = []
for name in self.config.column_names:
if "*" in name:
for col in available:
if fnmatch(col, name) and col not in seen:
seen.add(col)
resolved.append(col)
elif name in available and name not in seen:
seen.add(name)
resolved.append(name)
elif name not in available:
logger.warning(f"⚠️ Cannot drop column: `{name}` not found in the dataset.")
return resolved

def process_after_batch(self, data: pd.DataFrame, *, current_batch_number: int | None) -> pd.DataFrame:
    """Drop the configured columns (glob patterns resolved) from ``data``.

    Resolves ``config.column_names`` against the batch's actual columns first,
    archives the dropped columns to parquet when a batch number is available,
    then drops them in place and returns the same DataFrame.

    Args:
        data: The batch DataFrame to process; mutated in place.
        current_batch_number: Batch index used to name the archived parquet
            file, or None when the batch context is unavailable.

    Returns:
        The same DataFrame with the resolved columns removed.
    """
    # NOTE(review): this span of the diff interleaved the removed pre-change
    # implementation (`_drop_columns` and the old `_save_dropped_columns` call)
    # with the new one; this is the coherent post-change method.
    logger.info(f"πŸ™ˆ Dropping columns: {self.config.column_names}")
    resolved = self._resolve_columns(data.columns)
    if current_batch_number is not None:
        # Archive before dropping so the dropped values remain recoverable.
        self._save_dropped_columns(data, resolved, current_batch_number)
    if resolved:
        data.drop(columns=resolved, inplace=True)
    return data

def _save_dropped_columns(self, data: pd.DataFrame, current_batch_number: int) -> None:
# Only save columns that actually exist
existing_columns = [col for col in self.config.column_names if col in data.columns]
if not existing_columns:
def _save_dropped_columns(self, data: pd.DataFrame, resolved: list[str], current_batch_number: int) -> None:
if not resolved:
return

logger.debug("πŸ“¦ Saving dropped columns to dropped-columns directory")
Expand All @@ -47,6 +58,6 @@ def _save_dropped_columns(self, data: pd.DataFrame, current_batch_number: int) -
).name
self.artifact_storage.write_parquet_file(
parquet_file_name=dropped_column_parquet_file_name,
dataframe=data[existing_columns],
dataframe=data[resolved],
batch_stage=BatchStage.DROPPED_COLUMNS,
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

from enum import Enum
from fnmatch import fnmatch
from string import Formatter

from jinja2 import meta
Expand Down Expand Up @@ -271,25 +272,42 @@ def validate_columns_not_all_dropped(
return []


def _is_glob(pattern: str) -> bool:
return "*" in pattern


def validate_drop_columns_processor(
    columns: list[ColumnConfigT],
    processor_configs: list[ProcessorConfigT],
) -> list[Violation]:
    """Validate DROP_COLUMNS processors against the defined column names.

    A glob pattern (``*``) that matches no known column yields a WARNING
    violation; a literal name that is neither a defined column nor one of its
    side-effect columns yields an ERROR violation.

    Args:
        columns: All column configs; their names and side-effect columns form
            the set of valid drop targets.
        processor_configs: All processor configs; non-DROP_COLUMNS entries are
            ignored.

    Returns:
        The accumulated violations, in processor/column-name order.
    """
    # NOTE(review): this diff span interleaved the removed pre-change body
    # (invalid_columns / column=c / early `return []`) with the new code; this
    # is the coherent post-change function.
    # Known names include every column plus its declared side-effect columns.
    all_column_names = {c.name for c in columns}
    for col in columns:
        all_column_names.update(col.side_effect_columns)

    violations: list[Violation] = []
    for processor_config in processor_configs:
        if processor_config.processor_type != ProcessorType.DROP_COLUMNS:
            continue
        for name in processor_config.column_names:
            if _is_glob(name):
                if not any(fnmatch(col, name) for col in all_column_names):
                    violations.append(
                        Violation(
                            column=None,
                            type=ViolationType.INVALID_COLUMN,
                            message=f"Drop columns processor pattern '{name}' does not match any columns.",
                            level=ViolationLevel.WARNING,
                        )
                    )
            elif name not in all_column_names:
                violations.append(
                    Violation(
                        column=name,
                        type=ViolationType.INVALID_COLUMN,
                        message=f"Drop columns processor is configured to drop column '{name!r}', but the column is not defined.",
                        level=ViolationLevel.ERROR,
                    )
                )
    return violations


def validate_schema_transform_processor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,24 @@ def stub_empty_dataframe():
},
None,
),
(
"drop_glob_pattern",
["col*"],
{"category": ["A", "B", "A", "B"], "other_col": [1, 2, 3, 4]},
None,
),
(
"drop_glob_no_match",
["zzz*"],
{
"col1": [1, 2, 3, 4],
"col2": ["a", "b", "c", "d"],
"col3": [True, False, True, False],
"category": ["A", "B", "A", "B"],
"other_col": [1, 2, 3, 4],
},
None,
),
],
)
def test_process_after_batch_scenarios(
Expand Down
54 changes: 54 additions & 0 deletions packages/data-designer-engine/tests/engine/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from unittest.mock import Mock, patch

import pytest

from data_designer.config.column_configs import (
ExpressionColumnConfig,
LLMCodeColumnConfig,
Expand All @@ -27,6 +29,7 @@
validate_code_validation,
validate_columns_not_all_dropped,
validate_data_designer_config,
validate_drop_columns_processor,
validate_expression_references,
validate_prompt_templates,
validate_schema_transform_processor,
Expand Down Expand Up @@ -279,6 +282,57 @@ def test_validate_schema_transform_processor():
assert violations[0].level == ViolationLevel.ERROR


@pytest.mark.parametrize(
    "extract_reasoning, expected_violations",
    [
        (True, 0),
        (False, 1),
    ],
)
def test_validate_drop_columns_processor_reasoning_column(extract_reasoning, expected_violations):
    """The reasoning side-effect column is a valid drop target only when extraction is enabled."""
    column = LLMTextColumnConfig(
        name="answer",
        prompt="Answer the question.",
        model_alias=STUB_MODEL_ALIAS,
        extract_reasoning_content=extract_reasoning,
    )
    processor = DropColumnsProcessorConfig(
        name="drop_reasoning",
        column_names=["answer__reasoning_content"],
    )
    violations = validate_drop_columns_processor([column], [processor])
    assert len(violations) == expected_violations


@pytest.mark.parametrize(
    "pattern, expected_violations, expected_level",
    [
        ("*__reasoning_content", 0, None),
        ("zzz_*", 1, ViolationLevel.WARNING),
    ],
)
def test_validate_drop_columns_processor_glob(pattern, expected_violations, expected_level):
    """Glob drop patterns warn when they match nothing and pass when they match."""
    column = LLMTextColumnConfig(
        name="answer",
        prompt="Answer the question.",
        model_alias=STUB_MODEL_ALIAS,
        extract_reasoning_content=True,
    )
    processors = [DropColumnsProcessorConfig(name="drop_glob", column_names=[pattern])]
    violations = validate_drop_columns_processor([column], processors)
    assert len(violations) == expected_violations
    if expected_level:
        assert violations[0].level == expected_level


@patch("data_designer.engine.validation.Console.print")
def test_rich_print_violations(mock_console_print):
rich_print_violations([])
Expand Down