From 1338fda8eefd9d358e77a6f6bb392718e6043fd9 Mon Sep 17 00:00:00 2001 From: p1930n Date: Tue, 5 May 2026 22:28:53 +0800 Subject: [PATCH 1/5] fix(plugin): support flat plugin upload archives --- astrbot/core/star/updator.py | 80 ++++++++++++++++++++++++++++-------- tests/test_updator_socks.py | 50 ++++++++++++++++++++++ 2 files changed, 114 insertions(+), 16 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index cb462d8b2c..95897c700d 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -1,6 +1,7 @@ import os import shutil import zipfile +from pathlib import Path, PurePosixPath from astrbot.core import logger from astrbot.core.utils.astrbot_path import get_astrbot_plugin_path @@ -71,29 +72,76 @@ async def update( return plugin_path def unzip_file(self, zip_path: str, target_dir: str) -> None: - ensure_dir(target_dir) - update_dir = "" + target_path = Path(target_dir) + ensure_dir(target_path) logger.info(f"Extracting archive: {zip_path}") + + archive_root_dir = None with zipfile.ZipFile(zip_path, "r") as z: - update_dir = z.namelist()[0] - z.extractall(target_dir) - - files = os.listdir(os.path.join(target_dir, update_dir)) - for f in files: - if os.path.isdir(os.path.join(target_dir, update_dir, f)): - if os.path.exists(os.path.join(target_dir, f)): - shutil.rmtree(os.path.join(target_dir, f), onerror=on_error) - elif os.path.exists(os.path.join(target_dir, f)): - os.remove(os.path.join(target_dir, f)) - shutil.move(os.path.join(target_dir, update_dir, f), target_dir) + archive_root_dir = self._get_archive_root_dir(z.infolist()) + self._extract_archive_safely(z, target_path) + + extracted_root = target_path / archive_root_dir if archive_root_dir else None + if extracted_root and extracted_root.is_dir(): + for child in extracted_root.iterdir(): + destination = target_path / child.name + if destination.is_dir() and not destination.is_symlink(): + shutil.rmtree(destination, onerror=on_error) + elif destination.exists() or destination.is_symlink(): + destination.unlink() + shutil.move(str(child), str(target_path)) try: logger.info( - f"Removing temporary files: {zip_path} and {os.path.join(target_dir, update_dir)}", + f"Removing temporary files: {zip_path}" + + (f" and {extracted_root}" if extracted_root else ""), ) - shutil.rmtree(os.path.join(target_dir, update_dir), onerror=on_error) + if extracted_root and extracted_root.exists(): + shutil.rmtree(extracted_root, onerror=on_error) os.remove(zip_path) except BaseException: logger.warning( - f"Failed to remove update files; you can manually delete {zip_path} and {os.path.join(target_dir, update_dir)}", + f"Failed to remove update files; you can manually delete {zip_path}" + + (f" and {extracted_root}" if extracted_root else ""), ) + + @classmethod + def _get_archive_root_dir(cls, members: list[zipfile.ZipInfo]) -> str | None: + root_dir = None + for member in members: + parts = cls._get_safe_member_parts(member.filename) + if not parts or member.is_dir(): + continue + if len(parts) == 1: + return None + if root_dir is None: + root_dir = parts[0] + elif root_dir != parts[0]: + return None + return root_dir + + @classmethod + def _extract_archive_safely( + cls, archive: zipfile.ZipFile, target_path: Path + ) -> None: + for member in archive.infolist(): + cls._get_safe_member_parts(member.filename) + for member in archive.infolist(): + archive.extract(member, target_path) + + @staticmethod + def _get_safe_member_parts(member_name: str) -> tuple[str, ...]: + if not member_name: + return () + if "\\" in member_name: + raise ValueError(f"Unsafe path in zip archive: {member_name}") + + member_path = PurePosixPath(member_name) + parts = tuple(part for part in member_path.parts if part) + if ( + member_path.is_absolute() + or any(part in {".", ".."} for part in parts) + or any(":" in part for part in parts) + ): + raise ValueError(f"Unsafe path in zip archive: {member_name}") + return parts diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index 11009ff85c..8bf7db23c9 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -1,3 +1,4 @@ +import zipfile from dataclasses import dataclass, field from pathlib import Path from types import SimpleNamespace @@ -177,6 +178,55 @@ def fake_unzip_file(zip_path: str, target_dir: str): assert calls["unzip"] == (str(expected_path) + ".zip", str(expected_path)) +def test_plugin_updator_unzip_file_accepts_flat_plugin_archive(tmp_path: Path) -> None: + archive_path = tmp_path / "flat_plugin.zip" + target_path = tmp_path / "plugin_upload" + with zipfile.ZipFile(archive_path, "w") as archive: + archive.writestr("main.py", "print('loaded')\n") + archive.writestr("metadata.yaml", "name: flat_plugin\n") + archive.writestr("commands/__init__.py", "") + + PluginUpdator().unzip_file(str(archive_path), str(target_path)) + + assert (target_path / "main.py").read_text(encoding="utf-8") == "print('loaded')\n" + assert (target_path / "metadata.yaml").read_text(encoding="utf-8") == ( + "name: flat_plugin\n" + ) + assert (target_path / "commands" / "__init__.py").exists() + assert not archive_path.exists() + + +def test_plugin_updator_unzip_file_flattens_single_root_dir(tmp_path: Path) -> None: + archive_path = tmp_path / "rooted_plugin.zip" + target_path = tmp_path / "plugin_upload" + with zipfile.ZipFile(archive_path, "w") as archive: + archive.writestr("astrbot_plugin_demo-main/main.py", "print('loaded')\n") + archive.writestr("astrbot_plugin_demo-main/metadata.yaml", "name: demo\n") + archive.writestr("astrbot_plugin_demo-main/services/__init__.py", "") + + PluginUpdator().unzip_file(str(archive_path), str(target_path)) + + assert (target_path / "main.py").exists() + assert (target_path / "metadata.yaml").exists() + assert (target_path / "services" / "__init__.py").exists() + assert not (target_path / "astrbot_plugin_demo-main").exists() + assert not archive_path.exists() + + +def test_plugin_updator_unzip_file_rejects_unsafe_member_path( + tmp_path: Path, +) -> None: + archive_path = tmp_path / "unsafe_plugin.zip" + target_path = tmp_path / "plugin_upload" + with zipfile.ZipFile(archive_path, "w") as archive: + archive.writestr("../escape.py", "print('escape')\n") + + with pytest.raises(ValueError, match="Unsafe path in zip archive"): + PluginUpdator().unzip_file(str(archive_path), str(target_path)) + + assert not (tmp_path / "escape.py").exists() + + @pytest.mark.asyncio async def test_fetch_release_info_uses_httpx_client_with_env_proxy_support( monkeypatch: pytest.MonkeyPatch, From 481aed99365970a0bddd3e49af03a88b97fa8988 Mon Sep 17 00:00:00 2001 From: p1930n Date: Tue, 5 May 2026 22:46:13 +0800 Subject: [PATCH 2/5] fix(plugin): avoid root flattening name conflict --- astrbot/core/star/updator.py | 21 ++++++++++++++++----- tests/test_updator_socks.py | 17 +++++++++++++++++ 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index 95897c700d..877299f215 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -83,12 +83,10 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: extracted_root = target_path / archive_root_dir if archive_root_dir else None if extracted_root and extracted_root.is_dir(): + extracted_root = self._rename_extracted_root_for_flattening(extracted_root) for child in extracted_root.iterdir(): destination = target_path / child.name - if destination.is_dir() and not destination.is_symlink(): - shutil.rmtree(destination, onerror=on_error) - elif destination.exists() or destination.is_symlink(): - destination.unlink() + self._remove_existing_path(destination) shutil.move(str(child), str(target_path)) try: @@ -99,12 +97,25 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: if extracted_root and extracted_root.exists(): shutil.rmtree(extracted_root, onerror=on_error) os.remove(zip_path) - except BaseException: + except Exception: logger.warning( f"Failed to remove update files; you can manually delete {zip_path}" + (f" and {extracted_root}" if extracted_root else ""), ) + def _rename_extracted_root_for_flattening(self, extracted_root: Path) -> Path: + temp_root = extracted_root.with_name(f".{extracted_root.name}.tmp") + self._remove_existing_path(temp_root) + extracted_root.rename(temp_root) + return temp_root + + @staticmethod + def _remove_existing_path(path: Path) -> None: + if path.is_dir() and not path.is_symlink(): + shutil.rmtree(path, onerror=on_error) + elif path.exists() or path.is_symlink(): + path.unlink() + @classmethod def _get_archive_root_dir(cls, members: list[zipfile.ZipInfo]) -> str | None: root_dir = None diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index 8bf7db23c9..b9ffe5afc2 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -213,6 +213,23 @@ def test_plugin_updator_unzip_file_flattens_single_root_dir(tmp_path: Path) -> N assert not archive_path.exists() +def test_plugin_updator_unzip_file_flattens_root_dir_with_same_named_child( + tmp_path: Path, +) -> None: + archive_path = tmp_path / "same_named_child.zip" + target_path = tmp_path / "plugin_upload" + with zipfile.ZipFile(archive_path, "w") as archive: + archive.writestr("my_plugin/main.py", "print('loaded')\n") + archive.writestr("my_plugin/my_plugin/__init__.py", "") + + PluginUpdator().unzip_file(str(archive_path), str(target_path)) + + assert (target_path / "main.py").exists() + assert (target_path / "my_plugin" / "__init__.py").exists() + assert not (target_path / ".my_plugin.tmp").exists() + assert not archive_path.exists() + + def test_plugin_updator_unzip_file_rejects_unsafe_member_path( tmp_path: Path, ) -> None: From 99ae084997654867dac7a867cec192a6a7631c9b Mon Sep 17 00:00:00 2001 From: p1930n Date: Tue, 5 May 2026 23:16:15 +0800 Subject: [PATCH 3/5] fix(plugin): harden upload archive flattening --- astrbot/core/star/updator.py | 46 ++++++++++++++++++++++-------------- tests/test_updator_socks.py | 44 +++++++++++++++++++++++++++++++--- 2 files changed, 69 insertions(+), 21 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index 877299f215..79fde80582 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -1,5 +1,6 @@ import os import shutil +import uuid import zipfile from pathlib import Path, PurePosixPath @@ -78,8 +79,10 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: archive_root_dir = None with zipfile.ZipFile(zip_path, "r") as z: - archive_root_dir = self._get_archive_root_dir(z.infolist()) - self._extract_archive_safely(z, target_path) + members = z.infolist() + archive_root_dir = self._get_archive_root_dir(members) + for member in members: + z.extract(member, target_path) extracted_root = target_path / archive_root_dir if archive_root_dir else None if extracted_root and extracted_root.is_dir(): @@ -104,11 +107,22 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: ) def _rename_extracted_root_for_flattening(self, extracted_root: Path) -> Path: - temp_root = extracted_root.with_name(f".{extracted_root.name}.tmp") - self._remove_existing_path(temp_root) + temp_root = self._get_unique_flatten_temp_path(extracted_root) extracted_root.rename(temp_root) return temp_root + @staticmethod + def _get_unique_flatten_temp_path(extracted_root: Path) -> Path: + for _ in range(100): + temp_root = extracted_root.with_name( + f".{extracted_root.name}.{uuid.uuid4().hex}.tmp" + ) + if not temp_root.exists() and not temp_root.is_symlink(): + return temp_root + raise FileExistsError( + f"Could not allocate a temporary path for {extracted_root}" + ) + @staticmethod def _remove_existing_path(path: Path) -> None: if path.is_dir() and not path.is_symlink(): @@ -116,30 +130,26 @@ def _remove_existing_path(path: Path) -> None: elif path.exists() or path.is_symlink(): path.unlink() - @classmethod - def _get_archive_root_dir(cls, members: list[zipfile.ZipInfo]) -> str | None: + @staticmethod + def _get_archive_root_dir(members: list[zipfile.ZipInfo]) -> str | None: root_dir = None + has_root_file = False + has_multiple_roots = False for member in members: - parts = cls._get_safe_member_parts(member.filename) + parts = PluginUpdator._get_safe_member_parts(member.filename) if not parts or member.is_dir(): continue if len(parts) == 1: - return None + has_root_file = True + continue if root_dir is None: root_dir = parts[0] elif root_dir != parts[0]: - return None + has_multiple_roots = True + if has_root_file or has_multiple_roots: + return None return root_dir - @classmethod - def _extract_archive_safely( - cls, archive: zipfile.ZipFile, target_path: Path - ) -> None: - for member in archive.infolist(): - cls._get_safe_member_parts(member.filename) - for member in archive.infolist(): - archive.extract(member, target_path) - @staticmethod def _get_safe_member_parts(member_name: str) -> tuple[str, ...]: if not member_name: diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index b9ffe5afc2..ffd8f69eaa 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -213,6 +213,23 @@ def test_plugin_updator_unzip_file_flattens_single_root_dir(tmp_path: Path) -> N assert not archive_path.exists() +def test_plugin_updator_unzip_file_keeps_multiple_root_entries( + tmp_path: Path, +) -> None: + archive_path = tmp_path / "multi_root.zip" + target_path = tmp_path / "plugin_upload" + with zipfile.ZipFile(archive_path, "w") as archive: + archive.writestr("plugin_a/main.py", "print('a')\n") + archive.writestr("plugin_b/main.py", "print('b')\n") + + PluginUpdator().unzip_file(str(archive_path), str(target_path)) + + assert (target_path / "plugin_a" / "main.py").exists() + assert (target_path / "plugin_b" / "main.py").exists() + assert not (target_path / "main.py").exists() + assert not archive_path.exists() + + def test_plugin_updator_unzip_file_flattens_root_dir_with_same_named_child( tmp_path: Path, ) -> None: @@ -226,24 +243,45 @@ def test_plugin_updator_unzip_file_flattens_root_dir_with_same_named_child( assert (target_path / "main.py").exists() assert (target_path / "my_plugin" / "__init__.py").exists() - assert not (target_path / ".my_plugin.tmp").exists() + assert not any( + path.name.startswith(".my_plugin.") and path.name.endswith(".tmp") + for path in target_path.iterdir() + ) assert not archive_path.exists() -def test_plugin_updator_unzip_file_rejects_unsafe_member_path( +@pytest.mark.parametrize( + "member_name", + [ + "../escape.py", + "nested/../../escape.py", + "/absolute.py", + "C:/absolute.py", + "nested/colon:name.py", + ], +) +def test_plugin_updator_unzip_file_rejects_unsafe_member_paths( tmp_path: Path, + member_name: str, ) -> None: archive_path = tmp_path / "unsafe_plugin.zip" target_path = tmp_path / "plugin_upload" with zipfile.ZipFile(archive_path, "w") as archive: - archive.writestr("../escape.py", "print('escape')\n") + archive.writestr("main.py", "print('safe')\n") + archive.writestr(member_name, "print('escape')\n") with pytest.raises(ValueError, match="Unsafe path in zip archive"): PluginUpdator().unzip_file(str(archive_path), str(target_path)) + assert not (target_path / "main.py").exists() assert not (tmp_path / "escape.py").exists() +def test_plugin_updator_rejects_backslash_member_path() -> None: + with pytest.raises(ValueError, match="Unsafe path in zip archive"): + PluginUpdator._get_safe_member_parts(r"nested\windows.py") + + @pytest.mark.asyncio async def test_fetch_release_info_uses_httpx_client_with_env_proxy_support( monkeypatch: pytest.MonkeyPatch, From 2d5509ee26ac9c7938a4a09a78302a01fdc9aa05 Mon Sep 17 00:00:00 2001 From: p1930n Date: Tue, 5 May 2026 23:36:11 +0800 Subject: [PATCH 4/5] test(plugin): cover archive edge cases --- astrbot/core/star/updator.py | 9 +++++++-- tests/test_updator_socks.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index 79fde80582..7fa4053e03 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -133,19 +133,24 @@ def _remove_existing_path(path: Path) -> None: @staticmethod def _get_archive_root_dir(members: list[zipfile.ZipInfo]) -> str | None: root_dir = None + has_file = False has_root_file = False has_multiple_roots = False for member in members: parts = PluginUpdator._get_safe_member_parts(member.filename) - if not parts or member.is_dir(): + if not parts: continue - if len(parts) == 1: + if not member.is_dir(): + has_file = True + if len(parts) == 1 and not member.is_dir(): has_root_file = True continue if root_dir is None: root_dir = parts[0] elif root_dir != parts[0]: has_multiple_roots = True + if not has_file: + raise ValueError("Empty plugin archive") if has_root_file or has_multiple_roots: return None return root_dir diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index ffd8f69eaa..ee10c26793 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -196,6 +196,18 @@ def test_plugin_updator_unzip_file_accepts_flat_plugin_archive(tmp_path: Path) - assert not archive_path.exists() +def test_plugin_updator_unzip_file_rejects_empty_archive(tmp_path: Path) -> None: + archive_path = tmp_path / "empty_plugin.zip" + target_path = tmp_path / "plugin_upload" + with zipfile.ZipFile(archive_path, "w"): + pass + + with pytest.raises(ValueError, match="Empty plugin archive"): + PluginUpdator().unzip_file(str(archive_path), str(target_path)) + + assert not any(target_path.iterdir()) + + def test_plugin_updator_unzip_file_flattens_single_root_dir(tmp_path: Path) -> None: archive_path = tmp_path / "rooted_plugin.zip" target_path = tmp_path / "plugin_upload" @@ -230,6 +242,23 @@ def test_plugin_updator_unzip_file_keeps_multiple_root_entries( assert not archive_path.exists() +def test_plugin_updator_unzip_file_keeps_root_dir_with_extra_empty_root_dir( + tmp_path: Path, +) -> None: + archive_path = tmp_path / "rooted_plugin_with_empty_dir.zip" + target_path = tmp_path / "plugin_upload" + with zipfile.ZipFile(archive_path, "w") as archive: + archive.writestr("plugin/main.py", "print('loaded')\n") + archive.writestr("docs/", "") + + PluginUpdator().unzip_file(str(archive_path), str(target_path)) + + assert (target_path / "plugin" / "main.py").exists() + assert (target_path / "docs").is_dir() + assert not (target_path / "main.py").exists() + assert not archive_path.exists() + + def test_plugin_updator_unzip_file_flattens_root_dir_with_same_named_child( tmp_path: Path, ) -> None: From 86c9addbb2b6aac8866ab527412f03a487216f75 Mon Sep 17 00:00:00 2001 From: p1930n Date: Wed, 6 May 2026 13:50:22 +0800 Subject: [PATCH 5/5] fix(plugin): stage upload archive extraction --- astrbot/core/star/updator.py | 105 +++++++++++++++++++++++------------ tests/test_updator_socks.py | 21 +++++++ 2 files changed, 89 insertions(+), 37 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index 7fa4053e03..7d08ece4cb 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -1,6 +1,6 @@ import os import shutil -import uuid +import tempfile import zipfile from pathlib import Path, PurePosixPath @@ -11,6 +11,9 @@ from ..star.star import StarMetadata from ..updator import RepoZipUpdator +ARCHIVE_METADATA_ROOT_DIRS = {"__MACOSX"} +ARCHIVE_METADATA_FILE_NAMES = {".DS_Store"} + class PluginUpdator(RepoZipUpdator): def __init__(self, repo_mirror: str = "", verify: str | bool | None = None) -> None: @@ -77,51 +80,69 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: ensure_dir(target_path) logger.info(f"Extracting archive: {zip_path}") - archive_root_dir = None - with zipfile.ZipFile(zip_path, "r") as z: - members = z.infolist() - archive_root_dir = self._get_archive_root_dir(members) - for member in members: - z.extract(member, target_path) - - extracted_root = target_path / archive_root_dir if archive_root_dir else None - if extracted_root and extracted_root.is_dir(): - extracted_root = self._rename_extracted_root_for_flattening(extracted_root) - for child in extracted_root.iterdir(): - destination = target_path / child.name - self._remove_existing_path(destination) - shutil.move(str(child), str(target_path)) - + staging_path = self._create_extract_temp_dir(target_path) try: - logger.info( - f"Removing temporary files: {zip_path}" - + (f" and {extracted_root}" if extracted_root else ""), + archive_root_dir = None + with zipfile.ZipFile(zip_path, "r") as z: + members = [ + member + for member in z.infolist() + if not self._is_archive_metadata_member(member.filename) + ] + archive_root_dir = self._get_archive_root_dir(members) + for member in members: + z.extract(member, staging_path) + + source_path = ( + staging_path / archive_root_dir if archive_root_dir else staging_path + ) + self._move_extracted_children(source_path, target_path) + self._remove_update_files(zip_path, staging_path) + if not staging_path.exists(): + staging_path = None + finally: + if staging_path: + self._remove_staging_path_safely(staging_path) + + @staticmethod + def _create_extract_temp_dir(target_path: Path) -> Path: + return Path( + tempfile.mkdtemp( + prefix=f".{target_path.name}.", + suffix=".extract", + dir=target_path.parent, ) - if extracted_root and extracted_root.exists(): - shutil.rmtree(extracted_root, onerror=on_error) + ) + + def _move_extracted_children(self, source_path: Path, target_path: Path) -> None: + for child in source_path.iterdir(): + destination = target_path / child.name + self._remove_existing_path(destination) + shutil.move(str(child), str(target_path)) + + @staticmethod + def _remove_update_files(zip_path: str, staging_path: Path) -> None: + try: + logger.info(f"Removing temporary files: {zip_path} and {staging_path}") + shutil.rmtree(staging_path, onerror=on_error) os.remove(zip_path) except Exception: logger.warning( - f"Failed to remove update files; you can manually delete {zip_path}" - + (f" and {extracted_root}" if extracted_root else ""), + f"Failed to remove update files; you can manually delete {zip_path} " + f"and {staging_path}", ) - def _rename_extracted_root_for_flattening(self, extracted_root: Path) -> Path: - temp_root = self._get_unique_flatten_temp_path(extracted_root) - extracted_root.rename(temp_root) - return temp_root - @staticmethod - def _get_unique_flatten_temp_path(extracted_root: Path) -> Path: - for _ in range(100): - temp_root = extracted_root.with_name( - f".{extracted_root.name}.{uuid.uuid4().hex}.tmp" + def _remove_staging_path_safely(staging_path: Path) -> None: + if not staging_path.exists(): + return + try: + shutil.rmtree(staging_path, onerror=on_error) + except Exception: + logger.warning( + f"Failed to remove temporary extract directory; " + f"you can manually delete {staging_path}", ) - if not temp_root.exists() and not temp_root.is_symlink(): - return temp_root - raise FileExistsError( - f"Could not allocate a temporary path for {extracted_root}" - ) @staticmethod def _remove_existing_path(path: Path) -> None: @@ -155,6 +176,16 @@ def _get_archive_root_dir(members: list[zipfile.ZipInfo]) -> str | None: return None return root_dir + @staticmethod + def _is_archive_metadata_member(member_name: str) -> bool: + parts = PluginUpdator._get_safe_member_parts(member_name) + if not parts: + return False + return ( + parts[0] in ARCHIVE_METADATA_ROOT_DIRS + or parts[-1] in ARCHIVE_METADATA_FILE_NAMES + ) + @staticmethod def _get_safe_member_parts(member_name: str) -> tuple[str, ...]: if not member_name: diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index ee10c26793..db1fee7564 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -225,6 +225,27 @@ def test_plugin_updator_unzip_file_flattens_single_root_dir(tmp_path: Path) -> N assert not archive_path.exists() +def test_plugin_updator_unzip_file_ignores_macos_metadata_when_flattening( + tmp_path: Path, +) -> None: + archive_path = tmp_path / "rooted_plugin_with_macos_metadata.zip" + target_path = tmp_path / "plugin_upload" + with zipfile.ZipFile(archive_path, "w") as archive: + archive.writestr("astrbot_plugin_demo-main/main.py", "print('loaded')\n") + archive.writestr("astrbot_plugin_demo-main/metadata.yaml", "name: demo\n") + archive.writestr("astrbot_plugin_demo-main/.DS_Store", "") + archive.writestr("__MACOSX/._astrbot_plugin_demo-main", "") + + PluginUpdator().unzip_file(str(archive_path), str(target_path)) + + assert (target_path / "main.py").exists() + assert (target_path / "metadata.yaml").exists() + assert not (target_path / "astrbot_plugin_demo-main").exists() + assert not (target_path / "__MACOSX").exists() + assert not (target_path / ".DS_Store").exists() + assert not archive_path.exists() + + def test_plugin_updator_unzip_file_keeps_multiple_root_entries( tmp_path: Path, ) -> None: