diff --git a/functions-python/batch_process_dataset/src/pipeline_tasks.py b/functions-python/batch_process_dataset/src/pipeline_tasks.py index 9e739d318..5012b65eb 100644 --- a/functions-python/batch_process_dataset/src/pipeline_tasks.py +++ b/functions-python/batch_process_dataset/src/pipeline_tasks.py @@ -7,8 +7,12 @@ from sqlalchemy.orm import Session from shared.database.database import with_db_session -from shared.database_gen.sqlacodegen_models import Gtfsdataset -from shared.helpers.utils import create_http_task, create_http_pmtiles_builder_task +from shared.database_gen.sqlacodegen_models import Gtfsdataset, GtfsDatasetChangelog +from shared.helpers.utils import ( + create_http_task, + create_http_pmtiles_builder_task, + create_http_gtfs_datasets_comparer_task, +) def create_http_reverse_geolocation_processor_task( @@ -136,3 +140,45 @@ def create_pipeline_tasks(dataset: Gtfsdataset, db_session: Session) -> None: f"routes.txt file size : {routes_file.file_size_bytes} bytes" f" and changed files: {changed_files}" ) + + # Create GTFS change tracker task when a previous dataset exists + previous_dataset = ( + db_session.query(Gtfsdataset) + .filter( + Gtfsdataset.feed_id == dataset.feed_id, + Gtfsdataset.id != dataset.id, + ) + .order_by(Gtfsdataset.downloaded_at.desc()) + .first() + ) + if previous_dataset: + # Check the DB for an existing changelog record rather than the GCS blob presence. + # The unique constraint on (previous_dataset_id, current_dataset_id) makes this the + # authoritative idempotency check. GCS blob presence could be used instead, but that + # would require an extra API call and could miss cases where the blob exists but the + # DB record does not (or vice versa). + changelog_exists = ( + db_session.query(GtfsDatasetChangelog) + .filter_by( + previous_dataset_id=previous_dataset.id, + current_dataset_id=dataset.id, + ) + .first() + is not None + ) + if changelog_exists: + logging.info( + "Skipping change tracker task for dataset %s: changelog already exists.", + dataset_stable_id, + ) + else: + create_http_gtfs_datasets_comparer_task( + feed_stable_id=stable_id, + base_dataset_stable_id=previous_dataset.stable_id, + new_dataset_stable_id=dataset_stable_id, + ) + else: + logging.info( + "Skipping change tracker task for dataset %s: no previous dataset found.", + dataset_stable_id, + ) diff --git a/functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py b/functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py index 784c40612..4bdd8901c 100644 --- a/functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py +++ b/functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py @@ -449,8 +449,9 @@ def mock_remove_side_effect(path): @patch.dict( os.environ, {"FEEDS_CREDENTIALS": '{"test_stable_id": "test_credentials"}'} ) + @patch("pipeline_tasks.create_http_gtfs_datasets_comparer_task") @with_db_session(db_url=default_db_url) - def test_process(self, db_session): + def test_process(self, mock_gtfs_datasets_comparer_task, db_session): feeds = db_session.query(Gtfsfeed).all() feed_id = feeds[0].id diff --git a/functions-python/gtfs_datasets_comparer/README.md b/functions-python/gtfs_datasets_comparer/README.md new file mode 100644 index 000000000..cb7f46293 --- /dev/null +++ b/functions-python/gtfs_datasets_comparer/README.md @@ -0,0 +1,81 @@ +# GTFS Change Tracker + +This function computes a structured diff between two consecutive GTFS datasets and stores the resulting changelog in GCS and the database. + +The function reads pre-extracted GTFS files from a GCS-mounted bucket (uploaded by `batch_process_dataset`), runs the diff engine, uploads the changelog JSON to GCS, and upserts a row in `gtfs_dataset_changelog`. + +## Usage + +The function receives the following request: +``` +{ + "feed_stable_id": str, – stable_id of the GTFS feed + "base_dataset_stable_id": str, – stable_id of the base (older) dataset + "new_dataset_stable_id": str, – stable_id of the new (recent) dataset + "disallow_overwrite": bool (optional), – skip if changelog already exists (default: false) + "dry_run": bool (optional) – compute diff but skip GCS upload and DB write (default: false) +} +``` + +Example: +```json +{ + "feed_stable_id": "mdb-2142", + "base_dataset_stable_id": "mdb-2142-202502251658", + "new_dataset_stable_id": "mdb-2142-202507081652" +} +``` + +Example curl call: +```bash +curl -X POST https:// \ + -H "Authorization: Bearer $(gcloud auth print-identity-token)" \ + -H "Content-Type: application/json" \ + -d '{ + "feed_stable_id": "mdb-2142", + "base_dataset_stable_id": "mdb-2142-202502251658", + "new_dataset_stable_id": "mdb-2142-202507081652" + }' +``` + +### `disallow_overwrite` +By default the function will overwrite an existing changelog for the same dataset pair. Set `disallow_overwrite: true` to skip execution if a changelog already exists in GCS. + +### `dry_run` +When `dry_run: true`, the diff is computed and a summary is returned in the response, but nothing is written to GCS or the database. Useful for validating that the extracted files are present and the diff engine runs correctly. + +## Response + +Success: +```json +{ + "status": "success", + "message": "Changelog generated successfully.", + "changelog_url": "https://storage.googleapis.com////..." +} +``` + +Dry run: +```json +{ + "status": "success", + "message": "Dry run completed. Diff computed but not persisted.", + "summary": { + "total_changes": 42, + "files_added_count": 0, + "files_deleted_count": 0, + "files_modified_count": 3, + ... + } +} +``` + +The function always returns HTTP 200, including on errors. Errors are reported in the response body under `"status": "error"`. This prevents GCP from retrying failures where re-running with the same parameters would produce the same result. + +## GCP environment variables + +- `DATASETS_BUCKET_NAME`: The GCS bucket where datasets are stored (required). Must include the environment suffix, e.g. `mobilitydata-datasets-dev`. +- `DATASETS_BUCKET_MOUNT`: Mount path for the GCS bucket (default: `/mobilitydata-datasets`). +- `GTFS_DIFF_DUCKDB_TMPDIR`: Mount path for the in-memory tmpfs used by the diff engine (default: `/tmp/in-memory`). Used by `limit_gcp_memory` to compute the available process memory and set `RLIMIT_AS`, preventing silent OOM kills. +- `MEMORY_MARGIN_MB`: Safety margin in MiB subtracted from the memory limit before setting `RLIMIT_AS` (default: `200`). +- `LOGGING_LEVEL`: Log level (default: `INFO`). diff --git a/functions-python/gtfs_datasets_comparer/function_config.json b/functions-python/gtfs_datasets_comparer/function_config.json new file mode 100644 index 000000000..a2e36b0f2 --- /dev/null +++ b/functions-python/gtfs_datasets_comparer/function_config.json @@ -0,0 +1,31 @@ +{ + "name": "gtfs-datasets-comparer", + "description": "Tracks changes between two GTFS datasets and stores a structured changelog in GCS and the database", + "entry_point": "gtfs_datasets_comparer", + "timeout": 540, + "memory": "8Gi", + "trigger_http": true, + "include_folders": ["helpers"], + "include_api_folders": ["database_gen", "database", "common"], + "environment_variables": [ + { + "key": "DATASETS_BUCKET_NAME" + }, + { + "key": "LOGGING_LEVEL" + }, + { + "key": "GTFS_DIFF_DUCKDB_TMPDIR" + } + ], + "secret_environment_variables": [ + { + "key": "FEEDS_DATABASE_URL" + } + ], + "ingress_settings": "ALLOW_ALL", + "max_instance_request_concurrency": 1, + "max_instance_count": 10, + "min_instance_count": 0, + "available_cpu": 2 +} diff --git a/functions-python/gtfs_datasets_comparer/requirements.txt b/functions-python/gtfs_datasets_comparer/requirements.txt new file mode 100644 index 000000000..9bfc2b25a --- /dev/null +++ b/functions-python/gtfs_datasets_comparer/requirements.txt @@ -0,0 +1,22 @@ +# Common packages +functions-framework==3.* +google-cloud-logging +psycopg2-binary==2.9.6 +urllib3~=2.6.3 +attrs~=23.1.0 +certifi~=2025.8.3 + +# SQL Alchemy and Geo Alchemy +SQLAlchemy==2.0.23 +geoalchemy2==0.14.7 + +# Google specific packages for this function +flask +google-cloud-storage +google-cloud-tasks + +# GTFS diff engine +gtfs-diff-engine + +# Configuration +python-dotenv==1.2.2 diff --git a/functions-python/gtfs_datasets_comparer/requirements_dev.txt b/functions-python/gtfs_datasets_comparer/requirements_dev.txt new file mode 100644 index 000000000..9fbb0d627 --- /dev/null +++ b/functions-python/gtfs_datasets_comparer/requirements_dev.txt @@ -0,0 +1,2 @@ +pytest~=7.4.3 +requests-mock diff --git a/functions-python/gtfs_datasets_comparer/src/main.py b/functions-python/gtfs_datasets_comparer/src/main.py new file mode 100644 index 000000000..4cac88714 --- /dev/null +++ b/functions-python/gtfs_datasets_comparer/src/main.py @@ -0,0 +1,327 @@ +# +# MobilityData 2025 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This module provides the GtfsDatasetsComparer Cloud Function. It orchestrates change tracking +# between two consecutive GTFS datasets: reads the pre-extracted GTFS files from GCS (uploaded +# by batch_process_dataset at //extracted/), computes a +# structured diff using gtfs-diff-engine, uploads the changelog JSON to GCS, and persists the +# record in the gtfs_dataset_changelog database table. +import logging +import os + +import flask +import functions_framework +from google.cloud import storage +from gtfs_diff.engine import diff_feeds +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session +from sqlalchemy.sql import func + +from shared.common.gcp_memory_utils import limit_gcp_memory +from shared.database.database import with_db_session +from shared.database_gen.sqlacodegen_models import ( + GtfsDatasetChangelog, + Gtfsdataset, +) +from shared.helpers.logger import get_logger, init_logger +from shared.helpers.runtime_metrics import track_metrics + +init_logger() +limit_gcp_memory(os.getenv("GTFS_DIFF_DUCKDB_TMPDIR", "/tmp/in-memory")) + + +@functions_framework.http +def gtfs_datasets_comparer(request: flask.Request) -> dict: + """ + HTTP entrypoint for the GTFS datasets comparer function. + + Expects a JSON body with: + feed_stable_id – stable_id of the GTFS feed + base_dataset_stable_id – stable_id of the base (previous) Gtfsdataset + new_dataset_stable_id – stable_id of the new (current) Gtfsdataset + disallow_overwrite – (optional, default false) skip if changelog already exists + dry_run – (optional, default false) compute diff but skip GCS upload and DB write + + Always returns HTTP 200 — errors are reported in the response body. + This prevents GCP from retrying failures: we cannot distinguish transient from + permanent errors (e.g. a DB blip vs the DB being down), so retrying would only + waste resources. The idempotency check in run() makes explicit reruns safe. + """ + payload = request.get_json(silent=True) or {} + feed_stable_id = payload.get("feed_stable_id") + base_dataset_stable_id = payload.get("base_dataset_stable_id") + new_dataset_stable_id = payload.get("new_dataset_stable_id") + disallow_overwrite = bool(payload.get("disallow_overwrite", False)) + dry_run = bool(payload.get("dry_run", False)) + + if not (feed_stable_id and base_dataset_stable_id and new_dataset_stable_id): + return flask.make_response( + { + "status": "error", + "error": "feed_stable_id, base_dataset_stable_id, and new_dataset_stable_id are required.", + }, + 200, + ) + + bucket_name = os.getenv("DATASETS_BUCKET_NAME") + if not bucket_name: + return flask.make_response( + { + "status": "error", + "error": "DATASETS_BUCKET_NAME environment variable is not set.", + }, + 200, + ) + + bucket_mount = os.getenv("DATASETS_BUCKET_MOUNT", "/tmp/mobilitydata-datasets") + try: + tracker = GtfsDatasetsComparer( + feed_stable_id=feed_stable_id, + base_dataset_stable_id=base_dataset_stable_id, + new_dataset_stable_id=new_dataset_stable_id, + bucket_name=bucket_name, + bucket_mount=bucket_mount, + disallow_overwrite=disallow_overwrite, + dry_run=dry_run, + ) + result = tracker.run() + return flask.make_response({"status": "success", **result}, 200) + except Exception as e: + # We cannot reliably distinguish transient from permanent errors, so we always + # return HTTP 200 to suppress GCP retries. If a specific exception type is + # identified as safely retriable in the future, catch it here and return 500. + logging.exception( + "Failed to generate changelog for feed=%s base=%s new=%s", + feed_stable_id, + base_dataset_stable_id, + new_dataset_stable_id, + ) + return flask.make_response( + { + "status": "error", + "error": f"Failed to generate changelog: {e}", + "payload": payload, + }, + 200, + ) + + +class GtfsDatasetsComparer: + """ + Orchestrates GTFS change tracking between two consecutive datasets. + + Steps: + 1. Resolve both datasets from the database using their stable_ids. + 2. Locate the pre-extracted GTFS files on the mounted GCS bucket filesystem + (///extracted/). + 3. Compute a structured diff using gtfs-diff-engine. + 4. Upload the changelog JSON to GCS. + 5. Upsert a row in gtfs_dataset_changelog. + """ + + def __init__( + self, + feed_stable_id: str, + base_dataset_stable_id: str, + new_dataset_stable_id: str, + bucket_name: str, + bucket_mount: str, + disallow_overwrite: bool = False, + dry_run: bool = False, + ): + self.feed_stable_id = feed_stable_id + self.base_dataset_stable_id = base_dataset_stable_id + self.new_dataset_stable_id = new_dataset_stable_id + self.bucket_name = bucket_name + self.bucket_mount = bucket_mount + self.disallow_overwrite = disallow_overwrite + self.dry_run = dry_run + self.logger = get_logger(GtfsDatasetsComparer.__name__, new_dataset_stable_id) + + @track_metrics(metrics=("time", "memory", "cpu")) + def run(self) -> dict: + """Execute the full change-tracking pipeline.""" + changelog_blob_path = ( + f"{self.feed_stable_id}/{self.new_dataset_stable_id}/" + f"{self.new_dataset_stable_id}_{self.base_dataset_stable_id}_changelog.json" + ) + blob = storage.Client().bucket(self.bucket_name).blob(changelog_blob_path) + + # Idempotency: skip if changelog already exists and disallow_overwrite is set. + if self.disallow_overwrite and blob.exists(): + changelog_url = f"https://storage.googleapis.com/{self.bucket_name}/{changelog_blob_path}" + self.logger.info("Changelog already exists, skipping: %s", changelog_url) + return { + "message": "Changelog already exists.", + "changelog_url": changelog_url, + } + + prev_dataset_uuid, curr_dataset_uuid, feed_uuid = self._resolve_datasets() + + self.logger.info( + "Computing diff for feed %s: %s -> %s", + self.feed_stable_id, + self.base_dataset_stable_id, + self.new_dataset_stable_id, + ) + + prev_dir = self._extracted_dir(self.feed_stable_id, self.base_dataset_stable_id) + curr_dir = self._extracted_dir(self.feed_stable_id, self.new_dataset_stable_id) + + diff_result = diff_feeds(prev_dir, curr_dir) + + if self.dry_run: + self.logger.info("Dry run — skipping GCS upload and DB write.") + return { + "message": "Dry run completed. Diff computed but not persisted.", + "summary": diff_result.summary.model_dump(), + } + + changelog_json = diff_result.model_dump_json(indent=2).encode("utf-8") + changelog_url = self._upload_changelog( + changelog_json, + self.feed_stable_id, + self.base_dataset_stable_id, + self.new_dataset_stable_id, + ) + + diff_summary = diff_result.summary.model_dump() + self._save_changelog_record( + feed_uuid=feed_uuid, + prev_dataset_uuid=prev_dataset_uuid, + curr_dataset_uuid=curr_dataset_uuid, + changelog_url=changelog_url, + diff_summary=diff_summary, + ) + + self.logger.info("Changelog stored at %s", changelog_url) + return { + "message": "Changelog generated successfully.", + "changelog_url": changelog_url, + } + + @with_db_session + def _resolve_datasets(self, db_session: Session = None) -> tuple: + """ + Validate both datasets exist and belong to the given feed. + Returns (prev_dataset_uuid, curr_dataset_uuid, feed_uuid) as plain strings. + """ + prev_dataset = ( + db_session.query(Gtfsdataset) + .filter(Gtfsdataset.stable_id == self.base_dataset_stable_id) + .one_or_none() + ) + if prev_dataset is None: + raise ValueError( + f"Previous dataset not found: {self.base_dataset_stable_id}" + ) + if prev_dataset.feed.stable_id != self.feed_stable_id: + raise ValueError( + f"Dataset {self.base_dataset_stable_id} does not belong to feed {self.feed_stable_id}." + ) + + curr_dataset = ( + db_session.query(Gtfsdataset) + .filter(Gtfsdataset.stable_id == self.new_dataset_stable_id) + .one_or_none() + ) + if curr_dataset is None: + raise ValueError(f"Current dataset not found: {self.new_dataset_stable_id}") + + if curr_dataset.feed.stable_id != self.feed_stable_id: + raise ValueError( + f"Dataset {self.new_dataset_stable_id} does not belong to feed {self.feed_stable_id}." + ) + + return prev_dataset.id, curr_dataset.id, curr_dataset.feed.id + + def _extracted_dir(self, feed_stable_id: str, dataset_stable_id: str) -> str: + """ + Return the path to the pre-extracted GTFS files on the mounted bucket filesystem. + + batch_process_dataset uploads each file to: + //extracted/ + which appears on the mount at: + ///extracted/ + """ + path = os.path.join( + self.bucket_mount, feed_stable_id, dataset_stable_id, "extracted" + ) + if not os.path.isdir(path): + raise ValueError(f"Extracted files not found on mounted bucket at {path}") + self.logger.debug("Using extracted dir from mounted bucket: %s", path) + return path + + def _upload_changelog( + self, + json_bytes: bytes, + feed_stable_id: str, + prev_dataset_id: str, + curr_dataset_id: str, + ) -> str: + """ + Upload the changelog JSON to GCS at: + //__changelog.json + + Returns the GCS public URL. + """ + blob_path = f"{feed_stable_id}/{curr_dataset_id}/{curr_dataset_id}_{prev_dataset_id}_changelog.json" + bucket = storage.Client().bucket(self.bucket_name) + blob = bucket.blob(blob_path) + blob.upload_from_string(json_bytes, content_type="application/json") + blob.make_public() + self.logger.info( + "Uploaded changelog to gs://%s/%s", self.bucket_name, blob_path + ) + return f"https://storage.googleapis.com/{self.bucket_name}/{blob_path}" + + @with_db_session + def _save_changelog_record( + self, + feed_uuid: str, + prev_dataset_uuid: str, + curr_dataset_uuid: str, + changelog_url: str, + diff_summary: dict, + db_session: Session = None, + ) -> None: + """ + Upsert a row into gtfs_dataset_changelog. + The UNIQUE constraint on (previous_dataset_id, current_dataset_id) ensures idempotency. + """ + stmt = ( + insert(GtfsDatasetChangelog) + .values( + feed_id=feed_uuid, + previous_dataset_id=prev_dataset_uuid, + current_dataset_id=curr_dataset_uuid, + changelog_url=changelog_url, + diff_summary=diff_summary, + ) + .on_conflict_do_update( + constraint="gtfs_dataset_changelog_previous_current_key", + set_={ + "changelog_url": changelog_url, + "diff_summary": diff_summary, + "generated_at": func.now(), + }, + ) + ) + db_session.execute(stmt) + db_session.commit() + self.logger.info( + "Saved changelog record for %s -> %s", + self.base_dataset_stable_id, + self.new_dataset_stable_id, + ) diff --git a/functions-python/gtfs_datasets_comparer/tests/__init__.py b/functions-python/gtfs_datasets_comparer/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/functions-python/gtfs_datasets_comparer/tests/test_main.py b/functions-python/gtfs_datasets_comparer/tests/test_main.py new file mode 100644 index 000000000..69d8a2721 --- /dev/null +++ b/functions-python/gtfs_datasets_comparer/tests/test_main.py @@ -0,0 +1,462 @@ +# +# MobilityData 2025 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import tempfile +import unittest +from unittest.mock import MagicMock, patch + +import flask + +from main import GtfsDatasetsComparer, gtfs_datasets_comparer + + +def _make_dataset( + id_, stable_id, hosted_url, feed_stable_id="mdb-1", feed_id="feed-uuid" +): + dataset = MagicMock() + dataset.id = id_ + dataset.stable_id = stable_id + dataset.hosted_url = hosted_url + dataset.feed = MagicMock() + dataset.feed.id = feed_id + dataset.feed.stable_id = feed_stable_id + return dataset + + +class TestGtfsDatasetsComparerHandler(unittest.TestCase): + """Tests for the HTTP handler function.""" + + app = flask.Flask(__name__) + + def _request(self, payload): + with self.app.test_request_context(json=payload): + response = gtfs_datasets_comparer(flask.request) + with self.app.app_context(): + return response.status_code, response.get_json() + + def test_missing_all_params(self): + status, body = self._request({}) + self.assertEqual(status, 200) + self.assertEqual(body["status"], "error") + self.assertIn("required", body["error"]) + + def test_missing_one_param(self): + status, body = self._request( + {"feed_stable_id": "mdb-1", "base_dataset_stable_id": "mdb-1-20240101"} + ) + self.assertEqual(status, 200) + self.assertEqual(body["status"], "error") + + def test_missing_bucket_env(self): + os.environ.pop("DATASETS_BUCKET_NAME", None) + status, body = self._request( + { + "feed_stable_id": "mdb-1", + "base_dataset_stable_id": "mdb-1-20240101", + "new_dataset_stable_id": "mdb-1-20240201", + } + ) + self.assertEqual(status, 200) + self.assertEqual(body["status"], "error") + self.assertIn("DATASETS_BUCKET_NAME", body["error"]) + + @patch("main.GtfsDatasetsComparer") + def test_success(self, mock_tracker_cls): + os.environ["DATASETS_BUCKET_NAME"] = "test-bucket" + os.environ["DATASETS_BUCKET_MOUNT"] = "/mobilitydata-datasets" + instance = MagicMock() + instance.run.return_value = { + "message": "Changelog generated successfully.", + "changelog_url": "https://storage.googleapis.com/test-bucket/mdb-1/c1/c1_p1_changelog.json", + } + mock_tracker_cls.return_value = instance + + status, body = self._request( + { + "feed_stable_id": "mdb-1", + "base_dataset_stable_id": "mdb-1-20240101", + "new_dataset_stable_id": "mdb-1-20240201", + } + ) + self.assertEqual(status, 200) + self.assertEqual(body["status"], "success") + self.assertIn("changelog_url", body) + mock_tracker_cls.assert_called_once_with( + feed_stable_id="mdb-1", + base_dataset_stable_id="mdb-1-20240101", + new_dataset_stable_id="mdb-1-20240201", + bucket_name="test-bucket", + bucket_mount="/mobilitydata-datasets", + disallow_overwrite=False, + dry_run=False, + ) + + @patch("main.GtfsDatasetsComparer") + def test_disallow_overwrite_and_dry_run_passed(self, mock_tracker_cls): + os.environ["DATASETS_BUCKET_NAME"] = "test-bucket" + os.environ["DATASETS_BUCKET_MOUNT"] = "/mobilitydata-datasets" + instance = MagicMock() + instance.run.return_value = {"message": "Dry run completed.", "summary": {}} + mock_tracker_cls.return_value = instance + + self._request( + { + "feed_stable_id": "mdb-1", + "base_dataset_stable_id": "mdb-1-20240101", + "new_dataset_stable_id": "mdb-1-20240201", + "disallow_overwrite": True, + "dry_run": True, + } + ) + mock_tracker_cls.assert_called_once_with( + feed_stable_id="mdb-1", + base_dataset_stable_id="mdb-1-20240101", + new_dataset_stable_id="mdb-1-20240201", + bucket_name="test-bucket", + bucket_mount="/mobilitydata-datasets", + disallow_overwrite=True, + dry_run=True, + ) + + @patch("main.GtfsDatasetsComparer") + def test_exception_returns_200_with_error(self, mock_tracker_cls): + """All exceptions return HTTP 200 to suppress GCP retries.""" + os.environ["DATASETS_BUCKET_NAME"] = "test-bucket" + instance = MagicMock() + instance.run.side_effect = Exception("something went wrong") + mock_tracker_cls.return_value = instance + + status, body = self._request( + { + "feed_stable_id": "mdb-1", + "base_dataset_stable_id": "mdb-1-20240101", + "new_dataset_stable_id": "mdb-1-20240201", + } + ) + self.assertEqual(status, 200) + self.assertEqual(body["status"], "error") + self.assertIn("something went wrong", body["error"]) + + +class TestGtfsDatasetsComparerRun(unittest.TestCase): + """Tests for GtfsDatasetsComparer.run() with mocked collaborators.""" + + def setUp(self): + os.environ["DATASETS_BUCKET_NAME"] = "test-bucket" + self.tracker = GtfsDatasetsComparer( + feed_stable_id="mdb-1", + base_dataset_stable_id="mdb-1-20240101", + new_dataset_stable_id="mdb-1-20240201", + bucket_name="test-bucket", + bucket_mount="/mobilitydata-datasets", + ) + + @patch("main.storage.Client") + @patch("main.GtfsDatasetsComparer._save_changelog_record") + @patch("main.GtfsDatasetsComparer._upload_changelog") + @patch("main.GtfsDatasetsComparer._extracted_dir") + @patch("main.GtfsDatasetsComparer._resolve_datasets") + def test_run_happy_path( + self, mock_resolve, mock_extracted_dir, mock_upload, mock_save, mock_storage + ): + mock_storage.return_value.bucket.return_value.blob.return_value.exists.return_value = ( + False + ) + mock_resolve.return_value = ("prev-uuid", "curr-uuid", "feed-uuid") + mock_extracted_dir.side_effect = [ + "/mobilitydata-datasets/mdb-1/mdb-1-20240101/extracted", + "/mobilitydata-datasets/mdb-1/mdb-1-20240201/extracted", + ] + + fake_summary = MagicMock() + fake_summary.model_dump.return_value = {"total_changes": 42} + fake_diff = MagicMock() + fake_diff.summary = fake_summary + fake_diff.model_dump_json.return_value = ( + '{"metadata": {}, "summary": {}, "file_diffs": []}' + ) + + changelog_url = ( + "https://storage.googleapis.com/test-bucket/mdb-1/" + "mdb-1-20240201/mdb-1-20240201_mdb-1-20240101_changelog.json" + ) + mock_upload.return_value = changelog_url + + with patch("main.diff_feeds", return_value=fake_diff, create=True) as mock_diff: + result = self.tracker.run() + + mock_resolve.assert_called_once() + self.assertEqual(mock_extracted_dir.call_count, 2) + extracted_calls = mock_extracted_dir.call_args_list + self.assertEqual(extracted_calls[0].args, ("mdb-1", "mdb-1-20240101")) + self.assertEqual(extracted_calls[1].args, ("mdb-1", "mdb-1-20240201")) + mock_diff.assert_called_once() + mock_upload.assert_called_once_with( + fake_diff.model_dump_json.return_value.encode("utf-8"), + "mdb-1", + "mdb-1-20240101", + "mdb-1-20240201", + ) + mock_save.assert_called_once_with( + feed_uuid="feed-uuid", + prev_dataset_uuid="prev-uuid", + curr_dataset_uuid="curr-uuid", + changelog_url=changelog_url, + diff_summary={"total_changes": 42}, + ) + self.assertEqual(result["changelog_url"], changelog_url) + + @patch("main.storage.Client") + @patch("main.GtfsDatasetsComparer._resolve_datasets") + def test_run_raises_when_resolve_fails(self, mock_resolve, mock_storage): + mock_storage.return_value.bucket.return_value.blob.return_value.exists.return_value = ( + False + ) + mock_resolve.side_effect = ValueError( + "Previous dataset not found: mdb-1-20240101" + ) + with self.assertRaises(ValueError): + self.tracker.run() + + @patch("main.storage.Client") + def test_run_skips_when_changelog_exists(self, mock_storage): + mock_storage.return_value.bucket.return_value.blob.return_value.exists.return_value = ( + True + ) + tracker = GtfsDatasetsComparer( + feed_stable_id="mdb-1", + base_dataset_stable_id="mdb-1-20240101", + new_dataset_stable_id="mdb-1-20240201", + bucket_name="test-bucket", + bucket_mount="/mobilitydata-datasets", + disallow_overwrite=True, + ) + result = tracker.run() + self.assertIn("already exists", result["message"]) + self.assertIn("changelog_url", result) + + @patch("main.storage.Client") + def test_disallow_overwrite_skips_when_exists(self, mock_storage): + """disallow_overwrite=True should skip when the blob already exists.""" + mock_storage.return_value.bucket.return_value.blob.return_value.exists.return_value = ( + True + ) + tracker = GtfsDatasetsComparer( + feed_stable_id="mdb-1", + base_dataset_stable_id="mdb-1-20240101", + new_dataset_stable_id="mdb-1-20240201", + bucket_name="test-bucket", + bucket_mount="/mobilitydata-datasets", + disallow_overwrite=True, + ) + result = tracker.run() + self.assertIn("already exists", result["message"]) + + @patch("main.storage.Client") + @patch("main.GtfsDatasetsComparer._save_changelog_record") + @patch("main.GtfsDatasetsComparer._upload_changelog") + @patch("main.GtfsDatasetsComparer._extracted_dir") + @patch("main.GtfsDatasetsComparer._resolve_datasets") + def test_overwrite_proceeds_when_exists_by_default( + self, mock_resolve, mock_extracted_dir, mock_upload, mock_save, mock_storage + ): + """By default (disallow_overwrite=False) the changelog is overwritten even if it exists.""" + mock_storage.return_value.bucket.return_value.blob.return_value.exists.return_value = ( + True + ) + mock_resolve.return_value = ("prev-uuid", "curr-uuid", "feed-uuid") + mock_extracted_dir.side_effect = [ + "/mobilitydata-datasets/mdb-1/mdb-1-20240101/extracted", + "/mobilitydata-datasets/mdb-1/mdb-1-20240201/extracted", + ] + fake_diff = MagicMock() + fake_diff.summary.model_dump.return_value = {} + fake_diff.model_dump_json.return_value = "{}" + mock_upload.return_value = ( + "https://storage.googleapis.com/test-bucket/changelog.json" + ) + with patch("main.diff_feeds", return_value=fake_diff, create=True): + result = self.tracker.run() + self.assertIn("generated successfully", result["message"]) + mock_upload.assert_called_once() + + @patch("main.storage.Client") + @patch("main.GtfsDatasetsComparer._upload_changelog") + @patch("main.GtfsDatasetsComparer._extracted_dir") + @patch("main.GtfsDatasetsComparer._resolve_datasets") + def test_dry_run_skips_upload_and_db( + self, mock_resolve, mock_extracted_dir, mock_upload, mock_storage + ): + """dry_run=True should compute the diff but not upload or write to DB.""" + mock_storage.return_value.bucket.return_value.blob.return_value.exists.return_value = ( + False + ) + mock_resolve.return_value = ("prev-uuid", "curr-uuid", "feed-uuid") + mock_extracted_dir.side_effect = [ + "/mobilitydata-datasets/mdb-1/mdb-1-20240101/extracted", + "/mobilitydata-datasets/mdb-1/mdb-1-20240201/extracted", + ] + fake_diff = MagicMock() + fake_diff.summary.model_dump.return_value = {"total_changes": 5} + + tracker = GtfsDatasetsComparer( + feed_stable_id="mdb-1", + base_dataset_stable_id="mdb-1-20240101", + new_dataset_stable_id="mdb-1-20240201", + bucket_name="test-bucket", + bucket_mount="/mobilitydata-datasets", + dry_run=True, + ) + with patch("main.diff_feeds", return_value=fake_diff, create=True): + result = tracker.run() + + self.assertIn("Dry run", result["message"]) + self.assertIn("summary", result) + mock_upload.assert_not_called() + + +class TestResolveDatasets(unittest.TestCase): + """Tests for _resolve_datasets — verifies plain string UUIDs are returned.""" + + def setUp(self): + self.tracker = GtfsDatasetsComparer( + feed_stable_id="mdb-1", + base_dataset_stable_id="mdb-1-20240101", + new_dataset_stable_id="mdb-1-20240201", + bucket_name="my-bucket", + bucket_mount="/mobilitydata-datasets", + ) + + def _mock_session_with_datasets(self): + prev_ds = MagicMock() + prev_ds.id = "prev-uuid" + prev_ds.stable_id = "mdb-1-20240101" + prev_ds.feed.id = "feed-uuid" + prev_ds.feed.stable_id = "mdb-1" + + curr_ds = MagicMock() + curr_ds.id = "curr-uuid" + curr_ds.stable_id = "mdb-1-20240201" + curr_ds.feed.id = "feed-uuid" + curr_ds.feed.stable_id = "mdb-1" + + mock_session = MagicMock() + mock_session.query.return_value.filter.return_value.one_or_none.side_effect = [ + prev_ds, + curr_ds, + ] + return mock_session + + @patch("main.with_db_session", lambda f: f) + def test_returns_plain_string_uuids(self): + """_resolve_datasets must return str UUIDs, not ORM objects. + If ORM objects were returned, accessing lazy attributes after session + close would raise DetachedInstanceError in production.""" + mock_session = self._mock_session_with_datasets() + result = self.tracker._resolve_datasets(db_session=mock_session) + prev_uuid, curr_uuid, feed_uuid = result + self.assertIsInstance(prev_uuid, str, "prev_uuid must be a plain string") + self.assertIsInstance(curr_uuid, str, "curr_uuid must be a plain string") + self.assertIsInstance(feed_uuid, str, "feed_uuid must be a plain string") + self.assertEqual(prev_uuid, "prev-uuid") + self.assertEqual(curr_uuid, "curr-uuid") + self.assertEqual(feed_uuid, "feed-uuid") + + @patch("main.with_db_session", lambda f: f) + def test_raises_when_prev_feed_mismatch(self): + prev_ds = MagicMock() + prev_ds.id = "prev-uuid" + prev_ds.feed.stable_id = "mdb-999" # wrong feed + + mock_session = MagicMock() + mock_session.query.return_value.filter.return_value.one_or_none.side_effect = [ + prev_ds, + ] + with self.assertRaises( + ValueError, msg="should reject prev dataset from wrong feed" + ): + self.tracker._resolve_datasets(db_session=mock_session) + + @patch("main.with_db_session", lambda f: f) + def test_raises_when_feed_mismatch(self): + prev_ds = MagicMock() + prev_ds.id = "prev-uuid" + curr_ds = MagicMock() + curr_ds.id = "curr-uuid" + curr_ds.feed.stable_id = "mdb-999" # wrong feed + + mock_session = MagicMock() + mock_session.query.return_value.filter.return_value.one_or_none.side_effect = [ + prev_ds, + curr_ds, + ] + with self.assertRaises(ValueError, msg="should reject dataset from wrong feed"): + self.tracker._resolve_datasets(db_session=mock_session) + + +class TestExtractedDir(unittest.TestCase): + def setUp(self): + self.tracker = GtfsDatasetsComparer( + feed_stable_id="mdb-1", + base_dataset_stable_id="mdb-1-20240101", + new_dataset_stable_id="mdb-1-20240201", + bucket_name="my-bucket", + bucket_mount="/mobilitydata-datasets", + ) + + def test_returns_correct_path_when_dir_exists(self): + with tempfile.TemporaryDirectory() as mount: + extracted = os.path.join(mount, "mdb-1", "mdb-1-20240101", "extracted") + os.makedirs(extracted) + self.tracker.bucket_mount = mount + result = self.tracker._extracted_dir("mdb-1", "mdb-1-20240101") + self.assertEqual(result, extracted) + + def test_raises_when_dir_not_found(self): + with self.assertRaises(ValueError, msg="Extracted files not found"): + self.tracker._extracted_dir("mdb-1", "mdb-1-20240101") + + +class TestUploadChangelog(unittest.TestCase): + def setUp(self): + self.tracker = GtfsDatasetsComparer( + feed_stable_id="mdb-1", + base_dataset_stable_id="mdb-1-20240101", + new_dataset_stable_id="mdb-1-20240201", + bucket_name="my-bucket", + bucket_mount="/mobilitydata-datasets", + ) + + @patch("main.storage.Client") + def test_uploads_one_blob_and_returns_url(self, mock_storage_cls): + mock_bucket = MagicMock() + mock_blob = MagicMock() + mock_storage_cls.return_value.bucket.return_value = mock_bucket + mock_bucket.blob.return_value = mock_blob + + url = self.tracker._upload_changelog( + b'{"data": 1}', "mdb-1", "mdb-1-20240101", "mdb-1-20240201" + ) + + mock_bucket.blob.assert_called_once_with( + "mdb-1/mdb-1-20240201/mdb-1-20240201_mdb-1-20240101_changelog.json" + ) + mock_blob.upload_from_string.assert_called_once_with( + b'{"data": 1}', content_type="application/json" + ) + self.assertEqual( + url, + "https://storage.googleapis.com/my-bucket/" + "mdb-1/mdb-1-20240201/mdb-1-20240201_mdb-1-20240101_changelog.json", + ) diff --git a/functions-python/helpers/utils.py b/functions-python/helpers/utils.py index bb66b4049..bb7914bfc 100644 --- a/functions-python/helpers/utils.py +++ b/functions-python/helpers/utils.py @@ -535,6 +535,41 @@ def create_http_pmtiles_builder_task( ) +def create_http_gtfs_datasets_comparer_task( + feed_stable_id: str, + base_dataset_stable_id: str, + new_dataset_stable_id: str, +) -> None: + """ + Create a Cloud Task to run the gtfs-datasets-comparer function for a pair of datasets. + """ + from google.cloud import tasks_v2 + import json + + client = tasks_v2.CloudTasksClient() + body = json.dumps( + { + "feed_stable_id": feed_stable_id, + "base_dataset_stable_id": base_dataset_stable_id, + "new_dataset_stable_id": new_dataset_stable_id, + "disallow_overwrite": True, + } + ).encode() + queue_name = os.getenv("GTFS_CHANGE_TRACKER_QUEUE") + project_id = os.getenv("PROJECT_ID") + gcp_region = os.getenv("GCP_REGION") + gcp_env = os.getenv("ENVIRONMENT") + + create_http_task( + client, + body, + f"https://{gcp_region}-{project_id}.cloudfunctions.net/gtfs-datasets-comparer-{gcp_env}", + project_id, + gcp_region, + queue_name, + ) + + def get_execution_id(json_payload: dict, stable_id: Optional[str]) -> str: """ Extracts the execution_id from the JSON payload. diff --git a/infra/batch/main.tf b/infra/batch/main.tf index 3c6d8083a..e559facb3 100644 --- a/infra/batch/main.tf +++ b/infra/batch/main.tf @@ -44,6 +44,8 @@ locals { deployment_timestamp = formatdate("YYYYMMDDhhmmss", timestamp()) function_pmtiles_builder_config = jsondecode(file("${path.module}/../../functions-python/pmtiles_builder/function_config.json")) + + function_gtfs_datasets_comparer_config = jsondecode(file("${path.module}/../../functions-python/gtfs_datasets_comparer/function_config.json")) } data "google_vpc_access_connector" "vpc_connector" { @@ -272,6 +274,26 @@ resource "google_cloud_tasks_queue" "pmtiles_builder_task_queue" { } } +# Task queue to invoke gtfs_datasets_comparer function +resource "google_cloud_tasks_queue" "gtfs_datasets_comparer_task_queue" { + project = var.project_id + location = var.gcp_region + name = "gtfs-datasets-comparer-queue-${var.environment}-${local.deployment_timestamp}" + + rate_limits { + max_concurrent_dispatches = 10 + max_dispatches_per_second = 1 + } + + retry_config { + # Retries span ~10 minutes: initial try + 2 retries at 120s then 240s + max_attempts = 3 + min_backoff = "120s" + max_backoff = "240s" + max_doublings = 1 + } +} + # Batch process dataset function resource "google_cloudfunctions2_function" "pubsub_function" { @@ -310,6 +332,7 @@ resource "google_cloudfunctions2_function" "pubsub_function" { MATERIALIZED_VIEW_QUEUE = google_cloud_tasks_queue.refresh_materialized_view_task_queue.name PMTILES_BUILDER_QUEUE = google_cloud_tasks_queue.pmtiles_builder_task_queue.name REVERSE_GEOLOCATION_QUEUE = "reverse-geolocation-processor-task-queue" + GTFS_CHANGE_TRACKER_QUEUE = google_cloud_tasks_queue.gtfs_datasets_comparer_task_queue.name WEB_REVALIDATION_QUEUE = google_cloud_tasks_queue.web_revalidation_task_queue.name } dynamic "secret_environment_variables" { @@ -468,4 +491,12 @@ resource "google_cloud_run_service_iam_member" "pmtiles_builder_invoker" { service = "${local.function_pmtiles_builder_config.name}-${var.environment}" role = "roles/run.invoker" member = "serviceAccount:${google_service_account.functions_service_account.email}" +} + +resource "google_cloud_run_service_iam_member" "gtfs_datasets_comparer_invoker" { + project = var.project_id + location = var.gcp_region + service = "${local.function_gtfs_datasets_comparer_config.name}-${var.environment}" + role = "roles/run.invoker" + member = "serviceAccount:${google_service_account.functions_service_account.email}" } \ No newline at end of file diff --git a/infra/functions-python/main.tf b/infra/functions-python/main.tf index 07a4a757f..f6fe26c51 100644 --- a/infra/functions-python/main.tf +++ b/infra/functions-python/main.tf @@ -65,6 +65,9 @@ locals { function_pmtiles_builder_config = jsondecode(file("${path.module}/../../functions-python/pmtiles_builder/function_config.json")) function_pmtiles_builder_zip = "${path.module}/../../functions-python/pmtiles_builder/.dist/pmtiles_builder.zip" + + function_gtfs_datasets_comparer_config = jsondecode(file("${path.module}/../../functions-python/gtfs_datasets_comparer/function_config.json")) + function_gtfs_datasets_comparer_zip = "${path.module}/../../functions-python/gtfs_datasets_comparer/.dist/gtfs_datasets_comparer.zip" } locals { @@ -76,7 +79,8 @@ locals { local.function_update_feed_status_config.secret_environment_variables, local.function_export_csv_config.secret_environment_variables, local.function_tasks_executor_config.secret_environment_variables, - local.function_pmtiles_builder_config.secret_environment_variables + local.function_pmtiles_builder_config.secret_environment_variables, + local.function_gtfs_datasets_comparer_config.secret_environment_variables ) # Remove duplicates by key, keeping the first occurrence @@ -222,6 +226,13 @@ resource "google_storage_bucket_object" "pmtiles_builder_zip" { source = local.function_pmtiles_builder_zip } +# 16. GTFS Change Tracker +resource "google_storage_bucket_object" "gtfs_datasets_comparer_zip" { + bucket = google_storage_bucket.functions_bucket.name + name = "gtfs-datasets-comparer-${substr(filebase64sha256(local.function_gtfs_datasets_comparer_zip), 0, 10)}.zip" + source = local.function_gtfs_datasets_comparer_zip +} + # Web app revalidation secret resource "google_secret_manager_secret" "web_app_revalidate_secret" { project = var.project_id @@ -1034,6 +1045,24 @@ resource "google_cloudfunctions2_function_iam_member" "pmtiles_builder_invoker" member = "serviceAccount:${google_service_account.functions_service_account.email}" } +# Grant execution permission to batchfunctions service account to the gtfs_datasets_comparer function +resource "google_cloudfunctions2_function_iam_member" "gtfs_datasets_comparer_invoker_batch_sa" { + project = var.project_id + location = var.gcp_region + cloud_function = google_cloudfunctions2_function.gtfs_datasets_comparer.name + role = "roles/cloudfunctions.invoker" + member = "serviceAccount:${local.batchfunctions_sa_email}" +} + +# Grant execution permission to the functions service account to the gtfs_datasets_comparer function +resource "google_cloudfunctions2_function_iam_member" "gtfs_datasets_comparer_invoker" { + project = var.project_id + location = var.gcp_region + cloud_function = google_cloudfunctions2_function.gtfs_datasets_comparer.name + role = "roles/cloudfunctions.invoker" + member = "serviceAccount:${google_service_account.functions_service_account.email}" +} + # 13.3 functions/reverse_geolocation - batch cloud function resource "google_cloudfunctions2_function" "reverse_geolocation_batch" { name = "${local.function_reverse_geolocation_config.name}-batch" @@ -1166,7 +1195,7 @@ resource "google_cloudfunctions2_function" "tasks_executor" { } } -# Grant execution permission to bathcfunctions service account to the tasks_executor function +# Grant execution permission to batchfunctions service account to the tasks_executor function resource "google_cloudfunctions2_function_iam_member" "tasks_executor_invoker" { project = var.project_id location = var.gcp_region @@ -1227,6 +1256,104 @@ resource "google_cloudfunctions2_function" "pmtiles_builder" { } } + +# 16. functions/gtfs_datasets_comparer cloud function +resource "google_cloudfunctions2_function" "gtfs_datasets_comparer" { + name = "${local.function_gtfs_datasets_comparer_config.name}-${var.environment}" + project = var.project_id + description = local.function_gtfs_datasets_comparer_config.description + location = var.gcp_region + depends_on = [google_secret_manager_secret_iam_member.secret_iam_member] + + build_config { + runtime = var.python_runtime + entry_point = local.function_gtfs_datasets_comparer_config.entry_point + source { + storage_source { + bucket = google_storage_bucket.functions_bucket.name + object = google_storage_bucket_object.gtfs_datasets_comparer_zip.name + } + } + } + service_config { + environment_variables = { + ENVIRONMENT = var.environment + PROJECT_ID = var.project_id + GCP_REGION = var.gcp_region + DATASETS_BUCKET_NAME = "${var.datasets_bucket_name}-${var.environment}" + DATASETS_BUCKET_MOUNT = "/mobilitydata-datasets" + # GTFS_DIFF_DUCKDB_TMPDIR: directs DuckDB spill files to the in-memory volume. + GTFS_DIFF_DUCKDB_TMPDIR = "/tmp/in-memory" + } + available_memory = local.function_gtfs_datasets_comparer_config.memory + timeout_seconds = local.function_gtfs_datasets_comparer_config.timeout + available_cpu = local.function_gtfs_datasets_comparer_config.available_cpu + max_instance_request_concurrency = local.function_gtfs_datasets_comparer_config.max_instance_request_concurrency + max_instance_count = local.function_gtfs_datasets_comparer_config.max_instance_count + min_instance_count = local.function_gtfs_datasets_comparer_config.min_instance_count + service_account_email = google_service_account.functions_service_account.email + ingress_settings = local.function_gtfs_datasets_comparer_config.ingress_settings + vpc_connector = data.google_vpc_access_connector.vpc_connector.id + vpc_connector_egress_settings = "PRIVATE_RANGES_ONLY" + + dynamic "secret_environment_variables" { + for_each = local.function_gtfs_datasets_comparer_config.secret_environment_variables + content { + key = secret_environment_variables.value["key"] + project_id = var.project_id + secret = lookup(secret_environment_variables.value, "secret", "${upper(var.environment)}_${secret_environment_variables.value["key"]}") + version = "latest" + } + } + } +} + +# google_cloudfunctions2_function does not expose volume mounts in its schema. +# This terraform_data resource mounts both the datasets GCS bucket and an in-memory tmpfs +# on the underlying Cloud Run service after the function is deployed. +resource "terraform_data" "gtfs_datasets_comparer_gcs_mount" { + triggers_replace = { + function_name = google_cloudfunctions2_function.gtfs_datasets_comparer.name + bucket = "${var.datasets_bucket_name}-${var.environment}" + region = var.gcp_region + project = var.project_id + } + + provisioner "local-exec" { + command = <<-EOT + MOUNTS=$(gcloud run services describe ${google_cloudfunctions2_function.gtfs_datasets_comparer.name} \ + --project ${var.project_id} \ + --region ${var.gcp_region} \ + --format='value(spec.template.spec.volumes[].name)' 2>/dev/null) + + ARGS="" + if echo "$MOUNTS" | grep -q "datasets-bucket"; then + echo "GCS volume already mounted, skipping." + else + ARGS="$ARGS --add-volume name=datasets-bucket,type=cloud-storage,bucket=${var.datasets_bucket_name}-${var.environment},readonly=true" + ARGS="$ARGS --add-volume-mount volume=datasets-bucket,mount-path=/mobilitydata-datasets" + fi + + if echo "$MOUNTS" | grep -q "in-memory"; then + echo "In-memory volume already mounted, skipping." + else + ARGS="$ARGS --add-volume name=in-memory,type=in-memory,size-limit=${var.gtfs_datasets_comparer_in_memory_size}" + ARGS="$ARGS --add-volume-mount volume=in-memory,mount-path=/tmp/in-memory" + fi + + if [ -n "$ARGS" ]; then + gcloud run services update ${google_cloudfunctions2_function.gtfs_datasets_comparer.name} \ + --project ${var.project_id} \ + --region ${var.gcp_region} \ + $ARGS \ + --quiet + fi + EOT + } + + depends_on = [google_cloudfunctions2_function.gtfs_datasets_comparer] +} + # Create the Pub/Sub topic used for publishing messages about rebuilding missing bounding boxes resource "google_pubsub_topic" "rebuild_missing_bounding_boxes" { name = "rebuild-bounding-boxes-topic" diff --git a/infra/functions-python/vars.tf b/infra/functions-python/vars.tf index f70df30c7..156b60abc 100644 --- a/infra/functions-python/vars.tf +++ b/infra/functions-python/vars.tf @@ -134,4 +134,9 @@ variable "brevo_api_announcements_list_id" { type = string description = "Brevo list ID for API announcements" default = "" -} \ No newline at end of file +} +variable "gtfs_datasets_comparer_in_memory_size" { + type = string + description = "Size limit for the gtfs_datasets_comparer in-memory tmpfs volume" + default = "3Gi" +}