#!/bin/bash
#
# gtfs-to-parquet.sh
#
# Converts a GTFS ZIP file into Parquet files for efficient browser-side
# pagination and search via DuckDB-WASM + HTTP Range requests.
#
# Usage:
#   scripts/gtfs-to-parquet.sh --feed-id ID [--env dev|qa|prod] [--upload]
#   scripts/gtfs-to-parquet.sh --url URL [--upload] [--env dev|qa|prod]
#   scripts/gtfs-to-parquet.sh --file FILE [--output DIR]
#
# Options (passed through to gtfs_to_parquet.py):
#   --feed-id ID         MobilityDatabase feed ID (e.g. mdb-2014). Downloads from
#                        files.mobilitydatabase.org/{id}/latest.zip automatically.
#   --url URL            Direct URL of the GTFS ZIP to download
#   --file FILE          Path to a local GTFS ZIP file
#   --env dev|qa|prod    Target GCS environment for upload (default: dev)
#   --upload             Upload generated Parquet files to GCS after conversion
#   --dataset-id ID      Override the dataset ID for the GCS upload path
#   --output DIR         Local output directory (default: ./gtfs_parquet_output)
#   --row-group-size N   Rows per Parquet row group (default: 50000)
#   --memory-limit GB    DuckDB memory limit in GB used while sorting (default: 4)
#   --no-sort            Skip sorting for faster ingestion
#
# Examples:
#   # Convert latest mdb-2014 and upload to dev (internal team only; requires
#   # MobilityData permissions):
#   ./scripts/gtfs-to-parquet.sh --feed-id mdb-2014 --upload --env dev
#
#   # Convert and upload to prod (internal team only; requires MobilityData
#   # permissions):
#   ./scripts/gtfs-to-parquet.sh --feed-id mdb-2014 --upload --env prod
#
#   # Convert from direct URL, keep local only:
#   ./scripts/gtfs-to-parquet.sh --url "https://files.mobilitydatabase.org/mdb-10/latest.zip"
#
#   # Convert local file:
#   ./scripts/gtfs-to-parquet.sh --file ~/Downloads/gtfs.zip --output /tmp/gtfs_out
#
# After running locally, serve for the POC viewer:
#   python3 scripts/gtfs_parquet_serve.py --dir ./gtfs_parquet_output
#

set -euo pipefail

# Physical directory containing this script, so the venv and the Python entry
# point are found regardless of the caller's working directory.
SCRIPT_PATH="$(
  cd -- "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1
  pwd -P
)"
VENV_DIR="$SCRIPT_PATH/.venv-gtfs-parquet"
PYTHON_SCRIPT="$SCRIPT_PATH/gtfs_to_parquet.py"

# Color codes (expanded through printf's %b, never placed in a format string)
RED='\033[0;31m'
GREEN='\033[0;32m'
CYAN='\033[0;36m'
NC='\033[0m'

info() { printf '%b[gtfs-to-parquet]%b %s\n' "$CYAN" "$NC" "$1"; }
ok() { printf '%b[OK]%b %s\n' "$GREEN" "$NC" "$1"; }
fail() {
  printf '%b[ERROR]%b %s\n' "$RED" "$NC" "$1" >&2
  exit 1
}

display_usage() {
  # Print the leading comment block, skipping shebang; stop at first code line
  awk 'NR==1{next} /^#/{sub(/^# ?/,""); print; next} /^$/{next} {exit}' "$0"
  exit 0
}

create_venv() {
  # Prefer the stdlib venv module: it needs no extra install and leaves the
  # caller's global site-packages untouched. Fall back to virtualenv only when
  # venv is unavailable (e.g. a Debian python3 without python3-venv).
  if python3 -m venv "$VENV_DIR" >/dev/null 2>&1; then
    return 0
  fi
  rm -rf "$VENV_DIR"
  pip3 install --disable-pip-version-check virtualenv >/dev/null 2>&1 &&
    python3 -m virtualenv "$VENV_DIR" >/dev/null 2>&1
}

