From e27ee17a8c142909093eb89874f5203de5ce1c93 Mon Sep 17 00:00:00 2001 From: RomirJ Date: Thu, 11 Jun 2026 01:38:32 -0700 Subject: [PATCH] fix(curate): emit valid LeRobot v3 dataset layout Write consolidated v3 parquet shards, task parquet metadata, stats.json, and matching info.json path templates for the LeRobot v3 converter. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../curate/format_converters/lerobot_v3.py | 281 ++++++++++++++---- tests/test_curate_format_converters.py | 62 +++- 2 files changed, 276 insertions(+), 67 deletions(-) diff --git a/src/tether/curate/format_converters/lerobot_v3.py b/src/tether/curate/format_converters/lerobot_v3.py index 4562af9..ab841cc 100644 --- a/src/tether/curate/format_converters/lerobot_v3.py +++ b/src/tether/curate/format_converters/lerobot_v3.py @@ -8,12 +8,13 @@ ├── README.md auto-generated dataset card ├── meta/ │ ├── info.json codebase version + features schema - │ ├── tasks.jsonl {task_index, task} per line - │ └── episodes.jsonl {episode_index, tasks, length} + │ ├── stats.json global feature statistics + │ ├── tasks.parquet task index, indexed by task text + │ └── episodes/ + │ └── chunk-000/file-000.parquet episode metadata + offsets └── data/ └── chunk-000/ - ├── episode_000000.parquet (state + action + frame_index + ...) - └── episode_000001.parquet + └── file-000.parquet consolidated frame rows Phase 1 v1 ships parquet + metadata. Video materialization (mp4 in `videos/` subdir) requires ffmpeg-python + imageio-ffmpeg deps; deferred @@ -27,12 +28,13 @@ """ from __future__ import annotations -import hashlib import json import logging from pathlib import Path from typing import Any +import numpy as np + from tether.curate.format_converters.base import ( ConversionResult, FormatConverter, @@ -95,6 +97,7 @@ def convert( output = Path(output_dir).expanduser() output.mkdir(parents=True, exist_ok=True) (output / "meta").mkdir(exist_ok=True) + (output / "meta" / "episodes" / "chunk-000").mkdir(parents=True, exist_ok=True) (output / "data" / "chunk-000").mkdir(parents=True, exist_ok=True) # Read + group by episode across all input JSONLs. @@ -108,12 +111,14 @@ def convert( # Build the task index by hash-deduping instructions. task_index_by_text: dict[str, int] = {} - tasks_jsonl_lines: list[dict[str, Any]] = [] - episodes_jsonl_lines: list[dict[str, Any]] = [] - - # Sort episodes for deterministic episode_index assignment. - episode_index = 0 - global_step_index = 0 + task_rows: list[dict[str, Any]] = [] + pending_episodes: list[dict[str, Any]] = [] + action_dim = 0 + state_dim = 0 + + # Sort episodes for deterministic episode_index assignment. First pass + # discovers global action/state dims so every row in the consolidated + # v3 data shard has one stable schema. for episode_id, rows in sorted(all_episodes.items()): keep, reason = self._filter_episode( rows, min_quality=min_quality, canonical_only=canonical_only, @@ -128,32 +133,51 @@ def convert( result.skipped_episodes += 1 result.skipped_reasons["no_actions"] += 1 continue + action_dim = max(action_dim, max(len(a) for a in actions)) + state_dim = max(state_dim, max((len(s) for s in states if s is not None), default=0)) - # Task index lookup (one row in tasks.jsonl per unique instruction). + # Task index lookup (one row in tasks.parquet per unique instruction). instruction = rows[0].get("instruction_raw") if not isinstance(instruction, str): # Fall back to hash-only when raw instruction was redacted. instruction = rows[0].get("instruction_hash") or "unknown_task" if instruction not in task_index_by_text: task_index_by_text[instruction] = len(task_index_by_text) - tasks_jsonl_lines.append({ + task_rows.append({ "task_index": task_index_by_text[instruction], "task": instruction, }) task_idx = task_index_by_text[instruction] + pending_episodes.append({ + "episode_id": episode_id, + "rows": rows, + "actions": actions, + "states": states, + "instruction": instruction, + "task_idx": task_idx, + }) - # Build per-step rows for parquet. + if not pending_episodes: + result.warnings.append("no_episodes_passed_filter") + result.completed_at = _utc_now_iso() + return result + + data_rows: list[dict[str, Any]] = [] + episode_meta_rows: list[dict[str, Any]] = [] + all_actions: list[list[float]] = [] + all_states: list[list[float]] = [] + + global_step_index = 0 + for episode_index, episode in enumerate(pending_episodes): + rows = episode["rows"] + actions = episode["actions"] + states = episode["states"] + task_idx = episode["task_idx"] + instruction = episode["instruction"] step_count = len(actions) - frame_indices = list(range(step_count)) - global_indices = [global_step_index + i for i in range(step_count)] - episode_indices = [episode_index] * step_count - timestamps = [float(i) / self.fps for i in range(step_count)] - task_indices = [task_idx] * step_count # Pad missing states with zeros (state_vec replicated across chunk # rows; if state_vec is None for a row, we keep None → fill zeros). - state_dim = max((len(s) for s in states if s is not None), default=0) - action_dim = max(len(a) for a in actions) states_filled = [ s if s is not None else [0.0] * state_dim for s in states ] @@ -164,71 +188,106 @@ def convert( for s in states_filled ] - table_data = { - "frame_index": frame_indices, - "episode_index": episode_indices, - "index": global_indices, - "timestamp": timestamps, - "task_index": task_indices, - "action": actions_out, - } - if state_dim > 0: - table_data["observation.state"] = states_out - - table = pa.table(table_data) - parquet_path = output / "data" / "chunk-000" / f"episode_{episode_index:06d}.parquet" - pq.write_table(table, str(parquet_path)) - result.bytes_written += parquet_path.stat().st_size - result.episode_count += 1 - result.step_count += step_count - global_step_index += step_count - - episodes_jsonl_lines.append({ + for i, action in enumerate(actions_out): + data_row = { + "frame_index": i, + "episode_index": episode_index, + "index": global_step_index + i, + "timestamp": float(i) / self.fps, + "task_index": task_idx, + "action": action, + } + if state_dim > 0: + data_row["observation.state"] = states_out[i] + data_rows.append(data_row) + + all_actions.extend(actions_out) + all_states.extend(states_out) + episode_stats = self._build_feature_stats( + actions=actions_out, + states=states_out if state_dim > 0 else [], + ) + episode_meta = { "episode_index": episode_index, - "tasks": [task_idx], + "tasks": [instruction], "length": step_count, - }) + "data/chunk_index": 0, + "data/file_index": 0, + "dataset_from_index": global_step_index, + "dataset_to_index": global_step_index + step_count, + "meta/episodes/chunk_index": 0, + "meta/episodes/file_index": 0, + } + episode_meta.update(self._flatten_stats(episode_stats)) + episode_meta_rows.append(episode_meta) # Video materialization (per [curate-video] extra). Skips when # frames aren't decodable (hash-only image_b64) or when the # encoder dep isn't installed. - video_dims = None if self.encode_videos: - video_dims = self._maybe_encode_episode_video( + self._maybe_encode_episode_video( output=output, episode_index=episode_index, rows=rows, result=result, ) - episode_index += 1 + result.episode_count += 1 + result.step_count += step_count + global_step_index += step_count + + # Write the v3 file-based shards and metadata. + data_path = output / "data" / "chunk-000" / "file-000.parquet" + data_schema_fields = [ + ("frame_index", pa.int64()), + ("episode_index", pa.int64()), + ("index", pa.int64()), + ("timestamp", pa.float32()), + ("task_index", pa.int64()), + ("action", pa.list_(pa.float32(), list_size=action_dim)), + ] + if state_dim > 0: + data_schema_fields.append( + ("observation.state", pa.list_(pa.float32(), list_size=state_dim)) + ) + data_table = pa.Table.from_pylist(data_rows, schema=pa.schema(data_schema_fields)) + pq.write_table(data_table, str(data_path), compression="snappy") + result.bytes_written += data_path.stat().st_size - if result.episode_count == 0: - result.warnings.append("no_episodes_passed_filter") - result.completed_at = _utc_now_iso() - return result + episodes_path = output / "meta" / "episodes" / "chunk-000" / "file-000.parquet" + pq.write_table(pa.Table.from_pylist(episode_meta_rows), str(episodes_path), compression="snappy") + result.bytes_written += episodes_path.stat().st_size + + tasks_path = output / "meta" / "tasks.parquet" + self._write_tasks_parquet(pa=pa, pq=pq, path=tasks_path, tasks=task_rows) + result.bytes_written += tasks_path.stat().st_size - # Write tasks.jsonl + episodes.jsonl. - with open(output / "meta" / "tasks.jsonl", "w") as f: - for line in tasks_jsonl_lines: - f.write(json.dumps(line) + "\n") - with open(output / "meta" / "episodes.jsonl", "w") as f: - for line in episodes_jsonl_lines: - f.write(json.dumps(line) + "\n") + stats = self._build_feature_stats( + actions=all_actions, + states=all_states if state_dim > 0 else [], + ) + stats_path = output / "meta" / "stats.json" + with open(stats_path, "w") as f: + json.dump(stats, f, indent=2) + result.bytes_written += stats_path.stat().st_size # Write info.json. info = self._build_info_json( action_dim=action_dim, state_dim=state_dim, episode_count=result.episode_count, + frame_count=result.step_count, + task_count=len(task_rows), ) with open(output / "meta" / "info.json", "w") as f: json.dump(info, f, indent=2) + result.bytes_written += (output / "meta" / "info.json").stat().st_size # Write README.md (dataset card). - readme = self._build_readme(result, tasks_jsonl_lines) + readme = self._build_readme(result, task_rows) with open(output / "README.md", "w") as f: f.write(readme) + result.bytes_written += (output / "README.md").stat().st_size result.completed_at = _utc_now_iso() return result @@ -267,9 +326,10 @@ def _maybe_encode_episode_video( return None video_path = ( - output / "videos" / "chunk-000" + output / "videos" / f"observation.images.{self.video_camera_name}" - / f"episode_{episode_index:06d}.mp4" + / "chunk-000" + / f"file-{episode_index:03d}.mp4" ) try: bytes_written = encode_frames_to_mp4( @@ -306,6 +366,8 @@ def _build_info_json( action_dim: int, state_dim: int, episode_count: int, + frame_count: int, + task_count: int, ) -> dict[str, Any]: action_names = self.action_names or [ f"axis_{i}" for i in range(action_dim) @@ -317,9 +379,15 @@ def _build_info_json( "codebase_version": LEROBOT_V3_VERSION, "robot_type": self.robot_type, "fps": self.fps, + "total_episodes": episode_count, + "total_frames": frame_count, + "total_tasks": task_count, + "chunks_size": 1000, + "data_files_size_in_mb": 100, + "video_files_size_in_mb": 200, "splits": {"train": f"0:{episode_count}"}, - "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet", - "video_path": "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4", + "data_path": "data/chunk-{chunk_index:03d}/file-{file_index:03d}.parquet", + "video_path": "videos/{video_key}/chunk-{chunk_index:03d}/file-{file_index:03d}.mp4", "features": { "action": { "dtype": "float32", @@ -341,6 +409,95 @@ def _build_info_json( } return info + @staticmethod + def _build_feature_stats( + *, + actions: list[list[float]], + states: list[list[float]], + ) -> dict[str, dict[str, list[float] | list[int]]]: + stats: dict[str, dict[str, list[float] | list[int]]] = {} + if actions: + stats["action"] = LeRobotV3Converter._vector_stats(actions) + if states: + stats["observation.state"] = LeRobotV3Converter._vector_stats(states) + return stats + + @staticmethod + def _vector_stats(vectors: list[list[float]]) -> dict[str, list[float] | list[int]]: + arr = np.asarray(vectors, dtype=np.float32) + return { + "min": arr.min(axis=0).astype(float).tolist(), + "max": arr.max(axis=0).astype(float).tolist(), + "mean": arr.mean(axis=0).astype(float).tolist(), + "std": arr.std(axis=0).astype(float).tolist(), + "count": [int(arr.shape[0])], + "q01": np.quantile(arr, 0.01, axis=0).astype(float).tolist(), + "q10": np.quantile(arr, 0.10, axis=0).astype(float).tolist(), + "q50": np.quantile(arr, 0.50, axis=0).astype(float).tolist(), + "q90": np.quantile(arr, 0.90, axis=0).astype(float).tolist(), + "q99": np.quantile(arr, 0.99, axis=0).astype(float).tolist(), + } + + @staticmethod + def _flatten_stats(stats: dict[str, dict[str, Any]]) -> dict[str, Any]: + flat: dict[str, Any] = {} + for feature_name, feature_stats in stats.items(): + for stat_name, value in feature_stats.items(): + flat[f"stats/{feature_name}/{stat_name}"] = value + return flat + + @staticmethod + def _write_tasks_parquet( + *, + pa: Any, + pq: Any, + path: Path, + tasks: list[dict[str, Any]], + ) -> None: + table = pa.table( + { + "task_index": [int(task["task_index"]) for task in tasks], + "task": [str(task["task"]) for task in tasks], + } + ) + pandas_metadata = { + "index_columns": ["task"], + "column_indexes": [ + { + "name": None, + "field_name": None, + "pandas_type": "unicode", + "numpy_type": "object", + "metadata": {"encoding": "UTF-8"}, + } + ], + "columns": [ + { + "name": "task_index", + "field_name": "task_index", + "pandas_type": "int64", + "numpy_type": "int64", + "metadata": None, + }, + { + "name": "task", + "field_name": "task", + "pandas_type": "unicode", + "numpy_type": "object", + "metadata": None, + }, + ], + "attributes": {}, + "creator": { + "library": "pyarrow", + "version": getattr(pa, "__version__", "unknown"), + }, + "pandas_version": "2.0.0", + } + metadata = dict(table.schema.metadata or {}) + metadata[b"pandas"] = json.dumps(pandas_metadata).encode() + pq.write_table(table.replace_schema_metadata(metadata), str(path), compression="snappy") + def _build_readme( self, result: ConversionResult, diff --git a/tests/test_curate_format_converters.py b/tests/test_curate_format_converters.py index 072c34c..a0075f2 100644 --- a/tests/test_curate_format_converters.py +++ b/tests/test_curate_format_converters.py @@ -67,6 +67,9 @@ def test_get_converter_unknown_raises() -> None: def test_lerobot_v3_basic_convert(tmp_path: Path) -> None: pytest.importorskip("pyarrow") + import pyarrow as pa + import pyarrow.parquet as pq + jsonl = tmp_path / "input.jsonl" _seed_jsonl(jsonl) out = tmp_path / "out_lerobot" @@ -76,11 +79,27 @@ def test_lerobot_v3_basic_convert(tmp_path: Path) -> None: assert result.episode_count == 2 assert result.step_count == 200 # 2 ep × 20 rows × 5 steps_per_chunk assert (out / "meta" / "info.json").exists() - assert (out / "meta" / "tasks.jsonl").exists() - assert (out / "meta" / "episodes.jsonl").exists() + assert (out / "meta" / "stats.json").exists() + assert (out / "meta" / "tasks.parquet").exists() + assert (out / "meta" / "episodes" / "chunk-000" / "file-000.parquet").exists() assert (out / "README.md").exists() - assert (out / "data" / "chunk-000" / "episode_000000.parquet").exists() - assert (out / "data" / "chunk-000" / "episode_000001.parquet").exists() + assert (out / "data" / "chunk-000" / "file-000.parquet").exists() + assert not (out / "meta" / "tasks.jsonl").exists() + assert not (out / "meta" / "episodes.jsonl").exists() + assert not list((out / "data" / "chunk-000").glob("episode_*.parquet")) + + data = pq.read_table(out / "data" / "chunk-000" / "file-000.parquet") + episodes = pq.read_table(out / "meta" / "episodes" / "chunk-000" / "file-000.parquet") + tasks = pq.read_table(out / "meta" / "tasks.parquet") + assert data.num_rows == 200 + assert data.schema.field("timestamp").type == pa.float32() + assert data.schema.field("action").type == pa.list_(pa.float32(), list_size=7) + assert episodes.num_rows == 2 + assert "dataset_from_index" in episodes.column_names + assert "dataset_to_index" in episodes.column_names + assert "data/chunk_index" in episodes.column_names + assert tasks.column_names == ["task_index", "task"] + assert json.loads(tasks.schema.metadata[b"pandas"])["index_columns"] == ["task"] def test_lerobot_v3_info_json_shape(tmp_path: Path) -> None: @@ -94,10 +113,43 @@ def test_lerobot_v3_info_json_shape(tmp_path: Path) -> None: info = json.loads((out / "meta" / "info.json").read_text()) assert info["robot_type"] == "franka" assert info["fps"] == 30 + assert info["total_episodes"] == 2 + assert info["total_frames"] == 200 + assert info["total_tasks"] == 2 + assert info["data_path"] == "data/chunk-{chunk_index:03d}/file-{file_index:03d}.parquet" assert "action" in info["features"] assert "observation.state" in info["features"] +def test_lerobot_v3_output_passes_dataset_validator(tmp_path: Path) -> None: + pytest.importorskip("pyarrow") + from tether.validation import Decision, overall_decision, run_all_checks + + jsonl = tmp_path / "input.jsonl" + _seed_jsonl(jsonl) + out = tmp_path / "out_valid" + + LeRobotV3Converter(encode_videos=False).convert(input_jsonl=jsonl, output_dir=out) + results = run_all_checks(out) + assert overall_decision(results) == Decision.OK, [ + (result.check_id, result.decision.value, result.summary) + for result in results + ] + + +def test_lerobot_v3_writes_stats_json(tmp_path: Path) -> None: + pytest.importorskip("pyarrow") + jsonl = tmp_path / "input.jsonl" + _seed_jsonl(jsonl) + out = tmp_path / "out_stats" + + LeRobotV3Converter(encode_videos=False).convert(input_jsonl=jsonl, output_dir=out) + stats = json.loads((out / "meta" / "stats.json").read_text()) + assert set(stats) == {"action", "observation.state"} + assert stats["action"]["count"] == [200] + assert len(stats["action"]["mean"]) == 7 + + def test_lerobot_v3_min_quality_filters(tmp_path: Path) -> None: pytest.importorskip("pyarrow") jsonl = tmp_path / "input.jsonl" @@ -179,7 +231,7 @@ def test_hdf5_basic_convert(tmp_path: Path) -> None: def test_hdf5_split_episodes(tmp_path: Path) -> None: - h5py = pytest.importorskip("h5py") + pytest.importorskip("h5py") jsonl = tmp_path / "input.jsonl" _seed_jsonl(jsonl) out = tmp_path / "out_split"