Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/tinybird_sdk/generator/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ def _generate_engine_config(engine: EngineConfig | None) -> str:
return get_engine_clause(engine)


def _generate_backfill(backfill: str | None) -> str | None:
if backfill == "skip":
return "BACKFILL skip"
return None


def _generate_kafka_config(kafka: Any) -> str:
lines = [
f"KAFKA_CONNECTION_NAME {kafka.connection._name}",
Expand Down Expand Up @@ -164,6 +170,10 @@ def generate_datasource(datasource: DatasourceDefinition) -> GeneratedDatasource
parts.append("")
parts.append(_generate_engine_config(datasource.options.engine))

backfill = _generate_backfill(datasource.options.backfill)
if backfill:
parts.append(backfill)

indexes = _generate_indexes(datasource.options.indexes)
if indexes:
parts.extend(["", indexes])
Expand Down
3 changes: 3 additions & 0 deletions src/tinybird_sdk/migrate/emit_ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ def _emit_datasource(ds: DatasourceModel) -> str:
if ds.engine:
lines.append(f" 'engine': {_emit_engine_options(ds.engine)},")

if ds.backfill:
lines.append(f" 'backfill': {_escape_string(ds.backfill)},")

if ds.indexes:
lines.append(" 'indexes': [")
for index in ds.indexes:
Expand Down
12 changes: 12 additions & 0 deletions src/tinybird_sdk/migrate/parse_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel:
shared_with: list[str] = []
description: str | None = None
forward_query: str | None = None
backfill: str | None = None

engine_type: str | None = None
sorting_key: list[str] = []
Expand Down Expand Up @@ -394,6 +395,16 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel:
import_schedule = parse_quoted_value(value)
elif key == "IMPORT_FROM_TIMESTAMP":
import_from_timestamp = parse_quoted_value(value)
elif key == "BACKFILL":
normalized = value.strip().lower()
if normalized != "skip":
raise MigrationParseError(
resource.file_path,
"datasource",
resource.name,
f'Invalid BACKFILL value: "{value}"',
)
backfill = "skip"
elif key == "TOKEN":
tokens.append(_parse_token(resource.file_path, resource.name, value))
else:
Expand Down Expand Up @@ -495,6 +506,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel:
)
if engine_type
else None,
backfill=backfill, # type: ignore[arg-type]
indexes=indexes,
kafka=kafka,
s3=imported,
Expand Down
1 change: 1 addition & 0 deletions src/tinybird_sdk/migrate/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class DatasourceModel:
columns: list[DatasourceColumnModel]
engine: DatasourceEngineModel | None = None
description: str | None = None
backfill: Literal["skip"] | None = None
indexes: list[DatasourceIndexModel] = field(default_factory=list)
kafka: DatasourceKafkaModel | None = None
s3: DatasourceS3Model | None = None
Expand Down
8 changes: 7 additions & 1 deletion src/tinybird_sdk/schema/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import re
from dataclasses import dataclass, field
from typing import Any
from typing import Any, Literal

from .connection import GCSConnectionDefinition, KafkaConnectionDefinition, S3ConnectionDefinition
from .engines import EngineConfig
Expand All @@ -19,6 +19,7 @@ class ColumnDefinition:


SchemaDefinition = dict[str, TypeValidator | ColumnDefinition]
BackfillOption = Literal["skip"]


@dataclass(frozen=True, slots=True)
Expand Down Expand Up @@ -74,6 +75,7 @@ class DatasourceOptions:
schema: SchemaDefinition
description: str | None = None
engine: EngineConfig | None = None
backfill: BackfillOption | None = None
tokens: tuple[TokenConfig, ...] = field(default_factory=tuple)
shared_with: tuple[str, ...] = field(default_factory=tuple)
json_paths: bool = True
Expand Down Expand Up @@ -120,6 +122,7 @@ def define_datasource(name: str, options: dict[str, Any] | DatasourceOptions) ->
description=options.get("description"),
schema=options["schema"],
engine=options.get("engine"),
backfill=options.get("backfill"),
tokens=tokens,
shared_with=shared_with,
json_paths=options.get("json_paths", True),
Expand All @@ -134,6 +137,9 @@ def define_datasource(name: str, options: dict[str, Any] | DatasourceOptions) ->
if ingestion_count > 1:
raise ValueError("Datasource can only define one ingestion option: `kafka`, `s3`, or `gcs`.")