# Show help if requested or no arguments
if [[ $# -eq 0 ]] || [[ "$1" == "--help" ]] || [[ "$1" == "-h" ]]; then
  display_usage
fi

info "Setting up Python environment..."

# Test for the interpreter rather than the directory so that a half-created
# environment left behind by an interrupted run is rebuilt.
if [ ! -x "$VENV_DIR/bin/python" ]; then
  if ! create_venv; then
    fail "Could not create a virtualenv at $VENV_DIR (is python3 with venv or virtualenv available?)"
  fi
  ok "Created virtualenv at $VENV_DIR"
fi

# Install / upgrade dependencies. --quiet keeps the happy path silent while
# stderr stays visible so a failed install explains itself.
if ! "$VENV_DIR/bin/python" -m pip install --disable-pip-version-check --quiet --upgrade \
  "duckdb>=0.10" \
  "requests>=2.28"; then
  fail "Could not install Python dependencies (duckdb, requests)"
fi

ok "Dependencies ready"
info "Running conversion..."
printf '\n'

# Run the Python script, passing all arguments through; exec makes its exit
# status the exit status of this wrapper.
exec "$VENV_DIR/bin/python" "$PYTHON_SCRIPT" "$@"
import argparse
import json
import os
from http.server import HTTPServer, SimpleHTTPRequestHandler
from urllib.parse import urlsplit

# Bytes copied to the socket per read while streaming a requested range.
RANGE_COPY_CHUNK = 64 * 1024


class CORSRangeHandler(SimpleHTTPRequestHandler):
    """
    Static file handler with:
    - CORS headers (required for browser cross-origin DuckDB-WASM fetches)
    - HTTP 206 Partial Content for single byte ranges on ``.parquet`` files
      (required for DuckDB row-group skipping)
    """

    def end_headers(self):
        """Append the CORS headers to every response before closing the header block."""
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, HEAD, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Range, Content-Type")
        self.send_header("Access-Control-Expose-Headers", "Content-Range, Accept-Ranges, Content-Length")
        super().end_headers()

    def do_OPTIONS(self):
        """Answer CORS preflight requests with an empty 200 response."""
        self.send_response(200)
        # Explicit zero length so clients never wait for a body.
        self.send_header("Content-Length", "0")
        self.end_headers()

    def do_GET(self):
        """Handle GET with Range support for Parquet files."""
        range_header = self.headers.get("Range")
        # Compare against the URL path only, so "file.parquet?v=2" still
        # qualifies for range handling.
        is_parquet = urlsplit(self.path).path.endswith(".parquet")
        if range_header and is_parquet:
            self._serve_range(range_header)
        else:
            super().do_GET()

    @staticmethod
    def _parse_range(range_header, file_size):
        """Resolve a single-range ``Range`` header against a file of ``file_size`` bytes.

        Supported forms: ``bytes=START-END``, ``bytes=START-`` and the suffix
        form ``bytes=-N`` (the last N bytes).

        Returns:
            ``(start, end)`` as inclusive byte offsets, or ``None`` when the
            header is malformed, uses another unit, lists several ranges, or
            cannot be satisfied (empty file, start past the end, start > end).
        """
        unit, equals, spec = range_header.partition("=")
        if not equals or unit.strip().lower() != "bytes":
            return None
        first, dash, last = (part.strip() for part in spec.partition("-"))
        if not dash:
            return None

        if first:
            if not first.isdigit() or (last and not last.isdigit()):
                return None
            start = int(first)
            end = int(last) if last else file_size - 1
        else:
            # Suffix form: "-N" means the final N bytes, not bytes 0..N.
            if not last.isdigit() or int(last) == 0:
                return None
            start = max(file_size - int(last), 0)
            end = file_size - 1

        end = min(end, file_size - 1)
        if start > end:
            return None
        return start, end

    def _send_range_not_satisfiable(self, file_size):
        """Send a 416 that tells the client the current size of the file."""
        self.send_response(416, "Requested Range Not Satisfiable")
        self.send_header("Content-Range", f"bytes */{file_size}")
        self.send_header("Content-Length", "0")
        self.end_headers()

    def _serve_range(self, range_header: str):
        """Serve a byte range (RFC 7233) — enables DuckDB-WASM row-group skipping.

        Responds 404 when the file cannot be opened, 416 when the range is
        malformed or unsatisfiable, and otherwise 206 with the requested bytes
        streamed in chunks of ``RANGE_COPY_CHUNK``.
        """
        path = self.translate_path(self.path)
        try:
            source = open(path, "rb")
        except OSError:
            self.send_error(404, "File not found")
            return

        with source:
            file_size = os.fstat(source.fileno()).st_size
            byte_range = self._parse_range(range_header, file_size)
            if byte_range is None:
                self._send_range_not_satisfiable(file_size)
                return

            start, end = byte_range
            remaining = end - start + 1

            self.send_response(206)
            self.send_header("Content-Type", "application/octet-stream")
            self.send_header("Content-Length", str(remaining))
            self.send_header("Content-Range", f"bytes {start}-{end}/{file_size}")
            self.send_header("Accept-Ranges", "bytes")
            self.end_headers()

            # Stream instead of reading the whole range at once: "bytes=0-"
            # on a large Parquet file would otherwise be held in memory.
            source.seek(start)
            while remaining > 0:
                chunk = source.read(min(RANGE_COPY_CHUNK, remaining))
                if not chunk:
                    break
                self.wfile.write(chunk)
                remaining -= len(chunk)


def _load_tables(meta_path):
    """Return the ``tables`` mapping from metadata.json, or ``{}`` if it is unusable."""
    try:
        with open(meta_path, encoding="utf-8") as f:
            meta = json.load(f)
    except (OSError, ValueError) as e:
        print(f"[WARN] Could not read {meta_path}: {e}")
        return {}
    tables = meta.get("tables", {}) if isinstance(meta, dict) else {}
    return tables if isinstance(tables, dict) else {}


def _describe_table(name, info):
    """Format one summary line (name, row count, file size) for a table entry."""
    rows = info.get("row_count", "?")
    # The thousands separator is only valid for numbers; a missing count is
    # shown as "?" rather than raising ValueError.
    rows_str = f"{rows:,}" if isinstance(rows, int) else str(rows)
    size = info.get("size_bytes", 0)
    if not isinstance(size, (int, float)):
        size = 0
    size_str = f"{size/1e6:.2f} MB" if size >= 1e6 else f"{size/1e3:.1f} KB"
    return f"     • {name:<20} {rows_str:>10} rows  {size_str}"


def main():
    """Parse CLI options, print a summary of the served tables and run the server.

    Changes the process working directory to the served directory, because
    ``SimpleHTTPRequestHandler`` resolves request paths against it.
    """
    parser = argparse.ArgumentParser(description="CORS-enabled static server for GTFS Parquet POC.")
    parser.add_argument("--dir", default="./gtfs_parquet_output", metavar="DIR",
                        help="Directory to serve (default: ./gtfs_parquet_output)")
    parser.add_argument("--port", type=int, default=8888, metavar="PORT",
                        help="Port to listen on (default: 8888)")
    parser.add_argument("--host", default="0.0.0.0", metavar="HOST",
                        help="Interface to bind (default: 0.0.0.0; use 127.0.0.1 "
                             "to keep the server private to this machine)")
    args = parser.parse_args()

    serve_dir = os.path.abspath(args.dir)
    if not os.path.isdir(serve_dir):
        print(f"[ERROR] Directory not found: {serve_dir}")
        raise SystemExit(1)

    os.chdir(serve_dir)

    # List available tables from metadata.json if present
    print(f"\n📂 Serving: {serve_dir}")
    meta_path = os.path.join(serve_dir, "metadata.json")
    if os.path.exists(meta_path):
        tables = _load_tables(meta_path)
        print(f"   {len(tables)} tables:")
        for name, info in tables.items():
            if isinstance(info, dict):
                print(_describe_table(name, info))

    print(f"\n🚀 Server running at: http://localhost:{args.port}")
    print("\n   Load in the GTFS Viewer (http://localhost:3000/en/gtfs-viewer):")
    print(f"   → http://localhost:{args.port}/metadata.json\n")

    server = HTTPServer((args.host, args.port), CORSRangeHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n[INFO] Server stopped.")
    finally:
        # Release the listening socket so the port is immediately reusable.
        server.server_close()


if __name__ == "__main__":
    main()
+""" + +import argparse +import io +import json +import os +import shutil +import subprocess +import sys +import tempfile +import zipfile +from datetime import datetime, timezone +from pathlib import Path + +try: + import duckdb + import requests +except ImportError as e: + print(f"[ERROR] Missing dependency: {e}") + print(" Run: pip install duckdb requests") + sys.exit(1) + + +# Maps GTFS filename → columns to sort by (order matters for row-group skipping) +GTFS_FILE_CONFIGS: dict[str, list[str]] = { + "agency.txt": ["agency_id"], + "stops.txt": ["stop_id"], + "routes.txt": ["route_id"], + "trips.txt": ["route_id", "trip_id"], + "stop_times.txt": ["trip_id", "stop_sequence"], + "calendar.txt": ["service_id"], + "calendar_dates.txt": ["service_id", "date"], + "shapes.txt": ["shape_id", "shape_pt_sequence"], + "fare_attributes.txt": ["fare_id"], + "fare_rules.txt": ["route_id"], + "frequencies.txt": ["trip_id", "start_time"], + "transfers.txt": ["from_stop_id", "to_stop_id"], + "pathways.txt": ["pathway_id"], + "levels.txt": ["level_id"], + "attributions.txt": ["organization_name"], + "feed_info.txt": [], +} + +# Maps GTFS filename → columns that are semantically searchable (IDs, names, codes). +# Numeric columns (times, sequences, coordinates) are excluded — they are not useful +# for text search and prevent DuckDB from skipping row groups. 
GTFS_SEARCH_COLUMNS: dict[str, list[str]] = {
    "agency.txt": ["agency_id", "agency_name"],
    "stops.txt": ["stop_id", "stop_name", "stop_code", "zone_id", "parent_station"],
    "routes.txt": ["route_id", "route_short_name", "route_long_name", "agency_id"],
    "trips.txt": ["trip_id", "route_id", "service_id", "trip_headsign", "shape_id"],
    "stop_times.txt": ["trip_id", "stop_id", "stop_headsign"],
    "calendar.txt": ["service_id"],
    "calendar_dates.txt": ["service_id"],
    "shapes.txt": ["shape_id"],
    "fare_attributes.txt": ["fare_id", "agency_id"],
    "fare_rules.txt": ["fare_id", "route_id", "origin_id", "destination_id", "contains_id"],
    "frequencies.txt": ["trip_id"],
    "transfers.txt": ["from_stop_id", "to_stop_id"],
    "pathways.txt": ["pathway_id", "from_stop_id", "to_stop_id"],
    "levels.txt": ["level_id"],
    "attributions.txt": ["organization_name"],
    "feed_info.txt": ["feed_publisher_name"],
}

BUCKET_TEMPLATE = "mobilitydata-datasets-{env}"
DEFAULT_ROW_GROUP_SIZE = 50_000
DOWNLOAD_CHUNK_SIZE = 8 * 1024 * 1024  # 8 MB


# ─── Argument parsing ────────────────────────────────────────────────────────

def _positive_int(text: str) -> int:
    """argparse ``type`` callback that accepts only integers greater than zero."""
    value = int(text)
    if value <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {text!r}")
    return value


def parse_args(argv: "list[str] | None" = None) -> argparse.Namespace:
    """Parse the command line.

    Args:
        argv: Argument list to parse; ``None`` (the default) reads ``sys.argv``.

    Returns:
        The parsed namespace. Exactly one of ``feed_id``, ``url`` and ``file``
        is set; argparse exits with status 2 otherwise or on invalid values.
    """
    parser = argparse.ArgumentParser(
        description="Convert a GTFS ZIP to Parquet files for efficient browser viewing.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--feed-id", metavar="ID",
                        help="MobilityDatabase feed ID (e.g. mdb-2014)")
    source.add_argument("--url", metavar="URL",
                        help="Direct URL of the GTFS ZIP to download")
    source.add_argument("--file", metavar="FILE",
                        help="Path to a local GTFS ZIP file")

    parser.add_argument("--env", choices=["dev", "qa", "prod"], default="dev",
                        help="Target GCS environment (default: dev). Used with --feed-id or --upload.")
    parser.add_argument("--upload", action="store_true",
                        help="Upload generated Parquet files to GCS after conversion.")
    parser.add_argument("--dataset-id", metavar="ID",
                        help="Specific dataset stable ID (optional override with --feed-id).")
    parser.add_argument("--output", metavar="DIR", default="./gtfs_parquet_output",
                        help="Local output directory (default: ./gtfs_parquet_output)")
    # Zero or negative values would otherwise only surface as DuckDB errors,
    # once per table, after the download has already happened.
    parser.add_argument("--row-group-size", type=_positive_int, default=DEFAULT_ROW_GROUP_SIZE,
                        metavar="N", help=f"Rows per row group (default: {DEFAULT_ROW_GROUP_SIZE})")
    parser.add_argument("--memory-limit", type=_positive_int, default=4, metavar="GB",
                        help="DuckDB memory limit in GB for sorting large files (default: 4)")
    parser.add_argument("--no-sort", action="store_true",
                        help="Skip sorting (faster ingestion, slower queries)")
    return parser.parse_args(argv)


# ─── GCS helpers ─────────────────────────────────────────────────────────────

def gsutil(*args: str, capture: bool = True) -> str:
    """Run a gsutil command.

    Args:
        *args: Arguments passed to ``gsutil`` verbatim (no shell involved).
        capture: When true, stdout/stderr are captured and stdout is returned;
            when false, gsutil writes straight to the terminal.

    Returns:
        Stripped stdout when ``capture`` is true, otherwise ``""``.

    Raises:
        RuntimeError: gsutil is not installed or exited with a non-zero status.
    """
    cmd = ["gsutil", *args]
    try:
        result = subprocess.run(cmd, capture_output=capture, text=True)
    except FileNotFoundError as e:
        raise RuntimeError(
            "gsutil executable not found on PATH. Install the Google Cloud SDK."
        ) from e
    if result.returncode != 0:
        # stderr is None when output was not captured; gsutil has already
        # printed its own diagnostics to the terminal in that case.
        detail = (result.stderr or "").strip() or f"exit code {result.returncode}"
        raise RuntimeError(f"gsutil {' '.join(args)} failed:\n{detail}")
    return result.stdout.strip() if capture else ""


def resolve_latest_dataset_id(feed_id: str, env: str) -> str:
    """
    Returns the latest dataset_stable_id for a feed by listing the GCS bucket.
    Used to determine the upload target path — download always uses latest.zip.

    Raises:
        RuntimeError: the bucket cannot be listed or contains no dataset
            folder for ``feed_id``.
    """
    bucket = BUCKET_TEMPLATE.format(env=env)
    prefix = f"gs://{bucket}/{feed_id}/"

    print(f"[INFO] Resolving latest dataset ID for {feed_id} in {bucket}…")
    try:
        listing = gsutil("ls", prefix)
    except RuntimeError as e:
        err = str(e)
        if "matched no objects" in err or "BucketNotFoundException" in err:
            raise RuntimeError(
                f"Feed '{feed_id}' has no datasets in {bucket}.\n"
                f"Use --dataset-id to specify the target dataset ID explicitly,\n"
                f"or check the feed exists in the '{env}' environment."
            ) from e
        raise RuntimeError(
            f"Cannot list GCS bucket. Ensure you are authenticated:\n  gcloud auth login\n{e}"
        ) from e

    # Versioned dataset folders look like: gs://.../mdb-2014-202507081807/
    folders = sorted(
        line.rstrip("/")
        for line in listing.splitlines()
        if line.endswith("/") and f"/{feed_id}-" in line
    )

    if not folders:
        raise RuntimeError(
            f"No dataset folders found for {feed_id} in {prefix}.\n"
            f"Use --dataset-id to specify one explicitly."
        )

    # Lexicographic order equals chronological order because every folder
    # shares the same prefix and ends in a fixed-width timestamp.
    latest_folder = folders[-1]
    dataset_id = latest_folder.split("/")[-1]
    print(f"[INFO] Latest dataset: {dataset_id} ({len(folders)} datasets found)")
    return dataset_id


def latest_zip_url(feed_id: str) -> str:
    """Public URL for the latest dataset ZIP of a feed (all environments share prod files)."""
    return f"https://files.mobilitydatabase.org/{feed_id}/latest.zip"


def derive_ids_from_url(url: str) -> "tuple[str | None, str | None]":
    """Derive ``(feed_id, dataset_id)`` from a files.mobilitydatabase.org dataset URL.

    Expects the shape ``…/<feed_id>/<dataset_id>/<file>.zip`` where ``feed_id``
    starts with ``mdb-`` and ``dataset_id`` starts with ``feed_id``, e.g.
    ``https://files.mobilitydatabase.org/mdb-2014/mdb-2014-20250708/mdb-2014-20250708.zip``.

    Returns:
        ``(feed_id, dataset_id)``. Both are ``None`` when the URL does not
        have that shape; ``dataset_id`` alone is ``None`` when the middle
        segment does not start with the feed ID.
    """
    parts = url.rstrip("/").split("/")
    if len(parts) < 3 or not parts[-3].startswith("mdb-"):
        return None, None
    feed_id = parts[-3]
    dataset_id = parts[-2] if parts[-2].startswith(feed_id) else None
    return feed_id, dataset_id


def upload_to_gcs(local_dir: Path, feed_id: str, dataset_id: str, env: str) -> None:
    """Upload all Parquet + metadata.json from local_dir to GCS and make them public.
    Uses gsutil (gcloud auth login credentials) for upload and per-object ACLs.

    Raises:
        RuntimeError: a ``gsutil cp`` failed. A failure to set the per-object
            ACL is reported but not fatal.
    """
    bucket_name = BUCKET_TEMPLATE.format(env=env)
    gcs_prefix = f"{feed_id}/{dataset_id}/gtfs_parquet"

    print(f"\n[UPLOAD] Uploading to gs://{bucket_name}/{gcs_prefix}/")
    files = sorted(local_dir.glob("*.parquet"))
    # metadata.json goes last so that readers never see an index that
    # references tables which have not been uploaded yet.
    metadata_file = local_dir / "metadata.json"
    if metadata_file.exists():
        files.append(metadata_file)

    for local_file in files:
        gcs_path = f"gs://{bucket_name}/{gcs_prefix}/{local_file.name}"
        gsutil("cp", str(local_file), gcs_path, capture=False)
        # Per-object public-read ACL (works with legacy ACL buckets like dev/qa)
        try:
            gsutil("acl", "ch", "-u", "AllUsers:R", gcs_path, capture=True)
        except RuntimeError:
            # Expected on uniform bucket-level access, where public access is
            # configured on the bucket instead; say so rather than hide it.
            print(f"  [INFO] Per-object ACL not applied to {local_file.name} "
                  f"(bucket-level access control assumed)")
        size_mb = local_file.stat().st_size / 1e6
        print(f"  ✓ {local_file.name} ({size_mb:.2f} MB)")

    public_base = f"https://storage.googleapis.com/{bucket_name}/{gcs_prefix}"
    print(f"\n[UPLOAD] ✓ {len(files)} files uploaded")
    print("\n[UPLOAD] Public metadata URL (paste in GTFS Viewer):")
    print(f"  {public_base}/metadata.json")


# ─── Download helpers ─────────────────────────────────────────────────────────

def download_from_url(url: str) -> bytes:
    """Download ``url`` and return the body, printing progress when the size is known.

    Raises:
        requests.RequestException: connection failure, timeout or HTTP error status.
    """
    print(f"[INFO] Downloading from URL: {url}")
    buffer = io.BytesIO()
    # The context manager returns the connection to the pool even when the
    # transfer is interrupted half-way.
    with requests.get(url, stream=True, timeout=120) as response:
        response.raise_for_status()
        length_header = response.headers.get("content-length", "")
        total = int(length_header) if length_header.isdigit() else 0
        downloaded = 0
        for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
            if not chunk:
                continue
            buffer.write(chunk)
            downloaded += len(chunk)
            if total:
                print(f"  {downloaded / 1e6:.1f} / {total / 1e6:.1f} MB ({downloaded/total*100:.0f}%)", end="\r")
    print()
    return buffer.getvalue()


# ─── CSV → Parquet conversion ─────────────────────────────────────────────────

def _sql_literal(value) -> str:
    """Render ``value`` as a single-quoted SQL string literal."""
    return "'" + str(value).replace("'", "''") + "'"


def _sql_identifier(name: str) -> str:
    """Render ``name`` as a double-quoted SQL identifier."""
    return '"' + name.replace('"', '""') + '"'


def csv_to_parquet(
    csv_bytes: bytes,
    output_path: Path,
    sort_columns: list[str],
    search_columns: list[str],
    row_group_size: int,
    skip_sort: bool,
    memory_limit_gb: int = 4,
) -> dict:
    """
    Convert CSV bytes to a Parquet file using DuckDB.

    DuckDB is configured with a memory limit and a spill directory next to the
    output file, so sorting large tables can spill to disk instead of having
    to fit in RAM.

    Args:
        csv_bytes: Raw content of one GTFS table.
        output_path: Destination ``.parquet`` file; its parent must exist.
        sort_columns: Preferred sort keys; those absent from the CSV are ignored.
        search_columns: Searchable columns; those absent from the CSV are ignored.
        row_group_size: Rows per Parquet row group.
        skip_sort: When true, rows keep their CSV order.
        memory_limit_gb: DuckDB memory limit in GB.

    Returns:
        Dict with ``row_count``, ``size_bytes``, ``columns``, ``sort_columns``
        (keys actually applied) and ``search_columns`` (those present).

    Raises:
        duckdb.Error: the CSV could not be read as UTF-8 or latin-1, or the
            Parquet file could not be written.
    """
    work_dir = output_path.parent
    spill_dir = work_dir / ".duckdb_tmp"
    spill_dir.mkdir(exist_ok=True)

    # DuckDB needs a file path. mkstemp creates the file atomically, unlike
    # mktemp, which only returns a name that another process may claim first.
    fd, tmp_name = tempfile.mkstemp(suffix=".csv", dir=work_dir)
    tmp_csv = Path(tmp_name)
    con = None

    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(csv_bytes)

        con = duckdb.connect(config={
            "memory_limit": f"{memory_limit_gb}GB",
            "temp_directory": str(spill_dir),
        })

        # Paths are quoted as SQL literals so a quote in a directory name
        # cannot break (or alter) the generated statements.
        csv_sql = _sql_literal(tmp_csv)
        out_sql = _sql_literal(output_path)

        def source(encoding_opt: str) -> str:
            return (
                f"read_csv({csv_sql}, auto_detect=true, "
                f"ignore_errors=true, null_padding=true{encoding_opt})"
            )

        def with_encoding_fallback(run):
            # Try UTF-8 first; fall back to latin-1 for feeds with non-UTF-8
            # bytes. A failure of the fallback propagates to the caller.
            try:
                return run("")
            except duckdb.Error:
                return run(", encoding='latin-1'")

        # Detect schema — LIMIT 0 avoids loading data, no pandas needed
        def read_column_names(encoding_opt: str) -> list[str]:
            res = con.execute(f"SELECT * FROM {source(encoding_opt)} LIMIT 0")
            return [desc[0] for desc in res.description]

        column_names = with_encoding_fallback(read_column_names)

        valid_sorts = (
            [c for c in sort_columns if c in column_names]
            if not skip_sort else []
        )
        order_clause = (
            "ORDER BY " + ", ".join(_sql_identifier(c) for c in valid_sorts)
            if valid_sorts else ""
        )

        def copy_to_parquet(encoding_opt: str) -> None:
            # Schema sniffing reads only a sample, so an encoding problem can
            # still first appear here; hence the second fallback.
            con.execute(f"""
                COPY (
                    SELECT * FROM {source(encoding_opt)}
                    {order_clause}
                ) TO {out_sql} (
                    FORMAT PARQUET,
                    ROW_GROUP_SIZE {int(row_group_size)},
                    COMPRESSION 'snappy'
                )
            """)

        with_encoding_fallback(copy_to_parquet)

        row_count = con.execute(
            f"SELECT COUNT(*) FROM read_parquet({out_sql})"
        ).fetchone()[0]

        return {
            "row_count": row_count,
            "size_bytes": output_path.stat().st_size,
            "columns": column_names,
            "sort_columns": valid_sorts,
            "search_columns": [c for c in search_columns if c in column_names],
        }

    finally:
        # Close before deleting: the connection may hold files in spill_dir.
        if con is not None:
            con.close()
        tmp_csv.unlink(missing_ok=True)
        shutil.rmtree(spill_dir, ignore_errors=True)


# ─── Main ─────────────────────────────────────────────────────────────────────

def _acquire_zip(args: argparse.Namespace) -> tuple:
    """Fetch the GTFS ZIP selected on the command line.

    Returns:
        ``(zip_bytes, source_label, feed_id, dataset_id)``; the two IDs are
        ``None`` when they can be neither given nor derived.

    Raises:
        RuntimeError: the upload target could not be resolved through gsutil.
        requests.RequestException: the download failed.
        OSError: the local file could not be read.
    """
    dataset_id = args.dataset_id

    if args.feed_id:
        # Resolve the upload target before downloading so that an
        # authentication problem is reported without a wasted transfer.
        if args.upload and not dataset_id:
            dataset_id = resolve_latest_dataset_id(args.feed_id, args.env)
        url = latest_zip_url(args.feed_id)
        return download_from_url(url), url, args.feed_id, dataset_id

    if args.url:
        zip_bytes = download_from_url(args.url)
        feed_id, url_dataset_id = derive_ids_from_url(args.url)
        # An explicit --dataset-id always wins over the value guessed from the URL.
        return zip_bytes, args.url, feed_id, dataset_id or url_dataset_id

    local_path = Path(args.file)
    if not local_path.is_file():
        print(f"[ERROR] File not found: {local_path}")
        sys.exit(1)
    print(f"[INFO] Reading local file: {local_path}")
    return local_path.read_bytes(), str(local_path.resolve()), None, dataset_id


def _index_zip_members(member_names) -> dict:
    """Map file basenames to archive paths, ignoring directories and macOS metadata.

    When the same basename occurs at several depths, the entry closest to the
    archive root is kept.
    """
    index: dict = {}
    for member in member_names:
        if member.endswith("/") or member.startswith("__MACOSX/"):
            continue
        base = Path(member).name
        if base.startswith("._"):
            # AppleDouble resource forks end in .txt too but are not CSV.
            continue
        current = index.get(base)
        if current is None or member.count("/") < current.count("/"):
            index[base] = member
    return index


def main() -> None:
    """Convert the selected GTFS ZIP to Parquet, write metadata.json, optionally upload.

    Exits with status 1 when the input cannot be obtained, is not a ZIP, the
    upload fails, or at least one table failed to convert. Remaining tables
    are still converted, and uploaded if requested, before exiting.
    """
    args = parse_args()
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    # ── Acquire GTFS ZIP bytes
    try:
        zip_bytes, source_label, feed_id, dataset_id = _acquire_zip(args)
    except (RuntimeError, OSError, requests.RequestException) as e:
        print(f"[ERROR] {e}")
        sys.exit(1)

    print(f"[INFO] ZIP size: {len(zip_bytes) / 1e6:.1f} MB")

    # ── Open ZIP
    try:
        zf = zipfile.ZipFile(io.BytesIO(zip_bytes))
    except zipfile.BadZipFile as e:
        print(f"[ERROR] Not a valid ZIP file: {e}")
        sys.exit(1)

    tables_meta: dict[str, dict] = {}
    failed: list[str] = []

    with zf:
        zip_names = _index_zip_members(zf.namelist())
        print(f"[INFO] Found {len(zip_names)} files: {', '.join(sorted(zip_names))}")

        # Known GTFS tables first, then any other .txt file in the archive.
        # Building both lists up front means a known table that fails is not
        # attempted a second time as an "extra" file.
        known = [n for n in GTFS_FILE_CONFIGS if n in zip_names]
        skipped = [n for n in GTFS_FILE_CONFIGS if n not in zip_names]
        extras = [n for n in zip_names
                  if n.endswith(".txt") and n not in GTFS_FILE_CONFIGS]

        for name in known + extras:
            parquet_name = Path(name).with_suffix(".parquet").name
            output_path = output_dir / parquet_name
            label = "" if name in GTFS_FILE_CONFIGS else "extra file "
            print(f"\n[INFO] Processing {label}{name} → {parquet_name}")

            try:
                csv_bytes = zf.read(zip_names[name])
                print(f"       CSV size: {len(csv_bytes) / 1e6:.2f} MB")
                meta = csv_to_parquet(
                    csv_bytes, output_path,
                    GTFS_FILE_CONFIGS.get(name, []),
                    GTFS_SEARCH_COLUMNS.get(name, []),
                    args.row_group_size, args.no_sort, args.memory_limit,
                )
            except Exception as e:
                # Per-table boundary: one unreadable table must not abort the
                # rest. The failure is reported and reflected in the exit code.
                print(f"  ✗ Failed: {e}")
                failed.append(name)
                # Drop partial output so --upload cannot publish it.
                output_path.unlink(missing_ok=True)
                continue

            tables_meta[Path(name).stem] = {"file": parquet_name, **meta}
            ratio = meta["size_bytes"] / max(len(csv_bytes), 1) * 100
            print(f"  ✓ {meta['row_count']:,} rows | "
                  f"{meta['size_bytes'] / 1e6:.2f} MB ({ratio:.0f}% of CSV)")

    # ── Write metadata.json
    metadata = {
        "source": source_label,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "feed_id": feed_id,
        "dataset_id": dataset_id,
        "env": args.env if args.feed_id or args.upload else None,
        "row_group_size": args.row_group_size,
        "tables": tables_meta,
    }
    metadata_path = output_dir / "metadata.json"
    metadata_path.write_text(json.dumps(metadata, indent=2), encoding="utf-8")

    print(f"\n{'='*60}")
    print(f"[DONE] Converted {len(tables_meta)} tables → {output_dir}/")
    if skipped:
        print(f"       Skipped (not in ZIP): {', '.join(skipped)}")

    # ── Upload to GCS if requested
    if args.upload:
        if not feed_id or not dataset_id:
            print("\n[WARN] --upload requires a resolvable feed_id and dataset_id.")
            print("       Use --feed-id, or a files.mobilitydatabase.org URL.")
        else:
            try:
                upload_to_gcs(output_dir, feed_id, dataset_id, args.env)
            except RuntimeError as e:
                print(f"\n[ERROR] Upload failed: {e}")
                sys.exit(1)
    else:
        print("\nTo serve locally for the POC viewer:")
        print(f"  python3 scripts/gtfs_parquet_serve.py --dir {output_dir}")
        print("\nTo upload to GCS (dev by default):")
        print("  Re-run with --upload [--env dev|qa|prod]")
        if feed_id and dataset_id:
            print(f"\n  Or add --upload to this command and set --env {args.env}")

    if failed:
        print(f"\n[ERROR] {len(failed)} table(s) failed to convert: {', '.join(failed)}")
        sys.exit(1)


if __name__ == "__main__":
    main()