diff --git a/src/google/adk/skills/_utils.py b/src/google/adk/skills/_utils.py index cab70a8d4b..e3185135e7 100644 --- a/src/google/adk/skills/_utils.py +++ b/src/google/adk/skills/_utils.py @@ -17,6 +17,7 @@ from __future__ import annotations import logging +import os import pathlib from typing import Union @@ -401,7 +402,7 @@ def _load_skill_from_gcs_dir( f" name '{skill_name_expected}'." ) - def _load_files_in_dir(subdir: str) -> Dict[str, Union[str, bytes]]: + def _load_files_in_dir(subdir: str) -> dict[str, str | bytes]: prefix = f"{skill_dir_prefix}{subdir}/" blobs = bucket.list_blobs(prefix=prefix) result = {} @@ -411,10 +412,26 @@ def _load_files_in_dir(subdir: str) -> Dict[str, Union[str, bytes]]: if not relative_path: continue + # Use PurePosixPath for platform-independent GCS path validation + p = pathlib.PurePosixPath(relative_path) + + # Reject absolute paths and traversal sequences + if p.is_absolute() or ".." in p.parts: + raise ValueError(f"Unsafe path in skill resource: {relative_path!r}") + + normalized = p.as_posix() + + # Prevent silent file overwrites via path aliasing + if normalized in result: + raise ValueError(f"Duplicate normalized path detected: {normalized!r}") + + # NOTE: Final path safety enforced during materialization + # via realpath + commonpath checks in skill_toolset.py try: - result[relative_path] = blob.download_as_text() + result[normalized] = blob.download_as_text() except UnicodeDecodeError: - result[relative_path] = blob.download_as_bytes() + result[normalized] = blob.download_as_bytes() + return result references = _load_files_in_dir("references") diff --git a/src/google/adk/tools/skill_toolset.py b/src/google/adk/tools/skill_toolset.py index 32ab45e7d2..a9d870dbfb 100644 --- a/src/google/adk/tools/skill_toolset.py +++ b/src/google/adk/tools/skill_toolset.py @@ -535,6 +535,7 @@ def _build_wrapper_code( ) # Build the boilerplate extract string + code_lines = [ "import os", "import tempfile", @@ -546,8 +547,14 @@ def _build_wrapper_code( "def _materialize_and_run():", " _orig_cwd = os.getcwd()", " with tempfile.TemporaryDirectory() as td:", + " _real_base = os.path.realpath(td)", " for rel_path, content in _files.items():", - " full_path = os.path.join(td, rel_path)", + " if os.path.isabs(rel_path):", + " raise ValueError(f'Absolute path rejected: {rel_path!r}')", + " _safe = os.path.realpath(os.path.join(td, rel_path))", + " if os.path.commonpath([_real_base, _safe]) != _real_base:", + " raise ValueError(f'Path traversal detected: {rel_path!r}')", + " full_path = _safe", " os.makedirs(os.path.dirname(full_path), exist_ok=True)", " mode = 'wb' if isinstance(content, bytes) else 'w'", " with open(full_path, mode) as f:", diff --git a/tests/unittests/skills/test__utils.py b/tests/unittests/skills/test__utils.py index ae5df00c1c..01b4f39ae3 100644 --- a/tests/unittests/skills/test__utils.py +++ b/tests/unittests/skills/test__utils.py @@ -340,3 +340,148 @@ def test_list_skills_in_dir_missing_base_path(tmp_path): skills = list_skills_in_dir(tmp_path / "nonexistent") assert skills == {} + + +# ── Security: path traversal prevention (VRP #499557362) ── + + +@mock.patch("google.cloud.storage.Client") +def test__load_skill_from_gcs_dir_rejects_dotdot_traversal(mock_client_class): + """Ingestion layer must reject blob names containing .. sequences.""" + mock_client = mock.MagicMock() + mock_client_class.return_value = mock_client + mock_bucket = mock.MagicMock() + mock_client.bucket.return_value = mock_bucket + + def mock_blob_side_effect(path): + m = mock.MagicMock() + if path.endswith("SKILL.md"): + m.exists.return_value = True + m.download_as_text.return_value = ( + "---\nname: my-skill\ndescription: Test\n---\nBody" + ) + else: + m.exists.return_value = False + return m + + mock_bucket.blob.side_effect = mock_blob_side_effect + + def list_blobs_side_effect(prefix=None): + m = mock.MagicMock() + m.name = prefix + "../../../../tmp/malicious.py" + m.download_as_text.return_value = "malicious content" + return [m] + + mock_bucket.list_blobs.side_effect = list_blobs_side_effect + + with pytest.raises(ValueError, match="Unsafe path"): + _load_skill_from_gcs_dir("my-bucket", "my-skill") + + +@mock.patch("google.cloud.storage.Client") +def test__load_skill_from_gcs_dir_rejects_absolute_path(mock_client_class): + """Ingestion layer must reject blob names that resolve to absolute paths.""" + mock_client = mock.MagicMock() + mock_client_class.return_value = mock_client + mock_bucket = mock.MagicMock() + mock_client.bucket.return_value = mock_bucket + + def mock_blob_side_effect(path): + m = mock.MagicMock() + if path.endswith("SKILL.md"): + m.exists.return_value = True + m.download_as_text.return_value = ( + "---\nname: my-skill\ndescription: Test\n---\nBody" + ) + else: + m.exists.return_value = False + return m + + mock_bucket.blob.side_effect = mock_blob_side_effect + + def list_blobs_side_effect(prefix=None): + m = mock.MagicMock() + m.name = prefix + "/etc/passwd" + m.download_as_text.return_value = "root:x:0:0" + return [m] + + mock_bucket.list_blobs.side_effect = list_blobs_side_effect + + with pytest.raises(ValueError, match="Unsafe path"): + _load_skill_from_gcs_dir("my-bucket", "my-skill") + + +@mock.patch("google.cloud.storage.Client") +def test__load_skill_from_gcs_dir_rejects_duplicate_normalized_path( + mock_client_class, +): + """Ingestion layer must reject two blobs that normalize to the same path.""" + mock_client = mock.MagicMock() + mock_client_class.return_value = mock_client + mock_bucket = mock.MagicMock() + mock_client.bucket.return_value = mock_bucket + + def mock_blob_side_effect(path): + m = mock.MagicMock() + if path.endswith("SKILL.md"): + m.exists.return_value = True + m.download_as_text.return_value = ( + "---\nname: my-skill\ndescription: Test\n---\nBody" + ) + else: + m.exists.return_value = False + return m + + mock_bucket.blob.side_effect = mock_blob_side_effect + + def list_blobs_side_effect(prefix=None): + if "references" in prefix: + blob1 = mock.MagicMock() + blob1.name = prefix + "doc.md" + blob1.download_as_text.return_value = "first" + blob2 = mock.MagicMock() + blob2.name = prefix + "doc.md" + blob2.download_as_text.return_value = "second" + return [blob1, blob2] + return [] + + mock_bucket.list_blobs.side_effect = list_blobs_side_effect + + with pytest.raises(ValueError, match="Duplicate normalized path"): + _load_skill_from_gcs_dir("my-bucket", "my-skill") + + +@mock.patch("google.cloud.storage.Client") +def test__load_skill_from_gcs_dir_safe_path_succeeds(mock_client_class): + """Clean blob names must pass ingestion and load correctly (happy path).""" + mock_client = mock.MagicMock() + mock_client_class.return_value = mock_client + mock_bucket = mock.MagicMock() + mock_client.bucket.return_value = mock_bucket + + def mock_blob_side_effect(path): + m = mock.MagicMock() + if path.endswith("SKILL.md"): + m.exists.return_value = True + m.download_as_text.return_value = ( + "---\nname: my-skill\ndescription: Test\n---\nBody" + ) + else: + m.exists.return_value = False + return m + + mock_bucket.blob.side_effect = mock_blob_side_effect + + def list_blobs_side_effect(prefix=None): + if "references" in prefix: + m = mock.MagicMock() + m.name = prefix + "safe_doc.md" + m.download_as_text.return_value = "safe content" + return [m] + return [] + + mock_bucket.list_blobs.side_effect = list_blobs_side_effect + + skill = _load_skill_from_gcs_dir("my-bucket", "my-skill") + assert skill.name == "my-skill" + assert skill.resources.get_reference("safe_doc.md") == "safe content" diff --git a/tests/unittests/tools/test_skill_toolset.py b/tests/unittests/tools/test_skill_toolset.py index 36120009f1..cb4609cc97 100644 --- a/tests/unittests/tools/test_skill_toolset.py +++ b/tests/unittests/tools/test_skill_toolset.py @@ -1727,3 +1727,95 @@ async def test_skill_toolset_resolution_error_handling(mock_skill1, caplog): # Should still return basic skill tools assert len(tools) == 4 + + +# ── Security: materialization-layer path traversal (VRP #499557362) ── + + +def test_materialization_rejects_dotdot_traversal(): + """Materialization guard must raise on .. traversal in _files dict.""" + malicious_files = {"../../tmp/pwned.txt": "pwned"} + code_lines = [ + "import os", + "import tempfile", + f"_files = {malicious_files!r}", + "def _materialize_and_run():", + " with tempfile.TemporaryDirectory() as td:", + " _real_base = os.path.realpath(td)", + " for rel_path, content in _files.items():", + " if os.path.isabs(rel_path):", + " raise ValueError(f'Absolute path rejected: {rel_path!r}')", + " _safe = os.path.realpath(os.path.join(td, rel_path))", + " if os.path.commonpath([_real_base, _safe]) != _real_base:", + " raise ValueError(f'Path traversal detected: {rel_path!r}')", + " full_path = _safe", + " os.makedirs(os.path.dirname(full_path), exist_ok=True)", + " mode = 'wb' if isinstance(content, bytes) else 'w'", + " with open(full_path, mode) as f:", + " f.write(content)", + "_materialize_and_run()", + ] + code = "\n".join(code_lines) + ns = {"__builtins__": __builtins__} + with pytest.raises(ValueError, match="Path traversal detected"): + exec(compile(code, "", "exec"), ns) # pylint: disable=exec-used + + +def test_materialization_rejects_absolute_path(): + """Materialization guard must raise on absolute path in _files dict.""" + malicious_files = {"/tmp/absolute_pwned.txt": "pwned"} + code_lines = [ + "import os", + "import tempfile", + f"_files = {malicious_files!r}", + "def _materialize_and_run():", + " with tempfile.TemporaryDirectory() as td:", + " _real_base = os.path.realpath(td)", + " for rel_path, content in _files.items():", + " if os.path.isabs(rel_path):", + " raise ValueError(f'Absolute path rejected: {rel_path!r}')", + " _safe = os.path.realpath(os.path.join(td, rel_path))", + " if os.path.commonpath([_real_base, _safe]) != _real_base:", + " raise ValueError(f'Path traversal detected: {rel_path!r}')", + " full_path = _safe", + " os.makedirs(os.path.dirname(full_path), exist_ok=True)", + " mode = 'wb' if isinstance(content, bytes) else 'w'", + " with open(full_path, mode) as f:", + " f.write(content)", + "_materialize_and_run()", + ] + code = "\n".join(code_lines) + ns = {"__builtins__": __builtins__} + with pytest.raises(ValueError, match="Absolute path rejected"): + exec(compile(code, "", "exec"), ns) # pylint: disable=exec-used + + +def test_materialization_safe_path_writes_correctly(): + """Safe paths must materialize and write without error.""" + safe_files = {"scripts/hello.py": "print('hello')"} + code_lines = [ + "import os", + "import tempfile", + f"_files = {safe_files!r}", + "_written = []", + "def _materialize_and_run():", + " with tempfile.TemporaryDirectory() as td:", + " _real_base = os.path.realpath(td)", + " for rel_path, content in _files.items():", + " if os.path.isabs(rel_path):", + " raise ValueError(f'Absolute path rejected: {rel_path!r}')", + " _safe = os.path.realpath(os.path.join(td, rel_path))", + " if os.path.commonpath([_real_base, _safe]) != _real_base:", + " raise ValueError(f'Path traversal detected: {rel_path!r}')", + " full_path = _safe", + " os.makedirs(os.path.dirname(full_path), exist_ok=True)", + " mode = 'wb' if isinstance(content, bytes) else 'w'", + " with open(full_path, mode) as f:", + " f.write(content)", + " _written.append(rel_path)", + "_materialize_and_run()", + ] + code = "\n".join(code_lines) + ns = {} + exec(compile(code, "", "exec"), ns) # pylint: disable=exec-used + assert "scripts/hello.py" in ns["_written"]