if normalized.backfill not in {None, "skip"}:
raise ValueError('Invalid datasource backfill value: only "skip" is supported.')

for index in normalized.indexes:
if not index.name or any(char.isspace() for char in index.name):
raise ValueError(
Expand Down
22 changes: 22 additions & 0 deletions tests/test_phase1_schema_generator_parity.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,28 @@ def test_generate_datasource_includes_indexes_and_store_raw_value() -> None:
assert "KAFKA_STORE_RAW_VALUE True" in generated.content


def test_generate_datasource_includes_backfill_skip() -> None:
datasource = define_datasource(
"daily_page_visits",
{
"schema": {
"date": t.date(),
"page_url": t.string(),
"visits": t.uint64(),
},
"backfill": "skip",
},
)

generated = generate_datasource(datasource)
assert "BACKFILL skip" in generated.content


def test_define_datasource_rejects_unsupported_backfill_value() -> None:
with pytest.raises(ValueError, match="Invalid datasource backfill value"):
define_datasource("events", {"schema": {"id": t.int32()}, "backfill": "run"})


def test_define_datasource_validates_index_name_and_granularity() -> None:
with pytest.raises(ValueError, match="Invalid datasource index name"):
define_datasource(
Expand Down
37 changes: 37 additions & 0 deletions tests/test_phase3_migrate_parser_parity.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,43 @@ def test_parse_datasource_supports_import_directives() -> None:
assert parsed.s3.from_timestamp == "2025-01-01 00:00:00"


def test_parse_datasource_supports_backfill_skip() -> None:
parsed = parse_datasource_file(
_resource(
"datasource",
"daily_page_visits",
"\n".join(
[
"SCHEMA >",
" date Date",
" page_url String",
" visits UInt64",
"BACKFILL skip",
]
),
)
)

assert parsed.backfill == "skip"


def test_parse_datasource_rejects_unsupported_backfill_value() -> None:
with pytest.raises(MigrationParseError, match="Invalid BACKFILL value"):
parse_datasource_file(
_resource(
"datasource",
"events",
"\n".join(
[
"SCHEMA >",
" id Int64",
"BACKFILL run",
]
),
)
)


def test_parse_pipe_supports_param_options_and_placeholder_normalization() -> None:
parsed = parse_pipe_file(
_resource(
Expand Down
30 changes: 30 additions & 0 deletions tests/test_phase4_migrate_runner_emitter_parity.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def test_emit_migration_includes_phase4_connection_and_datasource_fields() -> No
ver="version",
is_deleted="is_deleted",
),
backfill="skip",
indexes=[
DatasourceIndexModel(
name="idx_id",
Expand All @@ -58,6 +59,7 @@ def test_emit_migration_includes_phase4_connection_and_datasource_fields() -> No

assert "'schema_registry_url': \"https://registry.example.com\"" in emitted
assert "'is_deleted': \"is_deleted\"" in emitted
assert "'backfill': \"skip\"" in emitted
assert "'indexes': [" in emitted
assert "'store_raw_value': True" in emitted

Expand Down Expand Up @@ -228,3 +230,31 @@ def test_run_migrate_emits_default_expr_for_sql_function_defaults(tmp_path: Path
assert result.output_content is not None
assert '\'id\': t.uuid().default_expr("generateUUIDv4()"),' in result.output_content
assert "'payload': t.string().default('{}')," in result.output_content


def test_run_migrate_emits_backfill_skip(tmp_path: Path) -> None:
(tmp_path / "daily_page_visits.datasource").write_text(
"\n".join(
[
"SCHEMA >",
" date Date",
" page_url String",
" visits UInt64",
"BACKFILL skip",
]
),
encoding="utf-8",
)

result = run_migrate(
{
"cwd": str(tmp_path),
"patterns": ["*.datasource"],
"dry_run": True,
}
)

assert result.success is True
assert result.errors == []
assert result.output_content is not None
assert "'backfill': \"skip\"" in result.output_content