From 847375ca88629c8d9615c4f52b8e63663724d216 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Mon, 30 Mar 2026 14:19:43 -0600 Subject: [PATCH 1/5] plan: add skip_when for conditional column generation (#479) Adds implementation plan for a `skip_when` field on `SingleColumnConfig` that enables conditional column generation. When the Jinja2 expression evaluates truthy, the cell is set to None and the generator is skipped. Skips auto-propagate through the DAG to downstream columns. Co-Authored-By: Claude Opus 4.6 (1M context) --- plans/479/skip-when-conditional-generation.md | 239 ++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 plans/479/skip-when-conditional-generation.md diff --git a/plans/479/skip-when-conditional-generation.md b/plans/479/skip-when-conditional-generation.md new file mode 100644 index 00000000..43859ea9 --- /dev/null +++ b/plans/479/skip-when-conditional-generation.md @@ -0,0 +1,239 @@ +--- +date: 2026-03-30 +authors: + - nmulepati +issue: https://github.com/NVIDIA-NeMo/DataDesigner/issues/479 +--- + +# Plan: `skip_when` — Conditional Column Generation + +## Problem + +DataDesigner's DAG executes every column for every row unconditionally. In multi-stage synthesis pipelines, expensive downstream generation (LLM calls, segmentation, etc.) runs even when an earlier gate column indicates the row should be filtered out. + +Today the only workarounds are: + +1. **Generate all columns unconditionally and post-filter** — wastes LLM calls on rows that will be discarded. +2. **Split into multiple `DataDesigner.create()` calls** with intermediate filtering — loses single-pipeline ergonomics and forces the user to manage seed-dataset hand-offs. 
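For contrast, workaround 1 in concrete form: pay for every row, then filter. A sketch with illustrative column names (not part of the DataDesigner API):

```python
# Sketch of workaround 1: generate everything unconditionally, post-filter.
# Every record below already consumed LLM budget for "categories",
# including the rows the gate discards.
records = [
    {"complexity_score": 3, "categories": ["indoor"]},   # discarded after paying
    {"complexity_score": 7, "categories": ["outdoor"]},  # kept
    {"complexity_score": 8, "categories": ["urban"]},    # kept
]

kept = [r for r in records if r["complexity_score"] >= 6]
wasted_calls = len(records) - len(kept)
```

The wasted-call count is exactly what `skip_when` eliminates: the gate is evaluated before dispatch instead of after generation.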
+ +### Motivating Example: HopChain Pipeline + +The [HopChain paper](https://arxiv.org/abs/2603.17024) synthesizes multi-hop VLM reasoning data through a pipeline where each stage gates the next: + +``` +image → complexity_score → [if score >= 6] → categories → segmentation → query_gen + [if score < 6] → skip rest +``` + +With DataDesigner today, `categories`, `segmentation`, and `query_gen` all execute even for images that score below the threshold — burning VLM inference budget on rows that will be discarded. + +## Proposed Solution + +Add a `skip_when` field to `SingleColumnConfig`. When its Jinja2 expression evaluates truthy for a row, the cell is set to `None` and the generator is never called. Skips auto-propagate through the DAG: downstream columns whose `required_columns` include a skipped cell also skip automatically. + +```python +config_builder.add_column( + name="complexity_score", column_type="llm-structured", ... +) +config_builder.add_column( + name="categories", + column_type="llm-structured", + skip_when="{{ complexity_score.overall_complexity_score < 6 }}", + ... +) +# Everything downstream of categories auto-skips — no extra config needed +config_builder.add_column(name="instances", column_type="segmentation", ...) +config_builder.add_column(name="multi_hop_query", column_type="llm-structured", ...) +``` + +Skipped rows stay in the output (row count is preserved). Skipped cells contain `None`. + +## Design Decisions + +| Decision | Choice | Rationale | +|---|---|---| +| Where does `skip_when` live? | `SingleColumnConfig` base class | Cross-cutting; applies to all column types | +| What happens to skipped cells? | Set to `None`, row stays in output | Rows are not dropped — users can post-filter or inspect | +| Do downstream columns auto-skip? | Yes, always | If upstream data is missing, generating downstream is wasteful and error-prone | +| How are `skip_when` columns ordered in the DAG? 
| `skip_when_columns` extracted from the expression become DAG edges | Ensures the gate column is generated before the guarded column | +| How does this interact with `_records_to_drop`? | Independently — skip does not drop rows | Skip produces `None`; drop removes the row entirely | + +--- + +## Implementation + +### 1. Config: `SingleColumnConfig` — add field + property + +**File:** `packages/data-designer-config/src/data_designer/config/base.py` + +Add to `SingleColumnConfig` (after `allow_resize`): + +```python +skip_when: str | None = Field( + default=None, + description="Jinja2 expression; when truthy, skip generation for this row (cell set to None).", +) +``` + +Add a `@field_validator("skip_when")` that validates Jinja2 syntax. **Critical constraint:** `base.py` line 4 prohibits `data_designer.*` imports, so use `jinja2` directly: + +```python +@field_validator("skip_when") +@classmethod +def _validate_skip_when(cls, v: str | None) -> str | None: + if v is not None: + from jinja2.sandbox import ImmutableSandboxedEnvironment + ImmutableSandboxedEnvironment().parse(v) + return v +``` + +Add a concrete property `skip_when_columns` (not abstract — base provides default): + +```python +@property +def skip_when_columns(self) -> list[str]: + if self.skip_when is None: + return [] + from jinja2 import meta + from jinja2.sandbox import ImmutableSandboxedEnvironment + env = ImmutableSandboxedEnvironment() + ast = env.parse(self.skip_when) + return list(meta.find_undeclared_variables(ast)) +``` + +### 2. DAG: add `skip_when_columns` edges + +#### 2a. `dag.py` — `topologically_sort_column_configs()` + +**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py` + +After the `for req_col_name in col.required_columns:` block (line 35-47), add a matching block for `col.skip_when_columns` that adds edges using the same pattern (direct column match + side-effect resolution). + +#### 2b. 
`execution_graph.py` — `ExecutionGraph.create()` + +**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py` + +In the second pass (line 78-88), after the `for req in sub.required_columns:` edge loop, add: + +```python +for skip_col in sub.skip_when_columns: + resolved = graph.resolve_side_effect(skip_col) + if resolved not in known_columns: + continue # seed/sampler columns not in graph + if resolved == name: + continue + graph.add_edge(upstream=resolved, downstream=name) +``` + +Store skip metadata on the graph for runtime access: + +- Add `_skip_when: dict[str, str]` to `__init__` +- Populate during first pass: `graph._skip_when[name] = sub.skip_when` (when not None) +- Add accessor: `get_skip_when(column) -> str | None` + +### 3. New utility: `skip_evaluator.py` + +**New file:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_evaluator.py` + +Two pure functions, no engine state dependencies: + +```python +def evaluate_skip_when(expression: str, record: dict) -> bool: + """Render expression against record; return True if truthy.""" + +def should_skip_by_propagation( + required_columns: list[str], + skipped_columns_for_row: set[str], +) -> bool: + """Return True if any required column was skipped.""" +``` + +`evaluate_skip_when` wraps expression in `{{ }}`, renders via `ImmutableSandboxedEnvironment`, checks truthiness (result not in `("", "false", "0", "none", "null")`). + +`should_skip_by_propagation` returns `True` if the intersection of `required_columns` and `skipped_columns_for_row` is non-empty. + +### 4. Sync engine: `DatasetBuilder` + +**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py` + +#### 4a. Add state +- `self._skipped_cells: dict[int, set[str]] = {}` — buffer index to skipped column names +- Clear at start of `_run_batch()` (line ~428) + +#### 4b. 
Add helper methods
- `_should_skip_cell(config, record_index, record) -> bool` — checks propagation (any upstream skipped?) then evaluates `skip_when` expression
- `_mark_cell_skipped(record_index, column_name, side_effect_columns, record)` — records skip, writes `None` to buffer

#### 4c. Modify `_fan_out_with_threads()` (line 638)
Before `executor.submit()`, check `_should_skip_cell()`. If skip: write `None` for column + side effects, record success on progress tracker, `continue`.

#### 4d. Modify `_fan_out_with_async()` (line 621)
Convert list comprehension to explicit loop with same skip check.

#### 4e. Modify `_run_full_column_generator()` (line 503)
After `generator.generate()` returns, iterate records. For each row where `_should_skip_cell()` is true, overwrite that column + side effects with `None` and record in `_skipped_cells`. Replace buffer with updated records.

### 5. Async engine: `AsyncTaskScheduler`

#### 5a. `CompletionTracker` — add skip tracking

**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py`

- Add `_skipped: dict[int, dict[int, set[str]]]` (row_group -> row_index -> column names)
- `mark_cell_skipped(column, row_group, row_index)`
- `get_skipped_columns_for_row(row_group, row_index) -> set[str]`

#### 5b. Modify `_run_cell()` (line 767 of `async_scheduler.py`)

After the `is_dropped` guard (line 772), add skip evaluation:

1. Get `skipped_cols` from tracker for this row
2. Check `should_skip_by_propagation` using `config.required_columns` and `skipped_cols`
3. If not propagation-skipped, check `evaluate_skip_when` using `self._graph.get_skip_when(task.column)` against `row_data`
4. If skip: write `None` to buffer for all output cols, call `tracker.mark_cell_skipped()`, return `None`

The caller (`_execute_task_inner_impl`) still marks the task complete — skipped cells *are* complete (they produced `None`).
Downstream tasks get unblocked and will themselves check propagation. + +#### 5c. Modify `_run_batch()` (line 792 of `async_scheduler.py`) + +After generation, iterate rows. For skipped rows, overwrite with `None` and mark in tracker. Same pattern as sync path Step 4e. + +### 6. Expression generator: defensive `None` guard + +**File:** `packages/data-designer-engine/src/data_designer/engine/column_generators/generators/expression.py` + +In `generate()` (line 24), inside the per-record loop: if any `required_columns` value is `None`, set `record[self.config.name] = None` instead of rendering the Jinja2 template (which would crash on `None` arithmetic like `{{ price * 1.1 }}`). + +### 7. Validation: check `skip_when` references + +**File:** `packages/data-designer-engine/src/data_designer/engine/validation.py` + +- Add `SKIP_WHEN_REFERENCE_MISSING` to `ViolationType` enum +- Add `validate_skip_when_references(columns, allowed_references)` — iterates columns with `skip_when`, checks each `skip_when_columns` entry exists in `allowed_references` +- Wire into `validate_data_designer_config()` + +--- + +## Files Modified + +| File | Change | +|---|---| +| `config/base.py` | `skip_when` field + validator + `skip_when_columns` property | +| `engine/.../dag.py` | Add `skip_when_columns` edges in topological sort | +| `engine/.../execution_graph.py` | Add `skip_when_columns` edges + skip metadata storage + accessor | +| `engine/.../skip_evaluator.py` | **NEW** — `evaluate_skip_when()`, `should_skip_by_propagation()` | +| `engine/.../dataset_builder.py` | `_skipped_cells` state, `_should_skip_cell()`, modify 3 fan-out/generation methods | +| `engine/.../async_scheduler.py` | Skip checks in `_run_cell()` and `_run_batch()` | +| `engine/.../completion_tracker.py` | `_skipped` dict + `mark_cell_skipped` + `get_skipped_columns_for_row` | +| `engine/.../expression.py` | Defensive `None` guard when upstream is null | +| `engine/validation.py` | `validate_skip_when_references()` | + 
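As a sanity check on the plan above, the two pure functions slated for `skip_evaluator.py` (Step 3) fit in about a dozen lines. A sketch, assuming expressions are stored with their `{{ }}` delimiters (the examples in this plan store the delimiters, so the sketch renders the stored string directly rather than wrapping it again):

```python
from jinja2.sandbox import ImmutableSandboxedEnvironment

# Renderings treated as falsy, mirroring Step 3's truthiness rule.
_FALSY_RENDERINGS = {"", "false", "0", "none", "null"}


def evaluate_skip_when(expression: str, record: dict) -> bool:
    """Render the expression against the record; True means skip this row."""
    rendered = ImmutableSandboxedEnvironment().from_string(expression).render(**record)
    return rendered.strip().lower() not in _FALSY_RENDERINGS


def should_skip_by_propagation(
    required_columns: list[str],
    skipped_columns_for_row: set[str],
) -> bool:
    """Return True if any required upstream column was skipped for this row."""
    return bool(set(required_columns) & skipped_columns_for_row)
```

Because both functions take plain dicts/sets, they can be unit-tested without touching engine state, which is the point of keeping them in a standalone module.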
+--- + +## Verification + +1. **Unit tests:** Config field defaults, Jinja2 validation, `skip_when_columns` extraction, DAG edge creation, skip evaluator truthiness, CompletionTracker skip tracking +2. **Integration tests (sync):** Column with `skip_when` produces `None` for matching rows; downstream auto-skips; row count preserved (no drops) +3. **Integration tests (async):** Same scenarios under `DATA_DESIGNER_ASYNC_ENGINE=1` +4. **Validation tests:** Unknown column in `skip_when` produces ERROR violation +5. **Run:** `make check-all-fix` + `make test` + `make update-license-headers` From 3f415c7147b94deab5665232a5c23c0f494e7a25 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Mon, 30 Mar 2026 14:21:01 -0600 Subject: [PATCH 2/5] plan: remove HopChain example from skip_when plan Co-Authored-By: Claude Opus 4.6 (1M context) --- plans/479/skip-when-conditional-generation.md | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/plans/479/skip-when-conditional-generation.md b/plans/479/skip-when-conditional-generation.md index 43859ea9..58a623ab 100644 --- a/plans/479/skip-when-conditional-generation.md +++ b/plans/479/skip-when-conditional-generation.md @@ -16,17 +16,6 @@ Today the only workarounds are: 1. **Generate all columns unconditionally and post-filter** — wastes LLM calls on rows that will be discarded. 2. **Split into multiple `DataDesigner.create()` calls** with intermediate filtering — loses single-pipeline ergonomics and forces the user to manage seed-dataset hand-offs. 
-### Motivating Example: HopChain Pipeline - -The [HopChain paper](https://arxiv.org/abs/2603.17024) synthesizes multi-hop VLM reasoning data through a pipeline where each stage gates the next: - -``` -image → complexity_score → [if score >= 6] → categories → segmentation → query_gen - [if score < 6] → skip rest -``` - -With DataDesigner today, `categories`, `segmentation`, and `query_gen` all execute even for images that score below the threshold — burning VLM inference budget on rows that will be discarded. - ## Proposed Solution Add a `skip_when` field to `SingleColumnConfig`. When its Jinja2 expression evaluates truthy for a row, the cell is set to `None` and the generator is never called. Skips auto-propagate through the DAG: downstream columns whose `required_columns` include a skipped cell also skip automatically. From 6ee4c70c828be732f525e69ba500c14219900ae1 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Mon, 30 Mar 2026 14:21:39 -0600 Subject: [PATCH 3/5] plan: replace HopChain example with generic product review example Co-Authored-By: Claude Opus 4.6 (1M context) --- plans/479/skip-when-conditional-generation.md | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/plans/479/skip-when-conditional-generation.md b/plans/479/skip-when-conditional-generation.md index 58a623ab..53aead93 100644 --- a/plans/479/skip-when-conditional-generation.md +++ b/plans/479/skip-when-conditional-generation.md @@ -20,19 +20,26 @@ Today the only workarounds are: Add a `skip_when` field to `SingleColumnConfig`. When its Jinja2 expression evaluates truthy for a row, the cell is set to `None` and the generator is never called. Skips auto-propagate through the DAG: downstream columns whose `required_columns` include a skipped cell also skip automatically. +Example: a pipeline that generates product reviews only for items in stock. 
The `sentiment_analysis` and `review` columns are expensive LLM calls that should be skipped for out-of-stock items: + ```python config_builder.add_column( - name="complexity_score", column_type="llm-structured", ... + name="in_stock", column_type="sampler", + sampler_type="bernoulli", params=BernoulliSamplerParams(p=0.7), ) config_builder.add_column( - name="categories", + name="sentiment_analysis", column_type="llm-structured", - skip_when="{{ complexity_score.overall_complexity_score < 6 }}", + skip_when="{{ in_stock == 0 }}", + prompt="Analyze the sentiment of reviews for {{ product_name }}...", ... ) -# Everything downstream of categories auto-skips — no extra config needed -config_builder.add_column(name="instances", column_type="segmentation", ...) -config_builder.add_column(name="multi_hop_query", column_type="llm-structured", ...) +# review depends on sentiment_analysis — auto-skips when sentiment_analysis is skipped +config_builder.add_column( + name="review", + column_type="llm-text", + prompt="Write a {{ sentiment_analysis.tone }} review for {{ product_name }}...", +) ``` Skipped rows stay in the output (row count is preserved). Skipped cells contain `None`. From 14ab39b863e738c58af0e4bc4aa06e9df81de6e2 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Mon, 30 Mar 2026 14:25:45 -0600 Subject: [PATCH 4/5] plan: add open questions on skip sentinel value and row filtering Co-Authored-By: Claude Opus 4.6 (1M context) --- plans/479/skip-when-conditional-generation.md | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/plans/479/skip-when-conditional-generation.md b/plans/479/skip-when-conditional-generation.md index 53aead93..553d5039 100644 --- a/plans/479/skip-when-conditional-generation.md +++ b/plans/479/skip-when-conditional-generation.md @@ -226,6 +226,31 @@ In `generate()` (line 24), inside the per-record loop: if any `required_columns` --- +## Open Questions + +### 1. What value should skipped cells contain? 
+ +The current plan sets skipped cells to `None` (which becomes `NaN`/`pd.NA` in the DataFrame). Alternatives: + +- **`None`** — simple, standard pandas null. Downside: indistinguishable from a legitimate `None` produced by a generator (e.g., an LLM that returns no output). +- **Sentinel value** (e.g., `SKIPPED = "__SKIPPED__"` or a dedicated `SkippedValue` type) — distinguishable from real nulls. Downside: leaks into user-facing DataFrames unless stripped at output time; complicates type handling. +- **`pd.NA` with metadata** — store skip status in a sidecar structure (the `_skipped_cells` / `CompletionTracker._skipped` dicts already track this) and write `None` to the cell. Users who need to distinguish skip-null from real-null can inspect the metadata. + +Recommendation: Use `None` in the cell, track skip provenance in engine-internal state. If users need to distinguish, expose a `results.load_skip_mask()` or similar accessor. + +### 2. Should there be an option to auto-remove skipped rows from the final output? + +Many pipelines want to discard rows where a gate column failed — they don't need the skipped rows in the output at all. Options: + +- **Post-hoc filtering by the user** — `df = df.dropna(subset=["categories"])`. Simple but manual. +- **`drop_skipped_rows` option on `DataDesigner.create()`** — automatically remove any row where at least one column was skipped before writing to disk. Clean UX but may surprise users who want to inspect skipped rows. +- **A built-in `DropSkippedRowsProcessorConfig` processor** — runs as a post-generation processor that removes rows with any skipped cells. Fits the existing processor model and is opt-in. +- **`drop_when_skipped=True` on individual column configs** — drop the row if *this specific column* was skipped. More granular than a global flag. 
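Under the `None`-cell design from Question 1, the first two options reduce to a few lines of pandas. A sketch (column names follow the product-review example; the `drop_skipped_rows` behavior is simulated here, not an existing API):

```python
import pandas as pd

# Simulated pipeline output: row 1 was gated out, so its expensive
# columns were skipped and contain None (NaN in the DataFrame).
df = pd.DataFrame(
    {
        "in_stock": [1, 0, 1],
        "sentiment_analysis": ["positive", None, "negative"],
        "review": ["Great blender.", None, "Disappointing."],
    }
)

# Option "post-hoc filtering by the user": filter on one gated column.
kept = df.dropna(subset=["sentiment_analysis"]).reset_index(drop=True)

# Option "drop_skipped_rows" (simulated): drop any row with at least one
# skipped cell. Only valid if None is reserved for skips (Question 1's caveat).
generated_columns = ["sentiment_analysis", "review"]
fully_generated = df.dropna(subset=generated_columns, how="any").reset_index(drop=True)
```
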
+ +Recommendation: Start with a `DropSkippedRowsProcessorConfig` processor — it's opt-in, composable with other processors, and doesn't require new parameters on `create()` or column configs. + +--- + ## Verification 1. **Unit tests:** Config field defaults, Jinja2 validation, `skip_when_columns` extraction, DAG edge creation, skip evaluator truthiness, CompletionTracker skip tracking From 351bc29d9fc27fa5f7cd5c62b8ebd71e83dc5217 Mon Sep 17 00:00:00 2001 From: Nabin Mulepati Date: Wed, 1 Apr 2026 15:03:29 -0600 Subject: [PATCH 5/5] =?UTF-8?q?plan:=20major=20revision=20=E2=80=94=20Skip?= =?UTF-8?q?Config=20model,=20sync=20engine=20support,=20decouple=20propaga?= =?UTF-8?q?tion?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Introduce SkipConfig(when, value) as nested model on SingleColumnConfig - Move propagate_skip to SingleColumnConfig as independent field, fixing bug where columns with no SkipConfig couldn't participate in propagation - Add full sync engine implementation (Steps 4a-4d) covering both _fan_out_with_threads and _run_full_column_generator dispatch paths - Add serialization boundary stripping for both DatasetBatchManager (sync) and RowGroupBufferManager (async) - Simplify architecture diagrams for readability - Update all references, design decisions, verification plan Made-with: Cursor --- plans/479/skip-when-conditional-generation.md | 564 ++++++++++++++---- 1 file changed, 451 insertions(+), 113 deletions(-) diff --git a/plans/479/skip-when-conditional-generation.md b/plans/479/skip-when-conditional-generation.md index 553d5039..fcfa5a3a 100644 --- a/plans/479/skip-when-conditional-generation.md +++ b/plans/479/skip-when-conditional-generation.md @@ -18,83 +18,205 @@ Today the only workarounds are: ## Proposed Solution -Add a `skip_when` field to `SingleColumnConfig`. When its Jinja2 expression evaluates truthy for a row, the cell is set to `None` and the generator is never called. 
Skips auto-propagate through the DAG: downstream columns whose `required_columns` include a skipped cell also skip automatically. +Add a `SkipConfig` model and an optional `skip` field on `SingleColumnConfig`. When the `skip.when` Jinja2 expression evaluates truthy for a row, the cell is set to `skip.value` (default `None`) and the generator is never called. + +Independently, a `propagate_skip` field on `SingleColumnConfig` (default `True`) controls whether a column auto-skips when any of its upstream dependencies were skipped. This is a separate concern from expression gating: a column with no `SkipConfig` at all will still auto-skip if an upstream was skipped, unless it opts out with `propagate_skip=False`. Example: a pipeline that generates product reviews only for items in stock. The `sentiment_analysis` and `review` columns are expensive LLM calls that should be skipped for out-of-stock items: ```python config_builder.add_column( - name="in_stock", column_type="sampler", - sampler_type="bernoulli", params=BernoulliSamplerParams(p=0.7), + dd.SamplerColumnConfig( + name="in_stock", + sampler_type="bernoulli", + params=BernoulliSamplerParams(p=0.7), + ) ) config_builder.add_column( - name="sentiment_analysis", - column_type="llm-structured", - skip_when="{{ in_stock == 0 }}", - prompt="Analyze the sentiment of reviews for {{ product_name }}...", - ... + dd.LLMStructuredColumnConfig( + name="sentiment_analysis", + skip=dd.SkipConfig(when="{{ in_stock == 0 }}"), + prompt="Analyze the sentiment of reviews for {{ product_name }}...", + ... + ) ) -# review depends on sentiment_analysis — auto-skips when sentiment_analysis is skipped +# review depends on sentiment_analysis via its prompt template. +# propagate_skip=True (the default) means it auto-skips when sentiment_analysis is skipped. +# No SkipConfig needed — propagation is independent of expression gating. 
config_builder.add_column( - name="review", - column_type="llm-text", - prompt="Write a {{ sentiment_analysis.tone }} review for {{ product_name }}...", + dd.LLMTextColumnConfig( + name="review", + prompt="Write a {{ sentiment_analysis.tone }} review for {{ product_name }}...", + ) ) ``` -Skipped rows stay in the output (row count is preserved). Skipped cells contain `None`. +Skipped rows stay in the output (row count is preserved). Skipped cells contain `skip.value` (default `None`). ## Design Decisions | Decision | Choice | Rationale | |---|---|---| -| Where does `skip_when` live? | `SingleColumnConfig` base class | Cross-cutting; applies to all column types | -| What happens to skipped cells? | Set to `None`, row stays in output | Rows are not dropped — users can post-filter or inspect | -| Do downstream columns auto-skip? | Yes, always | If upstream data is missing, generating downstream is wasteful and error-prone | -| How are `skip_when` columns ordered in the DAG? | `skip_when_columns` extracted from the expression become DAG edges | Ensures the gate column is generated before the guarded column | -| How does this interact with `_records_to_drop`? | Independently — skip does not drop rows | Skip produces `None`; drop removes the row entirely | +| Where does skip config live? | Nested `SkipConfig` model on `SingleColumnConfig` via `skip: SkipConfig \| None = None`, **validated to reject sampler/seed types** | Groups the expression gate (`when`) and its fill value (`value`) into a self-contained model. Sampler/seed columns are collapsed into shared multi-column generators at compile time — no per-row dispatch point to skip individual columns. Scope v1 to generated single-column configs only. | +| What happens to skipped cells? | Set to `skip.value` (default `None`), row stays in output | Rows are not dropped — users can post-filter or inspect. 
`skip.value` is configurable per-column to handle dtype constraints (e.g., `value=0` for numeric columns, `value=""` for string columns). | +| Do downstream columns auto-skip? | Yes by default via `propagate_skip=True` on `SingleColumnConfig`, opt-out with `propagate_skip=False` | Propagation is **independent of `SkipConfig`** — a column with no expression gate still auto-skips when an upstream was skipped. This ensures columns that depend on a gated column don't silently receive null inputs. Templates like `{{ 'unknown' if country is none else country\|upper }}` handle missing data fine and can opt out with `propagate_skip=False`. Setting `propagate_skip=False` suppresses *all* upstream skip signals — including future #362 runtime failures — not just expression-gated skips. | +| How are skip columns ordered in the DAG? | `skip.columns` (parsed from `skip.when`) become DAG edges | Ensures the gate column is generated before the guarded column | +| How does this interact with `_records_to_drop`? | Independently — skip does not drop rows | Skip produces `skip.value`; drop removes the row entirely | +| How does this interact with `allow_resize`? | **Blocked for v1** — validation rejects `skip` + `allow_resize` on the same column | `allow_resize` changes the buffer size during generation (1:N or N:1 patterns), which invalidates index-based skip tracking. Blocking the combination avoids complex index remapping. If needed later, the two features can be composed by running resize after skip provenance is finalized. | +| What Jinja2 expression format does `skip.when` use? | Stored value **includes** `{{ }}` delimiters (e.g., `when="{{ in_stock == 0 }}"`) | Aligns with the rest of the codebase (prompts, expressions). The evaluator renders the stored value directly — it does **not** wrap it. `SkipConfig.columns` parses the stored value as-is, which correctly extracts undeclared variables from `{{ }}` expressions. 
| + +--- + +## Architecture + +### System overview + +The feature touches three layers. Each box below is a file; arrows show data/control flow. + +```mermaid +flowchart TD + subgraph config ["1 - Config (data-designer-config)"] + base_py["base.py\n(SkipConfig + SingleColumnConfig)"] + end + + subgraph build ["2 - Build (data-designer-engine)"] + dag_py["dag.py"] + exec_graph["execution_graph.py"] + val_py["validation.py"] + end + + subgraph runtime ["3 - Runtime (both engines)"] + skip_eval["skip_evaluator.py (NEW)"] + builder["dataset_builder.py\n(sync)"] + sched["async_scheduler.py\n(async)"] + batch_buf["dataset_batch_manager.py\n(sync buffer)"] + rg_buf["row_group_buffer.py\n(async buffer)"] + end + + base_py -->|"SkipConfig.columns\n(DAG edges)"| dag_py + base_py -->|"SkipConfig.columns\n(reference check)"| val_py + dag_py --> exec_graph + base_py -->|"SkipConfig"| exec_graph + exec_graph -->|"get_skip_config()"| builder + exec_graph -->|"get_skip_config()"| sched + skip_eval --> builder + skip_eval --> sched + builder -->|"record.__skipped__\nrecord[col] = skip.value"| batch_buf + sched -->|"record.__skipped__\nrecord[col] = skip.value"| rg_buf + batch_buf -->|"strips __skipped__"| output["Parquet output"] + rg_buf -->|"strips __skipped__"| output +``` + +### Per-cell skip evaluation (both engines) + +```mermaid +flowchart TD + A["evaluate cell (col, row)"] --> B{dropped?} + B -->|yes| C["return None"] + B -->|no| D["read row_data from buffer"] + D --> E{"propagate_skip=True?\n(SingleColumnConfig field)"} + E -->|yes| E2{upstream skipped?} + E2 -->|yes| H + E2 -->|no| F + E -->|no| F{SkipConfig\npresent?} + F -->|no| G["generate(row_data)"] + F -->|yes| I["evaluate_skip_when()"] + I --> J{truthy?} + J -->|no| G + J -->|yes| H["SKIP:\nrecord.__skipped__.add(col)\nrecord[col] = skip value\nrecord[se_col] = None"] + H --> K["return None\ncaller marks complete"] + G --> L["write result to buffer"] +``` --- ## Implementation -### 1. 
Config: `SingleColumnConfig` — add field + property +### 1. Config: `SingleColumnConfig` — add fields + property **File:** `packages/data-designer-config/src/data_designer/config/base.py` -Add to `SingleColumnConfig` (after `allow_resize`): +Add a `SkipConfig` model to `base.py` (alongside `ConfigBase`). This groups the three skip-related fields into a self-contained unit rather than spreading them across `SingleColumnConfig`: ```python -skip_when: str | None = Field( - default=None, - description="Jinja2 expression; when truthy, skip generation for this row (cell set to None).", -) +class SkipConfig(ConfigBase): + """Expression gate for conditional column generation. + + Attach to a ``SingleColumnConfig`` via ``skip=SkipConfig(...)`` to gate + generation on a Jinja2 expression. Controls *when* to skip; propagation + of upstream skips is controlled separately by ``propagate_skip`` on + ``SingleColumnConfig``. + """ + + when: str = Field( + description="Jinja2 expression (including {{ }} delimiters); " + "when truthy, skip generation for this row.", + ) + value: bool | int | float | str | None = Field( + default=None, + description="Value to write for skipped cells. " + "Defaults to None (becomes NaN/pd.NA in DataFrame).", + ) + + @field_validator("when") + @classmethod + def _validate_when_syntax(cls, v: str) -> str: + from jinja2.sandbox import ImmutableSandboxedEnvironment + ImmutableSandboxedEnvironment().parse(v) + return v + + @cached_property + def columns(self) -> list[str]: + """Column names referenced in the ``when`` expression. + + Parsed once from the Jinja2 AST and cached. Used by the DAG builder + to add dependency edges and by the execution graph to store metadata. + """ + from jinja2 import meta + from jinja2.sandbox import ImmutableSandboxedEnvironment + env = ImmutableSandboxedEnvironment() + ast = env.parse(self.when) + return list(meta.find_undeclared_variables(ast)) ``` -Add a `@field_validator("skip_when")` that validates Jinja2 syntax. 
**Critical constraint:** `base.py` line 4 prohibits `data_designer.*` imports, so use `jinja2` directly: +`ConfigBase` is not frozen (`model_config` has no `frozen=True`), so `cached_property` works directly — Pydantic will not include it in `model_fields`, serialization, or `__repr__`, which is correct for a derived property. + +Add the fields to `SingleColumnConfig` (after `allow_resize`): ```python -@field_validator("skip_when") -@classmethod -def _validate_skip_when(cls, v: str | None) -> str | None: - if v is not None: - from jinja2.sandbox import ImmutableSandboxedEnvironment - ImmutableSandboxedEnvironment().parse(v) - return v +skip: SkipConfig | None = None +propagate_skip: bool = Field( + default=True, + description="If True (default), this column auto-skips when any " + "of its required_columns was skipped. Independent of skip — " + "a column with no SkipConfig still propagates upstream skips. " + "Set to False for null-tolerant columns.", +) ``` -Add a concrete property `skip_when_columns` (not abstract — base provides default): +Add a `@model_validator(mode="after")` on `SingleColumnConfig` to reject `skip` on sampler/seed types, block `skip` + `allow_resize`, and reject self-referencing expressions. **Critical constraint:** `base.py` line 4 prohibits `data_designer.*` imports, so the validator uses only Pydantic/stdlib: ```python -@property -def skip_when_columns(self) -> list[str]: - if self.skip_when is None: - return [] - from jinja2 import meta - from jinja2.sandbox import ImmutableSandboxedEnvironment - env = ImmutableSandboxedEnvironment() - ast = env.parse(self.skip_when) - return list(meta.find_undeclared_variables(ast)) +@model_validator(mode="after") +def _validate_skip_scope(self) -> Self: + if self.skip is not None: + if self.column_type in ("sampler", "seed-dataset"): + raise ValueError( + f"skip is not supported on {self.column_type} columns. 
" + "Sampler/seed columns are collapsed into shared multi-column generators " + "and cannot be skipped individually." + ) + if self.allow_resize: + raise ValueError( + "skip and allow_resize cannot be used together. " + "The async engine (required for skip) does not support allow_resize." + ) + if self.name in self.skip.columns: + raise ValueError( + f"skip.when expression for column '{self.name}' references itself. " + "A column cannot gate its own generation." + ) + return self ``` ### 2. DAG: add `skip_when_columns` edges @@ -103,7 +225,7 @@ def skip_when_columns(self) -> list[str]: **File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py` -After the `for req_col_name in col.required_columns:` block (line 35-47), add a matching block for `col.skip_when_columns` that adds edges using the same pattern (direct column match + side-effect resolution). +After the `for req_col_name in col.required_columns:` block (line 35-47), add a matching block for `col.skip.columns` (guarded by `if col.skip is not None`) that adds edges using the same pattern (direct column match + side-effect resolution). #### 2b. `execution_graph.py` — `ExecutionGraph.create()` @@ -112,101 +234,307 @@ After the `for req_col_name in col.required_columns:` block (line 35-47), add a In the second pass (line 78-88), after the `for req in sub.required_columns:` edge loop, add: ```python -for skip_col in sub.skip_when_columns: - resolved = graph.resolve_side_effect(skip_col) - if resolved not in known_columns: - continue # seed/sampler columns not in graph - if resolved == name: - continue - graph.add_edge(upstream=resolved, downstream=name) +if sub.skip is not None: + for skip_col in sub.skip.columns: + resolved = graph.resolve_side_effect(skip_col) + if resolved not in known_columns: + raise ValueError( + f"Column '{name}' skip.when references '{skip_col}' " + f"(resolved to '{resolved}') which is not a known producer." 
+ ) + if resolved == name: + continue + graph.add_edge(upstream=resolved, downstream=name) ``` +This exactly matches the existing `required_columns` pattern (line 82-88) — `ExecutionGraph` only sees columns that participate in the DAG, and sampler/seed columns inside `MultiColumnConfig` wrappers are already flattened into `known_columns` during the first pass. No special-casing is needed. + Store skip metadata on the graph for runtime access: -- Add `_skip_when: dict[str, str]` to `__init__` -- Populate during first pass: `graph._skip_when[name] = sub.skip_when` (when not None) -- Add accessor: `get_skip_when(column) -> str | None` +- Add `_skip_configs: dict[str, SkipConfig]` to `__init__` +- Add `_propagate_skip: dict[str, bool]` to `__init__` +- Populate during first pass: `if sub.skip is not None: graph._skip_configs[name] = sub.skip` and `graph._propagate_skip[name] = sub.propagate_skip` (for all columns, not just those with `SkipConfig`) +- Add accessors: `get_skip_config(column) -> SkipConfig | None`, `should_propagate_skip(column) -> bool` (defaults to `True` if column not in dict) ### 3. New utility: `skip_evaluator.py` **New file:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_evaluator.py` -Two pure functions, no engine state dependencies: +Two pure functions and one environment class, no engine state dependencies: ```python +from jinja2.nativetypes import NativeEnvironment +from jinja2.sandbox import SandboxedEnvironment + +from data_designer.engine.processing.utils import deserialize_json_values + + +class NativeSandboxedEnvironment(SandboxedEnvironment, NativeEnvironment): + """Sandboxed environment that returns native Python types instead of strings. + + Uses StrictUndefined so that references to missing variables raise + UndefinedError instead of silently returning a truthy Undefined object + (which would cause every row to be skipped on a typo). 
+ """ + pass + + def evaluate_skip_when(expression: str, record: dict) -> bool: - """Render expression against record; return True if truthy.""" + """Render expression against deserialized record; return True if result is truthy.""" def should_skip_by_propagation( required_columns: list[str], skipped_columns_for_row: set[str], + propagate_skip: bool = True, ) -> bool: - """Return True if any required column was skipped.""" + """Return True if propagation is enabled and any required column was skipped.""" ``` -`evaluate_skip_when` wraps expression in `{{ }}`, renders via `ImmutableSandboxedEnvironment`, checks truthiness (result not in `("", "false", "0", "none", "null")`). +`evaluate_skip_when` implementation: +1. **Deserialize the record first** via `deserialize_json_values(record)` — this ensures the skip expression sees the same Python objects (dicts, lists) that generators see when rendering their own Jinja2 templates. Without this, a JSON string field like `'{"key": "val"}'` would be a raw string in the skip expression but a dict in the generator, causing inconsistent behavior. +2. **Render the stored expression directly** (no wrapping in `{{ }}`). The stored value already includes Jinja2 delimiters (e.g., `"{{ in_stock == 0 }}"`), so rendering it as-is produces the evaluated result. This is consistent with `skip_when_columns` which also parses the stored value directly. +3. **Use `NativeSandboxedEnvironment`** (combining `SandboxedEnvironment` + `NativeEnvironment` from `jinja2.nativetypes`). This returns native Python objects (`True`, `False`, `None`, `0`) instead of their string representations (`"True"`, `"False"`, `"None"`, `"0"`). This eliminates the string-truthiness bug entirely — Python's native `bool()` handles `False`, `None`, `0`, `""` correctly without needing a hand-rolled falsy string set. +4. **Check truthiness** via `bool(result)` on the native Python return value. 
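The native-types point above is the crux of the design and can be checked in isolation. A standalone sketch, using the same class combination described here (`score` is an arbitrary example variable, not a name from the codebase):

```python
from jinja2 import StrictUndefined, UndefinedError
from jinja2.nativetypes import NativeEnvironment
from jinja2.sandbox import SandboxedEnvironment


class NativeSandboxedEnvironment(SandboxedEnvironment, NativeEnvironment):
    """Sandboxed rendering that returns native Python objects, not strings."""


# Plain sandboxed rendering stringifies the result: "False" is a truthy string.
as_string = SandboxedEnvironment().from_string("{{ score >= 6 }}").render(score=3)
assert as_string == "False" and bool(as_string) is True  # the string-truthiness trap

# The native variant returns a real bool, so plain bool() behaves correctly.
env = NativeSandboxedEnvironment(undefined=StrictUndefined)
assert env.from_string("{{ score >= 6 }}").render(score=3) is False
assert env.from_string("{{ score >= 6 }}").render(score=9) is True

# StrictUndefined makes a typo loud instead of silently skipping every row.
try:
    env.from_string("{{ scroe >= 6 }}").render(score=3)
    raise AssertionError("expected UndefinedError")
except UndefinedError:
    pass
```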
+ +```python +_env = NativeSandboxedEnvironment(undefined=StrictUndefined) -`should_skip_by_propagation` returns `True` if the intersection of `required_columns` and `skipped_columns_for_row` is non-empty. +@lru_cache(maxsize=64) +def _compile_skip_template(expression: str) -> Template: + return _env.from_string(expression) + +def evaluate_skip_when(expression: str, record: dict) -> bool: + template = _compile_skip_template(expression) + deserialized = deserialize_json_values(record) + result = template.render(deserialized) + return bool(result) +``` + +The module-level `_env` singleton and `lru_cache` on `_compile_skip_template` avoid re-creating the environment and re-compiling the Jinja2 AST on every call. For a 100k-row dataset with 5 skip-guarded columns, this reduces 500k template compilations to at most 5. + +`should_skip_by_propagation` returns `True` only if `propagate_skip` is `True` AND the intersection of `required_columns` and `skipped_columns_for_row` is non-empty. When `propagate_skip=False`, the column handles null inputs on its own (e.g., expression columns with Jinja2 fallback logic like `{{ 'unknown' if country is none else country|upper }}`). + +**Optimization:** `required_columns` is a `list[str]`, so `set(required_columns) & skipped_columns_for_row` creates a new set on every call. Use `not skipped_columns_for_row.isdisjoint(required_columns)` instead — this short-circuits on the first match and avoids the set construction. Alternatively, cache `frozenset(required_columns)` on the graph's skip metadata during `ExecutionGraph.create()`. ### 4. Sync engine: `DatasetBuilder` **File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py` -#### 4a. 
Add state -- `self._skipped_cells: dict[int, set[str]] = {}` — buffer index to skipped column names -- Clear at start of `_run_batch()` (line ~428) +Skip evaluation in the sync engine must be wired into two dispatch paths: `_fan_out_with_threads` (CELL_BY_CELL generators) and `_run_full_column_generator` (FULL_COLUMN generators). `_fan_out_with_async` is only reached when `DATA_DESIGNER_ASYNC_ENGINE=1`, which routes to the async scheduler (Step 5), so it does not need sync skip logic. + +#### 4a. Helper: `_should_skip_cell()` + +Add a private method on `DatasetBuilder` that centralizes the skip decision for one cell. Propagation and expression gating are evaluated independently: + +```python +def _should_skip_cell( + self, column_name: str, record: dict +) -> bool: + skipped_cols: set[str] = record.get("__skipped__", set()) + + # 1. Propagation — independent of SkipConfig + propagate = self._graph.should_propagate_skip(column_name) + if propagate: + required = self._graph.get_required_columns(column_name) + if should_skip_by_propagation(required, skipped_cols, propagate): + return True + + # 2. Expression gate — only if SkipConfig exists + skip_config = self._graph.get_skip_config(column_name) + if skip_config is not None: + return evaluate_skip_when(skip_config.when, record) + + return False +``` + +#### 4b. Helper: `_write_skip_to_record()` + +When a cell is skipped, write provenance and the skip value into the record in-place. 
The skip value comes from `SkipConfig.value` if the column has one, otherwise `None` (propagation-only skips always use `None`): + +```python +def _write_skip_to_record( + self, column_name: str, record: dict +) -> None: + skip_config = self._graph.get_skip_config(column_name) + skip_value = skip_config.value if skip_config is not None else None + record.setdefault("__skipped__", set()).add(column_name) + record[column_name] = skip_value + for se_col in self._graph.get_side_effect_columns(column_name): + record[se_col] = None +``` + +#### 4c. Modify `_fan_out_with_threads()` (line 631) + +In the `for i, record in self.batch_manager.iter_current_batch()` loop (line 638), add a skip check **before** `executor.submit()`. If the cell should be skipped, call `_write_skip_to_record()` and `batch_manager.update_record(i, record)` directly — no work is submitted to the thread pool. Record a success on the progress tracker so the progress bar stays accurate: + +```python +for i, record in self.batch_manager.iter_current_batch(): + if self._should_skip_cell(generator.config.name, record): + self._write_skip_to_record(generator.config.name, record) + self.batch_manager.update_record(i, record) + progress_tracker.record_success() + continue + executor.submit( + lambda record: generator.generate(record), + record, + context={"index": i, "column_name": generator.config.name}, + ) +``` + +Skipped cells never enter the thread pool, so `_records_to_drop` / `_finalize_fan_out` / `_cell_resize_results` are unaffected — the skip path writes directly to the buffer and moves on. -#### 4b. Add helper methods -- `_should_skip_cell(config, record_index, record) -> bool` — checks propagation (any upstream skipped?) then evaluates `skip_when` expression -- `_mark_cell_skipped(record_index, column_name, side_effect_columns, record)` — records skip, writes `None` to buffer +#### 4d. Modify `_run_full_column_generator()` (line 503) -#### 4c. 
Modify `_fan_out_with_threads()` (line 638) -Before `executor.submit()`, check `_should_skip_cell()`. If skip: write `None` for column + side effects, record success on progress tracker, `continue`. +FULL_COLUMN generators receive the entire batch as a DataFrame. Pre-filter skipped rows out, run the generator on the remaining rows, then merge results back: + +```python +def _run_full_column_generator(self, generator: ColumnGenerator) -> None: + column_name = generator.config.name + original_count = self.batch_manager.num_records_in_buffer + + # Pre-filter: evaluate skip for each row, write skip provenance + skip_indices: set[int] = set() + for i, record in self.batch_manager.iter_current_batch(): + if self._should_skip_cell(column_name, record): + self._write_skip_to_record(column_name, record) + self.batch_manager.update_record(i, record) + skip_indices.add(i) + + # Build DataFrame excluding skipped rows + batch = self.batch_manager.get_current_batch(as_dataframe=False) + active_records = [r for i, r in enumerate(batch) if i not in skip_indices] + + if active_records: + active_df = lazy.pd.DataFrame(active_records) + result_df = generator.generate(active_df) + result_records = result_df.to_dict(orient="records") + + # Merge results back at non-skipped indices + result_iter = iter(result_records) + merged = [] + for i, record in enumerate(batch): + if i in skip_indices: + merged.append(record) + else: + merged.append(next(result_iter)) + batch = merged + + allow_resize = getattr(generator.config, "allow_resize", False) + self._log_resize_if_changed( + self._column_display_name(generator.config), + original_count, len(batch), allow_resize, + ) + self.batch_manager.replace_buffer(batch, allow_resize=allow_resize) +``` -#### 4d. Modify `_fan_out_with_async()` (line 621) -Convert list comprehension to explicit loop with same skip check. 
+The merge-back loop preserves row order: skipped rows keep their skip provenance and `skip.value`; non-skipped rows get the generator's output. The generator only sees the active (non-skipped) DataFrame, so it produces exactly `len(active_records)` results. -#### 4e. Modify `_run_full_column_generator()` (line 503) -After `generator.generate()` returns, iterate records. For each row where `_should_skip_cell()` is true, overwrite that column + side effects with `None` and record in `_skipped_cells`. Replace buffer with updated records. +**Interaction with `allow_resize`:** The `@model_validator` on `SingleColumnConfig` already rejects `skip` + `allow_resize` on the same column (Step 1). This means `skip_indices` will always be empty for generators with `allow_resize=True`, so the pre-filter is a no-op and the existing resize logic is unaffected. ### 5. Async engine: `AsyncTaskScheduler` -#### 5a. `CompletionTracker` — add skip tracking +#### 5a. Skip provenance: record-inline `__skipped__` -**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py` +Skip provenance is stored directly in each record dict under a hidden `__skipped__` key (a `set[str]` of skipped column names). When a cell is skipped, the column name is added to the record's `__skipped__` set: -- Add `_skipped: dict[int, dict[int, set[str]]]` (rg -> ri -> column names) -- `mark_cell_skipped(column, row_group, row_index)` -- `get_skipped_columns_for_row(row_group, row_index) -> set[str]` +```python +record.setdefault("__skipped__", set()).add(column_name) +``` + +The `__skipped__` set travels with the record through the buffer — no separate tracking state is needed on `CompletionTracker` or elsewhere. The async engine reads skip state from the buffer via `buffer_manager.get_row(rg, ri).get("__skipped__", set())`. + +The `__skipped__` key is stripped at the serialization boundary before converting records to DataFrames (see Step 8). #### 5b. 
Modify `_run_cell()` (line 767 of `async_scheduler.py`) After the `is_dropped` guard (line 772), add skip evaluation: -1. Get `skipped_cols` from tracker for this row -2. Check `should_skip_by_propagation` using `config.required_columns` and `skipped_cols` -3. If not propagation-skipped, check `evaluate_skip_when` using `self._graph.get_skip_when(task.column)` against `row_data` -4. If skip: write `None` to buffer for all output cols, call `tracker.mark_cell_skipped()`, return `None` +1. Get `skipped_cols` from `row_data.get("__skipped__", set())` — the row data is already read from the buffer at line 777, so no tracker query is needed. +2. Check propagation first (independent of `SkipConfig`): `should_skip_by_propagation(config.required_columns, skipped_cols, self._graph.should_propagate_skip(task.column))`. +3. If not propagation-skipped, get `skip_config = self._graph.get_skip_config(task.column)`. If not None, check `evaluate_skip_when(skip_config.when, row_data)`. +4. If skip (by either path), write to the buffer record via `buffer_manager.get_row(rg, ri)`: + - `record.setdefault("__skipped__", set()).add(task.column)` — records skip provenance. + - `record[task.column] = skip_config.value if skip_config else None` — the **column key must be present** in the record dict, not absent. Downstream `skip.when` expressions and Jinja2 templates may reference skipped columns (e.g., `{{ col is none }}`); an absent key would cause `UndefinedError`. Propagation-only skips (no `SkipConfig`) use `None`. + - `record[se_col] = None` for each side-effect column — side-effect columns (`__trace`, `__reasoning_content`, etc.) are always written as `None`, regardless of the parent column's `skip.value`. A skipped cell has no trace or reasoning. + - Return `None`. + +The caller (`_execute_task_inner_impl`) still marks the task complete — skipped cells ARE complete (they produced a value). 
Downstream tasks get unblocked and will themselves check propagation (respecting their own `propagate_skip` setting). Note: `_execute_task_inner_impl` also calls `_check_error_rate(success=True)` and `_reporter.record_success()` — skipped cells count as successes in metrics. This is acceptable for v1 (a skip is a successful outcome, not a failure), but consider adding a separate skip counter to the reporter for observability. -The caller (`_execute_task_inner_impl`) still marks the task complete — skipped cells ARE complete (they produced `None`). Downstream tasks get unblocked and will themselves check propagation. +**Error handling:** If `evaluate_skip_when` raises an exception (e.g., `UndefinedError` from `StrictUndefined`, or `SecurityError` from the sandbox), treat it as a non-retryable cell failure — log a warning, skip the cell (write `skip.value`), and continue. Do not crash the batch. This matches the "fail-safe" behavior: if the skip expression can't be evaluated, it's safer to skip the row (avoiding an expensive LLM call on a row with unknown filter status) than to run the generator. #### 5c. Modify `_run_batch()` (line 792 of `async_scheduler.py`) -After generation, iterate rows. For skipped rows, overwrite with `None` and mark in tracker. Same pattern as sync path Step 4e. +Pre-filter the DataFrame to exclude skipped rows before calling `generator.agenerate()`, then merge results back. Specific adjustments to the existing code: + +1. After computing `pre_dropped` (line 799), build `pre_skipped: set[int]` by evaluating `_should_skip_cell()` for each non-dropped row. Write `skip_config.value` and `__skipped__` provenance into the buffer record for each skipped row. +2. Filter `batch_df` to exclude both dropped and skipped rows before calling `generator.agenerate()`. +3. **Adjust the row-count assertion** (line 811): change `active_rows = rg_size - len(pre_dropped)` to `active_rows = rg_size - len(pre_dropped) - len(pre_skipped)`. 
The generator only receives non-dropped, non-skipped rows, so the result must match that count. +4. **Adjust the merge-back loop** (lines 816-825): skip both `pre_dropped` and `pre_skipped` indices when writing results back from `result_df`. -### 6. Expression generator: defensive `None` guard +### 6. Expression generator: no changes needed **File:** `packages/data-designer-engine/src/data_designer/engine/column_generators/generators/expression.py` -In `generate()` (line 24), inside the per-record loop: if any `required_columns` value is `None`, set `record[self.config.name] = None` instead of rendering the Jinja2 template (which would crash on `None` arithmetic like `{{ price * 1.1 }}`). +Expression columns are `FULL_COLUMN` generators. The pre-filtering in Step 5c already handles both `skip.when` evaluation and propagation before the generator receives the DataFrame — skipped rows are excluded from `batch_df` and never reach `ExpressionColumnGenerator.agenerate()`. The generator does not need its own skip guard; the engine handles it at the dispatch layer. -### 7. Validation: check `skip_when` references +This avoids the design problem of threading engine-internal skip state into the generator, which would break the `ColumnGeneratorFullColumn` interface (generators receive only a `pd.DataFrame` and have no reference to engine state). + +### 7. 
Validation: check `skip` references **File:** `packages/data-designer-engine/src/data_designer/engine/validation.py` -- Add `SKIP_WHEN_REFERENCE_MISSING` to `ViolationType` enum -- Add `validate_skip_when_references(columns, allowed_references)` — iterates columns with `skip_when`, checks each `skip_when_columns` entry exists in `allowed_references` +- Add `SKIP_REFERENCE_MISSING` to `ViolationType` enum +- Add `validate_skip_references(columns, allowed_references)` — iterates columns with `skip is not None`, checks each `skip.columns` entry exists in `allowed_references` +- **Severity: ERROR** (matching how missing `required_columns` references are treated). A misspelled column name like `skip=SkipConfig(when="{{ in_stokc == 0 }}")` must be caught at validation time, not silently ignored at runtime. Validation runs before execution, so this is the primary defense against typos. - Wire into `validate_data_designer_config()` +- Add `SKIP_ON_SAMPLER_SEED` violation for `skip` on sampler/seed column types (belt-and-suspenders with the model validator in Step 1) +- Add `SKIP_WITH_ALLOW_RESIZE` violation for `skip` + `allow_resize` on the same column + +### 8. Serialization boundary: strip `__skipped__` before DataFrame conversion + +The `__skipped__` key is a `set` object stored in the record dict. `pd.DataFrame(records)` would serialize it into a column, which is incorrect. Strip `__skipped__` at every point where records are converted to DataFrames — in both engines. 
+ +**Async — `RowGroupBufferManager`** + +**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py` + +In `get_dataframe()` (line 68-72), filter out `__skipped__`: + +```python +def get_dataframe(self, row_group: int) -> pd.DataFrame: + dropped = self._dropped.get(row_group, set()) + rows = [ + {k: v for k, v in row.items() if k != "__skipped__"} + for i, row in enumerate(self._buffers[row_group]) + if i not in dropped + ] + return lazy.pd.DataFrame(rows) +``` + +**Sync — `DatasetBatchManager`** + +**File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py` + +In `get_current_batch()` (line 134), strip `__skipped__` when returning a DataFrame: + +```python +def get_current_batch(self, *, as_dataframe: bool = False) -> pd.DataFrame | list[dict]: + if as_dataframe: + return lazy.pd.DataFrame( + [{k: v for k, v in row.items() if k != "__skipped__"} for row in self._buffer] + ) + return self._buffer +``` + +Also in `write()` (line 172), which converts `self._buffer` to a DataFrame for parquet serialization: + +```python +dataframe=lazy.pd.DataFrame( + [{k: v for k, v in row.items() if k != "__skipped__"} for row in self._buffer] +), +``` + +The `__skipped__` key remains in the raw record dicts (accessible via `iter_current_batch()`, etc.) for propagation checks during generation, but is excluded from any DataFrame output, parquet serialization, or processor input. 
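As a sanity check on why the strip is needed, a minimal sketch with hypothetical record shapes (column names here are illustrative, not from the codebase):

```python
import pandas as pd

# Two records as they might look mid-generation: the hidden __skipped__ set
# travels only with records that had a cell skipped.
buffer = [
    {"complexity_score": 3, "categories": None, "__skipped__": {"categories"}},
    {"complexity_score": 8, "categories": "animals"},
]

# Without stripping, pandas happily materializes __skipped__ as a column
# (with NaN for records that never skipped anything).
naive = pd.DataFrame(buffer)
assert "__skipped__" in naive.columns

# The dict-comprehension strip used at the serialization boundary keeps
# provenance out of the output while preserving the skip value (None -> NaN).
clean = pd.DataFrame(
    [{k: v for k, v in row.items() if k != "__skipped__"} for row in buffer]
)
assert list(clean.columns) == ["complexity_score", "categories"]
assert clean["categories"].isna().tolist() == [True, False]
```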
--- @@ -214,47 +542,57 @@ In `generate()` (line 24), inside the per-record loop: if any `required_columns` | File | Change | |---|---| -| `config/base.py` | `skip_when` field + validator + `skip_when_columns` property | -| `engine/.../dag.py` | Add `skip_when_columns` edges in topological sort | -| `engine/.../execution_graph.py` | Add `skip_when_columns` edges + skip metadata storage + accessor | -| `engine/.../skip_evaluator.py` | **NEW** — `evaluate_skip_when()`, `should_skip_by_propagation()` | -| `engine/.../dataset_builder.py` | `_skipped_cells` state, `_should_skip_cell()`, modify 3 fan-out/generation methods | -| `engine/.../async_scheduler.py` | Skip checks in `_run_cell()` and `_run_batch()` | -| `engine/.../completion_tracker.py` | `_skipped` dict + `mark_cell_skipped` + `get_skipped_columns_for_row` | -| `engine/.../expression.py` | Defensive `None` guard when upstream is null | -| `engine/validation.py` | `validate_skip_when_references()` | +| `config/base.py` | **NEW** `SkipConfig` model (`when`, `value` fields + `@field_validator` for Jinja2 syntax + `columns` cached property). 
`SingleColumnConfig` gets `skip: SkipConfig \| None = None` + `propagate_skip: bool = True` fields + `@model_validator` (sampler/seed rejection, `allow_resize` rejection, self-reference rejection) | +| `engine/.../dag.py` | Add `skip.columns` edges in topological sort (guarded by `if col.skip is not None`) | +| `engine/.../execution_graph.py` | Add `skip.columns` edges (matching existing `required_columns` pattern) + `_skip_configs: dict[str, SkipConfig]` + `_propagate_skip: dict[str, bool]` storage + accessors | +| `engine/.../skip_evaluator.py` | **NEW** — `NativeSandboxedEnvironment`, `_compile_skip_template()` (cached), `evaluate_skip_when()`, `should_skip_by_propagation()` | +| `engine/.../dataset_builder.py` | `_should_skip_cell()` + `_write_skip_to_record()` helpers; skip pre-check in `_fan_out_with_threads()`; pre-filter + merge-back in `_run_full_column_generator()` | +| `engine/.../async_scheduler.py` | Skip checks in `_run_cell()` and `_run_batch()` with FULL_COLUMN pre-filtering + adjusted row-count assertion (reads `__skipped__` from buffer records, `propagate_skip` + `SkipConfig` from graph) | +| `engine/.../dataset_batch_manager.py` | Strip `__skipped__` key in `get_current_batch(as_dataframe=True)` and `write()` | +| `engine/.../row_group_buffer.py` | Strip `__skipped__` key in `get_dataframe()` | +| `engine/.../expression.py` | No changes — skip handling is done at the engine dispatch layer (Step 5c / Step 4d pre-filtering) | +| `engine/validation.py` | `validate_skip_references()` (ERROR level) + sampler/seed + allow_resize checks | --- -## Open Questions +## Resolved Questions ### 1. What value should skipped cells contain? -The current plan sets skipped cells to `None` (which becomes `NaN`/`pd.NA` in the DataFrame). Alternatives: - -- **`None`** — simple, standard pandas null. Downside: indistinguishable from a legitimate `None` produced by a generator (e.g., an LLM that returns no output). 
-- **Sentinel value** (e.g., `SKIPPED = "__SKIPPED__"` or a dedicated `SkippedValue` type) — distinguishable from real nulls. Downside: leaks into user-facing DataFrames unless stripped at output time; complicates type handling. -- **`pd.NA` with metadata** — store skip status in a sidecar structure (the `_skipped_cells` / `CompletionTracker._skipped` dicts already track this) and write `None` to the cell. Users who need to distinguish skip-null from real-null can inspect the metadata. +**Resolution:** Use a configurable `value` field (default `None`, typed as `bool | int | float | str | None`) on `SkipConfig`. `bool` is listed first to ensure Pydantic matches `True`/`False` as `bool` rather than `int` (since `bool` is a subclass of `int`). Constrained to JSON-serializable scalars so configs can be checkpointed and logged without serialization errors. -Recommendation: Use `None` in the cell, track skip provenance in engine-internal state. If users need to distinguish, expose a `results.load_skip_mask()` or similar accessor. +- Default `None` works for most cases (becomes `NaN`/`pd.NA` in the DataFrame). +- Users can set `value=0` for numeric columns, `value=""` for string columns, etc., avoiding dtype issues. +- Skip provenance is tracked inline in the record dict (`__skipped__` key). If users need to distinguish skip-null from real-null, the `DropSkippedRowsProcessorConfig` processor can read `record.get("__skipped__")` directly. ### 2. Should there be an option to auto-remove skipped rows from the final output? -Many pipelines want to discard rows where a gate column failed — they don't need the skipped rows in the output at all. Options: +**Resolution (unchanged):** Start with a `DropSkippedRowsProcessorConfig` processor — it's opt-in, composable with other processors, and doesn't require new parameters on `create()` or column configs. 
+ +With record-inline provenance (`__skipped__` key in each record dict), the processor can simply check `record.get("__skipped__")` to determine which rows have skipped columns. No `__skip_mask` column or sidecar file is needed — the provenance is already in the record. The `__skipped__` key is stripped at the serialization boundary (Step 8) so it does not leak into the final DataFrame or parquet output. + +## Open Questions + +### 1. Shared propagation path with #362 + +[Issue #362](https://github.com/NVIDIA-NeMo/DataDesigner/issues/362) (keep failed-parse fields as null) needs a similar "cell became unavailable, propagate through the DAG" mechanism. `skip_when` is intentional/config-driven; #362 is runtime failure. Both need: +- A way to mark a cell as unavailable +- Downstream propagation through the DAG +- Tracking of which cells were affected -- **Post-hoc filtering by the user** — `df = df.dropna(subset=["categories"])`. Simple but manual. -- **`drop_skipped_rows` option on `DataDesigner.create()`** — automatically remove any row where at least one column was skipped before writing to disk. Clean UX but may surprise users who want to inspect skipped rows. -- **A built-in `DropSkippedRowsProcessorConfig` processor** — runs as a post-generation processor that removes rows with any skipped cells. Fits the existing processor model and is opt-in. -- **`drop_when_skipped=True` on individual column configs** — drop the row if *this specific column* was skipped. More granular than a global flag. +The record-inline `__skipped__` infrastructure designed here could serve both use cases — #362 could write to the same `__skipped__` set (or a parallel `__failed__` set) in the record dict, and downstream propagation would work identically. Consider designing one shared propagation path instead of building two separate cascades. Not blocking for this implementation, but worth keeping in mind before the implementation PRs land to avoid a second refactor. 
-Recommendation: Start with a `DropSkippedRowsProcessorConfig` processor — it's opt-in, composable with other processors, and doesn't require new parameters on `create()` or column configs. +The shared propagation path needs to exist in both `DatasetBuilder` (sync — `_should_skip_cell()` / `_fan_out_with_threads()` / `_run_full_column_generator()`) and `AsyncTaskScheduler` (async — `_run_cell()` / `_run_batch()`). The `skip_evaluator.py` utility module is engine-agnostic and already serves both. --- ## Verification -1. **Unit tests:** Config field defaults, Jinja2 validation, `skip_when_columns` extraction, DAG edge creation, skip evaluator truthiness, CompletionTracker skip tracking -2. **Integration tests (sync):** Column with `skip_when` produces `None` for matching rows; downstream auto-skips; row count preserved (no drops) -3. **Integration tests (async):** Same scenarios under `DATA_DESIGNER_ASYNC_ENGINE=1` -4. **Validation tests:** Unknown column in `skip_when` produces ERROR violation -5. **Run:** `make check-all-fix` + `make test` + `make update-license-headers` +1. **Unit tests — config:** `SkipConfig` field defaults (`value=None`), Jinja2 syntax validation on `when`, `columns` extraction (cached), `SingleColumnConfig.skip` defaults to `None`, `SingleColumnConfig.propagate_skip` defaults to `True`, rejection of `skip` on sampler/seed types, rejection of `skip` + `allow_resize`, rejection of self-referencing `skip.when` (column references itself) +2. **Unit tests — skip evaluator:** `NativeSandboxedEnvironment` returns native Python types; `evaluate_skip_when` with truthy/falsy expressions (including `False`/`None`/`0` — the case-sensitivity bug); `evaluate_skip_when` against deserialized JSON records; `should_skip_by_propagation` with `propagate_skip=True` and `propagate_skip=False`; `StrictUndefined` raises `UndefinedError` for missing variables (not silently truthy); error handling in `evaluate_skip_when` (graceful failure on sandbox violations) +3. 
**Unit tests — DAG/graph:** `skip.columns` become edges; unknown column in `skip.when` raises `ValueError` (same behavior as unknown `required_columns`); `skip.when` referencing sampler/seed columns (available via `MultiColumnConfig` flattening) resolves correctly +4. **Integration tests (sync):** Column with `skip` produces `skip.value` for matching rows in `_fan_out_with_threads`; FULL_COLUMN generators pre-filter and merge-back correctly in `_run_full_column_generator`; downstream with no `SkipConfig` auto-skips via `propagate_skip=True`; downstream with `propagate_skip=False` does NOT auto-skip; row count preserved; skipped cells never submitted to thread pool (verify generator not called for skipped rows) +5. **Integration tests (async):** Column with `skip` produces `skip.value` for matching rows; downstream with no `SkipConfig` auto-skips via `propagate_skip=True`; downstream with `propagate_skip=False` does NOT auto-skip; **chained propagation** (A skipped -> B auto-skips -> C auto-skips, where B and C have no `SkipConfig`) works transitively; row count preserved; FULL_COLUMN generators pre-filter (verify LLM calls not made for skipped rows); custom `skip.value` written correctly; propagation-only skips write `None` (not a configured value); side-effect columns get `None`; skip count is logged per-column +6. **Validation tests:** Unknown column in `skip.when` produces ERROR violation; sampler/seed + `skip` produces violation; `allow_resize` + `skip` produces violation +7. **Serialization tests:** Verify `__skipped__` is stripped from DataFrame output in both `RowGroupBufferManager.get_dataframe()` (async) and `DatasetBatchManager.get_current_batch(as_dataframe=True)` / `write()` (sync) — the key must not appear as a column in the resulting DataFrame +8. **Run:** `make check-all-fix` + `make test` + `make update-license-headers`
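The propagation cases in items 2, 4, and 5 all reduce to the pure function specified in Step 3. A stdlib-only sketch of the planned signature and its expected behavior (a sketch for test-writing, not the final implementation):

```python
def should_skip_by_propagation(
    required_columns: list[str],
    skipped_columns_for_row: set[str],
    propagate_skip: bool = True,
) -> bool:
    """Return True if propagation is enabled and any required column was skipped."""
    if not propagate_skip:
        # Null-tolerant columns handle missing inputs themselves.
        return False
    # isdisjoint short-circuits on the first overlap and avoids building a new set.
    return not skipped_columns_for_row.isdisjoint(required_columns)


# Propagation on: any skipped upstream column gates this cell.
assert should_skip_by_propagation(["a", "b"], {"b"}) is True
# Propagation off: the column runs regardless of upstream skips.
assert should_skip_by_propagation(["a", "b"], {"b"}, propagate_skip=False) is False
# No overlap between requirements and skips: the cell runs.
assert should_skip_by_propagation(["a", "b"], {"c"}) is False
```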