From b5cbcc673bdb38df5ac21ea2ed1bcdc38050de2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E6=B0=B8=E8=B5=AB?= <1259085392@qq.com> Date: Wed, 6 May 2026 00:12:13 +0900 Subject: [PATCH 1/8] fix: normalize updater zip root paths on windows --- astrbot/core/star/updator.py | 25 ++++--- astrbot/core/zip_updator.py | 30 +++++--- tests/test_updator_socks.py | 141 ++++++++++++++++++++++++++++++++++- 3 files changed, 172 insertions(+), 24 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index 2ae0039db0..e71507c46f 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -57,25 +57,28 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: update_dir = "" logger.info(f"正在解压压缩包: {zip_path}") with zipfile.ZipFile(zip_path, "r") as z: - update_dir = z.namelist()[0] + update_dir = self._normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) - files = os.listdir(os.path.join(target_dir, update_dir)) + update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) + files = os.listdir(update_root_path) for f in files: - if os.path.isdir(os.path.join(target_dir, update_dir, f)): - if os.path.exists(os.path.join(target_dir, f)): - shutil.rmtree(os.path.join(target_dir, f), onerror=on_error) - elif os.path.exists(os.path.join(target_dir, f)): - os.remove(os.path.join(target_dir, f)) - shutil.move(os.path.join(target_dir, update_dir, f), target_dir) + update_item_path = os.path.normpath(os.path.join(update_root_path, f)) + target_item_path = os.path.normpath(os.path.join(target_dir, f)) + if os.path.isdir(update_item_path): + if os.path.exists(target_item_path): + shutil.rmtree(target_item_path, onerror=on_error) + elif os.path.exists(target_item_path): + os.remove(target_item_path) + shutil.move(update_item_path, target_dir) try: logger.info( - f"删除临时文件: {zip_path} 和 {os.path.join(target_dir, update_dir)}", + f"删除临时文件: {zip_path} 和 {update_root_path}", ) - shutil.rmtree(os.path.join(target_dir, update_dir), onerror=on_error) + shutil.rmtree(update_root_path, onerror=on_error) os.remove(zip_path) except BaseException: logger.warning( - f"删除更新文件失败,可以手动删除 {zip_path} 和 {os.path.join(target_dir, update_dir)}", + f"删除更新文件失败,可以手动删除 {zip_path} 和 {update_root_path}", ) diff --git a/astrbot/core/zip_updator.py b/astrbot/core/zip_updator.py index 116b55c57d..2bc7ad51ee 100644 --- a/astrbot/core/zip_updator.py +++ b/astrbot/core/zip_updator.py @@ -231,33 +231,41 @@ def parse_github_url(self, url: str): return author, repo, branch raise ValueError("无效的 GitHub URL") + @staticmethod + def _normalize_archive_root_dir(path: str) -> str: + normalized = os.path.normpath(path) + return "" if normalized == "." else normalized + def unzip_file(self, zip_path: str, target_dir: str) -> None: """解压缩文件, 并将压缩包内**第一个**文件夹内的文件移动到 target_dir""" os.makedirs(target_dir, exist_ok=True) update_dir = "" with zipfile.ZipFile(zip_path, "r") as z: - update_dir = z.namelist()[0] + update_dir = self._normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) logger.debug(f"解压文件完成: {zip_path}") - files = os.listdir(os.path.join(target_dir, update_dir)) + update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) + files = os.listdir(update_root_path) for f in files: - if os.path.isdir(os.path.join(target_dir, update_dir, f)): - if os.path.exists(os.path.join(target_dir, f)): - shutil.rmtree(os.path.join(target_dir, f), onerror=on_error) - elif os.path.exists(os.path.join(target_dir, f)): - os.remove(os.path.join(target_dir, f)) - shutil.move(os.path.join(target_dir, update_dir, f), target_dir) + update_item_path = os.path.normpath(os.path.join(update_root_path, f)) + target_item_path = os.path.normpath(os.path.join(target_dir, f)) + if os.path.isdir(update_item_path): + if os.path.exists(target_item_path): + shutil.rmtree(target_item_path, onerror=on_error) + elif os.path.exists(target_item_path): + os.remove(target_item_path) + shutil.move(update_item_path, target_dir) try: logger.debug( - f"删除临时更新文件: {zip_path} 和 {os.path.join(target_dir, update_dir)}", + f"删除临时更新文件: {zip_path} 和 {update_root_path}", ) - shutil.rmtree(os.path.join(target_dir, update_dir), onerror=on_error) + shutil.rmtree(update_root_path, onerror=on_error) os.remove(zip_path) except BaseException: logger.warning( - f"删除更新文件失败,可以手动删除 {zip_path} 和 {os.path.join(target_dir, update_dir)}", + f"删除更新文件失败,可以手动删除 {zip_path} 和 {update_root_path}", ) def format_name(self, name: str) -> str: diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index 6dec6fe0f4..fffc98e869 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -1,3 +1,4 @@ +import ntpath from dataclasses import dataclass, field from pathlib import Path from types import SimpleNamespace @@ -6,6 +7,7 @@ import httpx import pytest +from astrbot.core.star.updator import PluginUpdator from astrbot.core.zip_updator import RepoZipUpdator @@ -107,6 +109,23 @@ def stream(self, method: str, url: str): # noqa: ARG002 return _FakeFailingStreamResponse() +class _FakeZipArchive: + def __init__(self, names: list[str]): + self._names = names + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb) -> None: + return None + + def namelist(self) -> list[str]: + return self._names + + def extractall(self, target_dir: str) -> None: # noqa: ARG002 + return None + + def _build_fake_httpx_module(state: _FakeAsyncClientState) -> SimpleNamespace: class _FakeAsyncClient: def __init__(self, **kwargs): @@ -187,7 +206,9 @@ async def test_fetch_release_info_uses_httpx_client_with_env_proxy_support( "zipball_url": "https://example.com/astrbot.zip", } ] - assert fake_async_client_state.requested_urls == ["https://api.soulter.top/releases"] + assert fake_async_client_state.requested_urls == [ + "https://api.soulter.top/releases" + ] assert fake_async_client_state.init_kwargs is not None assert fake_async_client_state.init_kwargs["follow_redirects"] is True assert fake_async_client_state.init_kwargs["timeout"] == 30.0 @@ -221,7 +242,9 @@ async def fake_fetch_release_info(self, url: str, latest: bool = True): # noqa: zip_updator_module, "download_file", lambda *args, **kwargs: (_ for _ in ()).throw( - AssertionError("download_from_repo_url should not use aiohttp download_file") + AssertionError( + "download_from_repo_url should not use aiohttp download_file" + ) ), raising=False, ) @@ -357,3 +380,117 @@ async def test_download_file_logs_url_and_target_path_on_failure( assert any(url in message for message in log_messages) assert any(str(target_path) in message for message in log_messages) + + +def test_repo_unzip_file_normalizes_windows_extended_length_paths( + monkeypatch: pytest.MonkeyPatch, +) -> None: + import astrbot.core.zip_updator as zip_updator_module + + target_dir = r"\\?\\C:\\Users\\admin\\AppData\\Local\\AstrBot\\backend\\app" + archive_root = "AstrBotDevs-AstrBot-39386ee/" + expected_root = ntpath.normpath(ntpath.join(target_dir, archive_root)) + expected_file = ntpath.normpath( + ntpath.join(target_dir, archive_root, ".dockerignore") + ) + captured: dict[str, object] = {} + + def fake_listdir(path: str) -> list[str]: + captured.setdefault("listdir", path) + return [".dockerignore"] + + monkeypatch.setattr( + zip_updator_module.os, "makedirs", lambda path, exist_ok=True: None + ) + monkeypatch.setattr(zip_updator_module.os.path, "join", ntpath.join) + monkeypatch.setattr(zip_updator_module.os.path, "normpath", ntpath.normpath) + monkeypatch.setattr(zip_updator_module.os.path, "isdir", lambda path: False) + monkeypatch.setattr(zip_updator_module.os.path, "exists", lambda path: False) + monkeypatch.setattr( + zip_updator_module.zipfile, + "ZipFile", + lambda path, mode: _FakeZipArchive( + [archive_root, f"{archive_root}.dockerignore"] + ), + ) + monkeypatch.setattr(zip_updator_module.logger, "debug", lambda message: None) + monkeypatch.setattr(zip_updator_module.logger, "warning", lambda message: None) + monkeypatch.setattr(zip_updator_module.os, "listdir", fake_listdir) + monkeypatch.setattr( + zip_updator_module.shutil, + "move", + lambda src, dst: captured.setdefault("move", (src, dst)), + ) + monkeypatch.setattr( + zip_updator_module.shutil, + "rmtree", + lambda path, onerror=None: captured.setdefault("cleanup", path), + ) + monkeypatch.setattr( + zip_updator_module.os, + "remove", + lambda path: captured.setdefault("removed", path), + ) + + RepoZipUpdator().unzip_file("temp.zip", target_dir) + + assert captured["listdir"] == expected_root + assert captured["move"] == (expected_file, target_dir) + assert captured["cleanup"] == expected_root + + +def test_plugin_unzip_file_normalizes_windows_extended_length_paths( + monkeypatch: pytest.MonkeyPatch, +) -> None: + import astrbot.core.star.updator as plugin_updator_module + + target_dir = r"\\?\\C:\\Users\\admin\\AppData\\Local\\AstrBot\\data\\plugins\\demo" + archive_root = "AstrBotDevs-demo-39386ee/" + expected_root = ntpath.normpath(ntpath.join(target_dir, archive_root)) + expected_file = ntpath.normpath( + ntpath.join(target_dir, archive_root, ".dockerignore") + ) + captured: dict[str, object] = {} + + def fake_listdir(path: str) -> list[str]: + captured.setdefault("listdir", path) + return [".dockerignore"] + + monkeypatch.setattr( + plugin_updator_module.os, "makedirs", lambda path, exist_ok=True: None + ) + monkeypatch.setattr(plugin_updator_module.os.path, "join", ntpath.join) + monkeypatch.setattr(plugin_updator_module.os.path, "normpath", ntpath.normpath) + monkeypatch.setattr(plugin_updator_module.os.path, "isdir", lambda path: False) + monkeypatch.setattr(plugin_updator_module.os.path, "exists", lambda path: False) + monkeypatch.setattr( + plugin_updator_module.zipfile, + "ZipFile", + lambda path, mode: _FakeZipArchive( + [archive_root, f"{archive_root}.dockerignore"] + ), + ) + monkeypatch.setattr(plugin_updator_module.logger, "info", lambda message: None) + monkeypatch.setattr(plugin_updator_module.logger, "warning", lambda message: None) + monkeypatch.setattr(plugin_updator_module.os, "listdir", fake_listdir) + monkeypatch.setattr( + plugin_updator_module.shutil, + "move", + lambda src, dst: captured.setdefault("move", (src, dst)), + ) + monkeypatch.setattr( + plugin_updator_module.shutil, + "rmtree", + lambda path, onerror=None: captured.setdefault("cleanup", path), + ) + monkeypatch.setattr( + plugin_updator_module.os, + "remove", + lambda path: captured.setdefault("removed", path), + ) + + PluginUpdator.__new__(PluginUpdator).unzip_file("temp.zip", target_dir) + + assert captured["listdir"] == expected_root + assert captured["move"] == (expected_file, target_dir) + assert captured["cleanup"] == expected_root From e27094f1e5bc97f92472b79c769d8292a1e01468 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E6=B0=B8=E8=B5=AB?= <1259085392@qq.com> Date: Wed, 6 May 2026 00:36:36 +0900 Subject: [PATCH 2/8] refactor: share updater archive path normalization --- astrbot/core/star/updator.py | 7 ++++--- astrbot/core/zip_updator.py | 13 +++++++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index 2bd069ca9a..dd58ca8802 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -78,11 +78,12 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: update_dir = self._normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) - update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) + target_root_path = self._normalize_archive_path(target_dir) + update_root_path = self._normalize_archive_path(target_root_path, update_dir) files = os.listdir(update_root_path) for f in files: - update_item_path = os.path.normpath(os.path.join(update_root_path, f)) - target_item_path = os.path.normpath(os.path.join(target_dir, f)) + update_item_path = self._normalize_archive_path(update_root_path, f) + target_item_path = self._normalize_archive_path(target_root_path, f) if os.path.isdir(update_item_path): if os.path.exists(target_item_path): shutil.rmtree(target_item_path, onerror=on_error) diff --git a/astrbot/core/zip_updator.py b/astrbot/core/zip_updator.py index f61f222da4..45fb7c71a7 100644 --- a/astrbot/core/zip_updator.py +++ b/astrbot/core/zip_updator.py @@ -231,9 +231,13 @@ def parse_github_url(self, url: str): return author, repo, branch raise ValueError("无效的 GitHub URL") + @staticmethod + def _normalize_archive_path(*parts: str) -> str: + return os.path.normpath(os.path.join(*parts)) + @staticmethod def _normalize_archive_root_dir(path: str) -> str: - normalized = os.path.normpath(path) + normalized = RepoZipUpdator._normalize_archive_path(path) return "" if normalized == "." else normalized def unzip_file(self, zip_path: str, target_dir: str) -> None: @@ -245,11 +249,12 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: z.extractall(target_dir) logger.debug(f"解压文件完成: {zip_path}") - update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) + target_root_path = self._normalize_archive_path(target_dir) + update_root_path = self._normalize_archive_path(target_root_path, update_dir) files = os.listdir(update_root_path) for f in files: - update_item_path = os.path.normpath(os.path.join(update_root_path, f)) - target_item_path = os.path.normpath(os.path.join(target_dir, f)) + update_item_path = self._normalize_archive_path(update_root_path, f) + target_item_path = self._normalize_archive_path(target_root_path, f) if os.path.isdir(update_item_path): if os.path.exists(target_item_path): shutil.rmtree(target_item_path, onerror=on_error) From 909feeaf098cd7b82cd83fb9971de2eb29cbc06c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E6=B0=B8=E8=B5=AB?= <1259085392@qq.com> Date: Wed, 6 May 2026 00:51:32 +0900 Subject: [PATCH 3/8] test: expand updater archive root coverage --- astrbot/core/star/updator.py | 7 +++---- astrbot/core/zip_updator.py | 13 ++++--------- tests/test_updator_socks.py | 24 +++++++++++++++++------- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index dd58ca8802..2bd069ca9a 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -78,12 +78,11 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: update_dir = self._normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) - target_root_path = self._normalize_archive_path(target_dir) - update_root_path = self._normalize_archive_path(target_root_path, update_dir) + update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) files = os.listdir(update_root_path) for f in files: - update_item_path = self._normalize_archive_path(update_root_path, f) - target_item_path = self._normalize_archive_path(target_root_path, f) + update_item_path = os.path.normpath(os.path.join(update_root_path, f)) + target_item_path = os.path.normpath(os.path.join(target_dir, f)) if os.path.isdir(update_item_path): if os.path.exists(target_item_path): shutil.rmtree(target_item_path, onerror=on_error) diff --git a/astrbot/core/zip_updator.py b/astrbot/core/zip_updator.py index 45fb7c71a7..f61f222da4 100644 --- a/astrbot/core/zip_updator.py +++ b/astrbot/core/zip_updator.py @@ -231,13 +231,9 @@ def parse_github_url(self, url: str): return author, repo, branch raise ValueError("无效的 GitHub URL") - @staticmethod - def _normalize_archive_path(*parts: str) -> str: - return os.path.normpath(os.path.join(*parts)) - @staticmethod def _normalize_archive_root_dir(path: str) -> str: - normalized = RepoZipUpdator._normalize_archive_path(path) + normalized = os.path.normpath(path) return "" if normalized == "." else normalized def unzip_file(self, zip_path: str, target_dir: str) -> None: @@ -249,12 +245,11 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: z.extractall(target_dir) logger.debug(f"解压文件完成: {zip_path}") - target_root_path = self._normalize_archive_path(target_dir) - update_root_path = self._normalize_archive_path(target_root_path, update_dir) + update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) files = os.listdir(update_root_path) for f in files: - update_item_path = self._normalize_archive_path(update_root_path, f) - target_item_path = self._normalize_archive_path(target_root_path, f) + update_item_path = os.path.normpath(os.path.join(update_root_path, f)) + target_item_path = os.path.normpath(os.path.join(target_dir, f)) if os.path.isdir(update_item_path): if os.path.exists(target_item_path): shutil.rmtree(target_item_path, onerror=on_error) diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index ca46797da5..d3fe7277ad 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -1,4 +1,5 @@ import ntpath +import posixpath from dataclasses import dataclass, field from pathlib import Path from types import SimpleNamespace @@ -126,6 +127,10 @@ def extractall(self, target_dir: str) -> None: # noqa: ARG002 return None +def _build_fake_archive_entries(archive_root: str) -> list[str]: + return [archive_root, posixpath.join(archive_root, ".dockerignore")] + + def _build_fake_httpx_module(state: _FakeAsyncClientState) -> SimpleNamespace: class _FakeAsyncClient: def __init__(self, **kwargs): @@ -420,13 +425,22 @@ async def test_download_file_logs_url_and_target_path_on_failure( assert any(str(target_path) in message for message in log_messages) +@pytest.mark.parametrize( + "archive_root", + [ + "AstrBotDevs-AstrBot-39386ee/", + "AstrBotDevs-AstrBot-39386ee", + "owner-repo-branch/subdir/", + ".", + ], +) def test_repo_unzip_file_normalizes_windows_extended_length_paths( monkeypatch: pytest.MonkeyPatch, + archive_root: str, ) -> None: import astrbot.core.zip_updator as zip_updator_module target_dir = r"\\?\\C:\\Users\\admin\\AppData\\Local\\AstrBot\\backend\\app" - archive_root = "AstrBotDevs-AstrBot-39386ee/" expected_root = ntpath.normpath(ntpath.join(target_dir, archive_root)) expected_file = ntpath.normpath( ntpath.join(target_dir, archive_root, ".dockerignore") @@ -447,9 +461,7 @@ def fake_listdir(path: str) -> list[str]: monkeypatch.setattr( zip_updator_module.zipfile, "ZipFile", - lambda path, mode: _FakeZipArchive( - [archive_root, f"{archive_root}.dockerignore"] - ), + lambda path, mode: _FakeZipArchive(_build_fake_archive_entries(archive_root)), ) monkeypatch.setattr(zip_updator_module.logger, "debug", lambda message: None) monkeypatch.setattr(zip_updator_module.logger, "warning", lambda message: None) @@ -504,9 +516,7 @@ def fake_listdir(path: str) -> list[str]: monkeypatch.setattr( plugin_updator_module.zipfile, "ZipFile", - lambda path, mode: _FakeZipArchive( - [archive_root, f"{archive_root}.dockerignore"] - ), + lambda path, mode: _FakeZipArchive(_build_fake_archive_entries(archive_root)), ) monkeypatch.setattr(plugin_updator_module.logger, "info", lambda message: None) monkeypatch.setattr(plugin_updator_module.logger, "warning", lambda message: None) From 6de68a5e7dfb6004e7ffd96cde438cd7b92dd298 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E6=B0=B8=E8=B5=AB?= <1259085392@qq.com> Date: Wed, 6 May 2026 01:03:30 +0900 Subject: [PATCH 4/8] fix: guard empty updater archive roots --- astrbot/core/star/updator.py | 12 +++++- astrbot/core/zip_updator.py | 19 ++++++--- tests/test_updator_socks.py | 77 +++++++++++++++++++++++++++--------- 3 files changed, 82 insertions(+), 26 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index 2bd069ca9a..90529732e0 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -5,6 +5,7 @@ from astrbot.core import logger from astrbot.core.utils.astrbot_path import get_astrbot_plugin_path from astrbot.core.utils.io import ensure_dir, on_error, remove_dir +from astrbot.core.zip_updator import normalize_archive_root_dir from ..star.star import StarMetadata from ..updator import RepoZipUpdator @@ -75,9 +76,18 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: update_dir = "" logger.info(f"Extracting archive: {zip_path}") with zipfile.ZipFile(zip_path, "r") as z: - update_dir = self._normalize_archive_root_dir(z.namelist()[0]) + update_dir = normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) + if not update_dir: + try: + os.remove(zip_path) + except BaseException: + logger.warning( + f"Failed to remove update files; you can manually delete {zip_path}", + ) + return + update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) files = os.listdir(update_root_path) for f in files: diff --git a/astrbot/core/zip_updator.py b/astrbot/core/zip_updator.py index f61f222da4..9795de5e80 100644 --- a/astrbot/core/zip_updator.py +++ b/astrbot/core/zip_updator.py @@ -13,6 +13,11 @@ from astrbot.core.utils.version_comparator import VersionComparator +def normalize_archive_root_dir(path: str) -> str: + normalized = os.path.normpath(path) + return "" if normalized == "." else normalized + + class ReleaseInfo: version: str published_at: str @@ -231,20 +236,22 @@ def parse_github_url(self, url: str): return author, repo, branch raise ValueError("无效的 GitHub URL") - @staticmethod - def _normalize_archive_root_dir(path: str) -> str: - normalized = os.path.normpath(path) - return "" if normalized == "." else normalized - def unzip_file(self, zip_path: str, target_dir: str) -> None: """解压缩文件, 并将压缩包内**第一个**文件夹内的文件移动到 target_dir""" ensure_dir(target_dir) update_dir = "" with zipfile.ZipFile(zip_path, "r") as z: - update_dir = self._normalize_archive_root_dir(z.namelist()[0]) + update_dir = normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) logger.debug(f"解压文件完成: {zip_path}") + if not update_dir: + try: + os.remove(zip_path) + except BaseException: + logger.warning(f"删除更新文件失败,可以手动删除 {zip_path}") + return + update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) files = os.listdir(update_root_path) for f in files: diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index d3fe7277ad..bba3110b82 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -440,15 +440,23 @@ def test_repo_unzip_file_normalizes_windows_extended_length_paths( ) -> None: import astrbot.core.zip_updator as zip_updator_module - target_dir = r"\\?\\C:\\Users\\admin\\AppData\\Local\\AstrBot\\backend\\app" - expected_root = ntpath.normpath(ntpath.join(target_dir, archive_root)) - expected_file = ntpath.normpath( - ntpath.join(target_dir, archive_root, ".dockerignore") + target_dir = r"\\?\C:\Users\admin\AppData\Local\AstrBot\backend\app" + normalized_root = ntpath.normpath(archive_root) + expected_root = ( + target_dir + if normalized_root == "." + else ntpath.join(target_dir, normalized_root) ) - captured: dict[str, object] = {} + expected_file = ntpath.join(expected_root, ".dockerignore") + captured: dict[str, object | None] = { + "listdir": None, + "move": None, + "cleanup": None, + "removed": None, + } def fake_listdir(path: str) -> list[str]: - captured.setdefault("listdir", path) + captured["listdir"] = path return [".dockerignore"] monkeypatch.setattr( @@ -469,41 +477,65 @@ def fake_listdir(path: str) -> list[str]: monkeypatch.setattr( zip_updator_module.shutil, "move", - lambda src, dst: captured.setdefault("move", (src, dst)), + lambda src, dst: captured.__setitem__("move", (src, dst)), ) monkeypatch.setattr( zip_updator_module.shutil, "rmtree", - lambda path, onerror=None: captured.setdefault("cleanup", path), + lambda path, onerror=None: captured.__setitem__("cleanup", path), ) monkeypatch.setattr( zip_updator_module.os, "remove", - lambda path: captured.setdefault("removed", path), + lambda path: captured.__setitem__("removed", path), ) RepoZipUpdator().unzip_file("temp.zip", target_dir) + assert captured["removed"] == "temp.zip" + if normalized_root == ".": + assert captured["listdir"] is None + assert captured["move"] is None + assert captured["cleanup"] is None + return + assert captured["listdir"] == expected_root assert captured["move"] == (expected_file, target_dir) assert captured["cleanup"] == expected_root +@pytest.mark.parametrize( + "archive_root", + [ + "AstrBotDevs-demo-39386ee/", + "AstrBotDevs-demo-39386ee", + "owner-repo-branch/subdir/", + ".", + ], +) def test_plugin_unzip_file_normalizes_windows_extended_length_paths( monkeypatch: pytest.MonkeyPatch, + archive_root: str, ) -> None: import astrbot.core.star.updator as plugin_updator_module - target_dir = r"\\?\\C:\\Users\\admin\\AppData\\Local\\AstrBot\\data\\plugins\\demo" - archive_root = "AstrBotDevs-demo-39386ee/" - expected_root = ntpath.normpath(ntpath.join(target_dir, archive_root)) - expected_file = ntpath.normpath( - ntpath.join(target_dir, archive_root, ".dockerignore") + target_dir = r"\\?\C:\Users\admin\AppData\Local\AstrBot\data\plugins\demo" + normalized_root = ntpath.normpath(archive_root) + expected_root = ( + target_dir + if normalized_root == "." + else ntpath.join(target_dir, normalized_root) ) - captured: dict[str, object] = {} + expected_file = ntpath.join(expected_root, ".dockerignore") + captured: dict[str, object | None] = { + "listdir": None, + "move": None, + "cleanup": None, + "removed": None, + } def fake_listdir(path: str) -> list[str]: - captured.setdefault("listdir", path) + captured["listdir"] = path return [".dockerignore"] monkeypatch.setattr( @@ -524,21 +556,28 @@ def fake_listdir(path: str) -> list[str]: monkeypatch.setattr( plugin_updator_module.shutil, "move", - lambda src, dst: captured.setdefault("move", (src, dst)), + lambda src, dst: captured.__setitem__("move", (src, dst)), ) monkeypatch.setattr( plugin_updator_module.shutil, "rmtree", - lambda path, onerror=None: captured.setdefault("cleanup", path), + lambda path, onerror=None: captured.__setitem__("cleanup", path), ) monkeypatch.setattr( plugin_updator_module.os, "remove", - lambda path: captured.setdefault("removed", path), + lambda path: captured.__setitem__("removed", path), ) PluginUpdator.__new__(PluginUpdator).unzip_file("temp.zip", target_dir) + assert captured["removed"] == "temp.zip" + if normalized_root == ".": + assert captured["listdir"] is None + assert captured["move"] is None + assert captured["cleanup"] is None + return + assert captured["listdir"] == expected_root assert captured["move"] == (expected_file, target_dir) assert captured["cleanup"] == expected_root From fa93f1748b482d5c2cfe5e5080c5f53185942d2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E6=B0=B8=E8=B5=AB?= <1259085392@qq.com> Date: Thu, 7 May 2026 09:13:16 +0900 Subject: [PATCH 5/8] refactor: share updater extraction safeguards --- astrbot/core/star/updator.py | 43 ++----- astrbot/core/zip_updator.py | 62 +++++++-- tests/test_updator_socks.py | 235 +++++++++++++++++++---------------- 3 files changed, 187 insertions(+), 153 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index 90529732e0..e5d67a5b0f 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -1,10 +1,9 @@ import os -import shutil import zipfile from astrbot.core import logger from astrbot.core.utils.astrbot_path import get_astrbot_plugin_path -from astrbot.core.utils.io import ensure_dir, on_error, remove_dir +from astrbot.core.utils.io import ensure_dir, remove_dir from astrbot.core.zip_updator import normalize_archive_root_dir from ..star.star import StarMetadata @@ -73,40 +72,16 @@ async def update( def unzip_file(self, zip_path: str, target_dir: str) -> None: ensure_dir(target_dir) - update_dir = "" logger.info(f"Extracting archive: {zip_path}") with zipfile.ZipFile(zip_path, "r") as z: update_dir = normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) - if not update_dir: - try: - os.remove(zip_path) - except BaseException: - logger.warning( - f"Failed to remove update files; you can manually delete {zip_path}", - ) - return - - update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) - files = os.listdir(update_root_path) - for f in files: - update_item_path = os.path.normpath(os.path.join(update_root_path, f)) - target_item_path = os.path.normpath(os.path.join(target_dir, f)) - if os.path.isdir(update_item_path): - if os.path.exists(target_item_path): - shutil.rmtree(target_item_path, onerror=on_error) - elif os.path.exists(target_item_path): - os.remove(target_item_path) - shutil.move(update_item_path, target_dir) - - try: - logger.info( - f"Removing temporary files: {zip_path} and {update_root_path}", - ) - shutil.rmtree(update_root_path, onerror=on_error) - os.remove(zip_path) - except BaseException: - logger.warning( - f"Failed to remove update files; you can manually delete {zip_path} and {update_root_path}", - ) + self._finalize_extracted_archive( + zip_path, + target_dir, + update_dir, + extract_log=logger.info, + cleanup_log_template="Removing temporary files: {zip_path}{suffix}", + cleanup_warning_template="Failed to remove update files; you can manually delete {zip_path}{suffix}", + ) diff --git a/astrbot/core/zip_updator.py b/astrbot/core/zip_updator.py index 9795de5e80..2e0afe8e87 100644 --- a/astrbot/core/zip_updator.py +++ b/astrbot/core/zip_updator.py @@ -18,6 +18,13 @@ def normalize_archive_root_dir(path: str) -> str: return "" if normalized == "." else normalized +def _is_within_directory(directory: str, path: str) -> bool: + try: + return os.path.commonpath([directory, path]) == directory + except ValueError: + return False + + class ReleaseInfo: version: str published_at: str @@ -239,40 +246,75 @@ def parse_github_url(self, url: str): def unzip_file(self, zip_path: str, target_dir: str) -> None: """解压缩文件, 并将压缩包内**第一个**文件夹内的文件移动到 target_dir""" ensure_dir(target_dir) - update_dir = "" with zipfile.ZipFile(zip_path, "r") as z: update_dir = normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) logger.debug(f"解压文件完成: {zip_path}") + self._finalize_extracted_archive( + zip_path, + target_dir, + update_dir, + extract_log=logger.debug, + cleanup_warning_template="删除更新文件失败,可以手动删除 {zip_path}{suffix}", + cleanup_log_template="删除临时更新文件: {zip_path}{suffix}", + ) + + def _finalize_extracted_archive( + self, + zip_path: str, + target_dir: str, + update_dir: str, + *, + extract_log, + cleanup_log_template: str, + cleanup_warning_template: str, + ) -> None: + target_root_path = os.path.normpath(target_dir) + if not update_dir: try: os.remove(zip_path) - except BaseException: - logger.warning(f"删除更新文件失败,可以手动删除 {zip_path}") + except Exception: + logger.warning( + cleanup_warning_template.format(zip_path=zip_path, suffix="") + ) return - update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) + update_root_path = os.path.normpath(os.path.join(target_root_path, update_dir)) + if not _is_within_directory(target_root_path, update_root_path): + raise ValueError("update root escapes target directory") + files = os.listdir(update_root_path) for f in files: update_item_path = os.path.normpath(os.path.join(update_root_path, f)) - target_item_path = os.path.normpath(os.path.join(target_dir, f)) + target_item_path = os.path.normpath(os.path.join(target_root_path, f)) + if not _is_within_directory(update_root_path, update_item_path): + raise ValueError("update item escapes extracted archive root") + if not _is_within_directory(target_root_path, target_item_path): + raise ValueError("target item escapes target directory") if os.path.isdir(update_item_path): if os.path.exists(target_item_path): shutil.rmtree(target_item_path, onerror=on_error) elif os.path.exists(target_item_path): os.remove(target_item_path) - shutil.move(update_item_path, target_dir) + shutil.move(update_item_path, target_root_path) try: - logger.debug( - f"删除临时更新文件: {zip_path} 和 {update_root_path}", + extract_log( + cleanup_log_template.format( + zip_path=zip_path, + suffix=f" 和 {update_root_path}", + ) ) shutil.rmtree(update_root_path, onerror=on_error) os.remove(zip_path) - except BaseException: + except Exception: logger.warning( - f"删除更新文件失败,可以手动删除 {zip_path} 和 {update_root_path}", + cleanup_warning_template.format( + zip_path=zip_path, + suffix=f" 和 {update_root_path}", + ), ) def format_name(self, name: str) -> str: diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index bba3110b82..6093c92ccf 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -131,6 +131,88 @@ def _build_fake_archive_entries(archive_root: str) -> list[str]: return [archive_root, posixpath.join(archive_root, ".dockerignore")] +def _exercise_unzip_file_windows_path_normalization( + monkeypatch: pytest.MonkeyPatch, + *, + updater_module, + zip_updator_module, + updater, + target_dir: str, + archive_root: str, + logger_method: str, +) -> dict[str, object | None]: + captured: dict[str, object | None] = { + "listdir": None, + "move": None, + "cleanup": None, + "removed": None, + } + + def fake_listdir(path: str) -> list[str]: + captured["listdir"] = path + return [".dockerignore"] + + monkeypatch.setattr(updater_module.os, "makedirs", lambda path, exist_ok=True: None) + monkeypatch.setattr(updater_module.os.path, "join", ntpath.join) + monkeypatch.setattr(updater_module.os.path, "normpath", ntpath.normpath) + monkeypatch.setattr(updater_module.os.path, "commonpath", ntpath.commonpath) + monkeypatch.setattr(updater_module.os.path, "isdir", lambda path: False) + monkeypatch.setattr(updater_module.os.path, "exists", lambda path: False) + monkeypatch.setattr( + updater_module.zipfile, + "ZipFile", + lambda path, mode: _FakeZipArchive(_build_fake_archive_entries(archive_root)), + ) + monkeypatch.setattr(updater_module.logger, logger_method, lambda message: None) + monkeypatch.setattr(updater_module.logger, "warning", lambda message: None) + monkeypatch.setattr(updater_module.os, "listdir", fake_listdir) + monkeypatch.setattr( + zip_updator_module.shutil, + "move", + lambda src, dst: captured.__setitem__("move", (src, dst)), + ) + monkeypatch.setattr( + zip_updator_module.shutil, + "rmtree", + lambda path, onerror=None: captured.__setitem__("cleanup", path), + ) + monkeypatch.setattr( + updater_module.os, + "remove", + lambda path: captured.__setitem__("removed", path), + ) + + updater.unzip_file("temp.zip", target_dir) + + return captured + + +def _assert_unzip_file_windows_path_normalization( + captured: dict[str, object | None], + *, + target_dir: str, + archive_root: str, +) -> None: + normalized_root = ntpath.normpath(archive_root) + expected_root = ( + target_dir + if normalized_root == "." + else ntpath.join(target_dir, normalized_root) + ) + expected_file = ntpath.join(expected_root, ".dockerignore") + + assert captured["removed"] == "temp.zip" + if normalized_root == ".": + assert captured["listdir"] is None + assert captured["move"] is None + assert captured["cleanup"] is None + return + + assert captured["listdir"] == expected_root + assert captured["move"] == (expected_file, target_dir) + assert captured["cleanup"] == expected_root + + def _build_fake_httpx_module(state: _FakeAsyncClientState) -> SimpleNamespace: class _FakeAsyncClient: def __init__(self, **kwargs): @@ -441,68 +523,20 @@ def test_repo_unzip_file_normalizes_windows_extended_length_paths( import astrbot.core.zip_updator as zip_updator_module target_dir = r"\\?\C:\Users\admin\AppData\Local\AstrBot\backend\app" - normalized_root = ntpath.normpath(archive_root) - expected_root = ( - target_dir - if normalized_root == "." - else ntpath.join(target_dir, normalized_root) + captured = _exercise_unzip_file_windows_path_normalization( + monkeypatch, + updater_module=zip_updator_module, + zip_updator_module=zip_updator_module, + updater=RepoZipUpdator(), + target_dir=target_dir, + archive_root=archive_root, + logger_method="debug", ) - expected_file = ntpath.join(expected_root, ".dockerignore") - captured: dict[str, object | None] = { - "listdir": None, - "move": None, - "cleanup": None, - "removed": None, - } - - def fake_listdir(path: str) -> list[str]: - captured["listdir"] = path - return [".dockerignore"] - monkeypatch.setattr( - zip_updator_module.os, "makedirs", lambda path, exist_ok=True: None - ) - monkeypatch.setattr(zip_updator_module.os.path, "join", ntpath.join) - monkeypatch.setattr(zip_updator_module.os.path, "normpath", ntpath.normpath) - monkeypatch.setattr(zip_updator_module.os.path, "isdir", lambda path: False) - monkeypatch.setattr(zip_updator_module.os.path, "exists", lambda path: False) - monkeypatch.setattr( - zip_updator_module.zipfile, - "ZipFile", - lambda path, mode: _FakeZipArchive(_build_fake_archive_entries(archive_root)), - ) - monkeypatch.setattr(zip_updator_module.logger, "debug", lambda message: None) - monkeypatch.setattr(zip_updator_module.logger, "warning", lambda message: None) - monkeypatch.setattr(zip_updator_module.os, "listdir", fake_listdir) - monkeypatch.setattr( - zip_updator_module.shutil, - "move", - lambda src, dst: captured.__setitem__("move", (src, dst)), - ) - monkeypatch.setattr( - zip_updator_module.shutil, - "rmtree", - lambda path, onerror=None: captured.__setitem__("cleanup", path), - ) - monkeypatch.setattr( - zip_updator_module.os, - "remove", - lambda path: captured.__setitem__("removed", path), + _assert_unzip_file_windows_path_normalization( + captured, target_dir=target_dir, archive_root=archive_root ) - RepoZipUpdator().unzip_file("temp.zip", target_dir) - - assert captured["removed"] == "temp.zip" - if normalized_root == ".": - assert captured["listdir"] is None - assert captured["move"] is None - assert captured["cleanup"] is None - return - - assert captured["listdir"] == expected_root - assert captured["move"] == (expected_file, target_dir) - assert captured["cleanup"] == expected_root - @pytest.mark.parametrize( "archive_root", @@ -518,66 +552,49 @@ def test_plugin_unzip_file_normalizes_windows_extended_length_paths( archive_root: str, ) -> None: import astrbot.core.star.updator as plugin_updator_module + import astrbot.core.zip_updator as zip_updator_module target_dir = r"\\?\C:\Users\admin\AppData\Local\AstrBot\data\plugins\demo" - normalized_root = ntpath.normpath(archive_root) - expected_root = ( - target_dir - if normalized_root == "." - else ntpath.join(target_dir, normalized_root) + captured = _exercise_unzip_file_windows_path_normalization( + monkeypatch, + updater_module=plugin_updator_module, + zip_updator_module=zip_updator_module, + updater=PluginUpdator.__new__(PluginUpdator), + target_dir=target_dir, + archive_root=archive_root, + logger_method="info", ) - expected_file = ntpath.join(expected_root, ".dockerignore") - captured: dict[str, object | None] = { - "listdir": None, - "move": None, - "cleanup": None, - "removed": None, - } - def fake_listdir(path: str) -> list[str]: - captured["listdir"] = path - return [".dockerignore"] + _assert_unzip_file_windows_path_normalization( + captured, target_dir=target_dir, archive_root=archive_root + ) + + +@pytest.mark.parametrize( + ("archive_root", "expected_error"), + [ + ("../escape/", "update root escapes target directory"), + ("C:/escape", "update root escapes target directory"), + ], +) +def test_repo_unzip_file_rejects_archive_roots_outside_target_dir( + monkeypatch: pytest.MonkeyPatch, + archive_root: str, + expected_error: str, +) -> None: + import astrbot.core.zip_updator as zip_updator_module monkeypatch.setattr( - plugin_updator_module.os, "makedirs", lambda path, exist_ok=True: None + zip_updator_module.os, "makedirs", lambda path, exist_ok=True: None ) - monkeypatch.setattr(plugin_updator_module.os.path, "join", ntpath.join) - monkeypatch.setattr(plugin_updator_module.os.path, "normpath", ntpath.normpath) - monkeypatch.setattr(plugin_updator_module.os.path, "isdir", lambda path: False) - monkeypatch.setattr(plugin_updator_module.os.path, "exists", lambda path: False) + monkeypatch.setattr(zip_updator_module.os.path, "join", ntpath.join) + monkeypatch.setattr(zip_updator_module.os.path, "normpath", ntpath.normpath) + monkeypatch.setattr(zip_updator_module.os.path, "commonpath", ntpath.commonpath) monkeypatch.setattr( - plugin_updator_module.zipfile, + zip_updator_module.zipfile, "ZipFile", lambda path, mode: _FakeZipArchive(_build_fake_archive_entries(archive_root)), ) - monkeypatch.setattr(plugin_updator_module.logger, "info", lambda message: None) - monkeypatch.setattr(plugin_updator_module.logger, "warning", lambda message: None) - monkeypatch.setattr(plugin_updator_module.os, "listdir", fake_listdir) - monkeypatch.setattr( - plugin_updator_module.shutil, - "move", - lambda src, dst: captured.__setitem__("move", (src, dst)), - ) - monkeypatch.setattr( - plugin_updator_module.shutil, - "rmtree", - lambda path, onerror=None: captured.__setitem__("cleanup", path), - ) - monkeypatch.setattr( - plugin_updator_module.os, - "remove", - lambda path: captured.__setitem__("removed", path), - ) - - PluginUpdator.__new__(PluginUpdator).unzip_file("temp.zip", target_dir) - assert captured["removed"] == "temp.zip" - if normalized_root == ".": - assert captured["listdir"] is None - assert captured["move"] is None - assert captured["cleanup"] is None - return - - assert captured["listdir"] == expected_root - assert captured["move"] == (expected_file, target_dir) - assert captured["cleanup"] == expected_root + with pytest.raises(ValueError, match=expected_error): + RepoZipUpdator().unzip_file("temp.zip", r"\\?\C:\Users\admin\target") From 0f1dd63491cdf21e6538e2f8c67e597bd6a386ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E6=B0=B8=E8=B5=AB?= <1259085392@qq.com> Date: Thu, 7 May 2026 09:19:25 +0900 Subject: [PATCH 6/8] refactor: simplify updater extraction cleanup --- astrbot/core/star/updator.py | 9 +------ astrbot/core/zip_updator.py | 49 +++++++++++------------------------- tests/test_updator_socks.py | 4 +-- 3 files changed, 18 insertions(+), 44 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index e5d67a5b0f..d7e6120abe 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -77,11 +77,4 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: update_dir = normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) - self._finalize_extracted_archive( - zip_path, - target_dir, - update_dir, - extract_log=logger.info, - cleanup_log_template="Removing temporary files: {zip_path}{suffix}", - cleanup_warning_template="Failed to remove update files; you can manually delete {zip_path}{suffix}", - ) + self._finalize_extracted_archive(zip_path, target_dir, update_dir) diff --git a/astrbot/core/zip_updator.py b/astrbot/core/zip_updator.py index 2e0afe8e87..0f763177bc 100644 --- a/astrbot/core/zip_updator.py +++ b/astrbot/core/zip_updator.py @@ -25,6 +25,14 @@ def _is_within_directory(directory: str, path: str) -> bool: return False +def _safe_join_within(root: str, *parts: str) -> str: + root_norm = os.path.normpath(root) + path = os.path.normpath(os.path.join(root_norm, *parts)) + if not _is_within_directory(root_norm, path): + raise ValueError("path escapes root directory") + return path + + class ReleaseInfo: version: str published_at: str @@ -251,24 +259,13 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: z.extractall(target_dir) logger.debug(f"解压文件完成: {zip_path}") - self._finalize_extracted_archive( - zip_path, - target_dir, - update_dir, - extract_log=logger.debug, - cleanup_warning_template="删除更新文件失败,可以手动删除 {zip_path}{suffix}", - cleanup_log_template="删除临时更新文件: {zip_path}{suffix}", - ) + self._finalize_extracted_archive(zip_path, target_dir, update_dir) def _finalize_extracted_archive( self, zip_path: str, target_dir: str, update_dir: str, - *, - extract_log, - cleanup_log_template: str, - cleanup_warning_template: str, ) -> None: target_root_path = os.path.normpath(target_dir) @@ -276,23 +273,15 @@ def _finalize_extracted_archive( try: os.remove(zip_path) except Exception: - logger.warning( - cleanup_warning_template.format(zip_path=zip_path, suffix="") - ) + logger.warning(f"删除更新文件失败,可以手动删除 {zip_path}") return - update_root_path = os.path.normpath(os.path.join(target_root_path, update_dir)) - if not _is_within_directory(target_root_path, update_root_path): - raise ValueError("update root escapes target directory") + update_root_path = _safe_join_within(target_root_path, update_dir) files = os.listdir(update_root_path) for f in files: - update_item_path = os.path.normpath(os.path.join(update_root_path, f)) - target_item_path = os.path.normpath(os.path.join(target_root_path, f)) - if not _is_within_directory(update_root_path, update_item_path): - raise ValueError("update item escapes extracted archive root") - if not _is_within_directory(target_root_path, target_item_path): - raise ValueError("target item escapes target directory") + update_item_path = _safe_join_within(update_root_path, f) + target_item_path = _safe_join_within(target_root_path, f) if os.path.isdir(update_item_path): if os.path.exists(target_item_path): shutil.rmtree(target_item_path, onerror=on_error) @@ -301,20 +290,12 @@ def _finalize_extracted_archive( shutil.move(update_item_path, target_root_path) try: - extract_log( - cleanup_log_template.format( - zip_path=zip_path, - suffix=f" 和 {update_root_path}", - ) - ) + logger.debug(f"删除临时更新文件: {zip_path} 和 {update_root_path}") shutil.rmtree(update_root_path, onerror=on_error) os.remove(zip_path) except Exception: logger.warning( - cleanup_warning_template.format( - zip_path=zip_path, - suffix=f" 和 {update_root_path}", - ), + f"删除更新文件失败,可以手动删除 {zip_path} 和 {update_root_path}" ) def format_name(self, name: str) -> str: diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index 6093c92ccf..86a4793a76 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -573,8 +573,8 @@ def test_plugin_unzip_file_normalizes_windows_extended_length_paths( @pytest.mark.parametrize( ("archive_root", "expected_error"), [ - ("../escape/", "update root escapes target directory"), - ("C:/escape", "update root escapes target directory"), + ("../escape/", "path escapes root directory"), + ("C:/escape", "path escapes root directory"), ], ) def test_repo_unzip_file_rejects_archive_roots_outside_target_dir( From b7f8be9e40059efecc115f4d45121c281ab1eb98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E6=B0=B8=E8=B5=AB?= <1259085392@qq.com> Date: Thu, 7 May 2026 09:24:35 +0900 Subject: [PATCH 7/8] refactor: inline updater root normalization --- astrbot/core/star/updator.py | 4 ++-- astrbot/core/zip_updator.py | 38 +++++++++++++----------------------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index d7e6120abe..f4f9f205b9 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -4,7 +4,6 @@ from astrbot.core import logger from astrbot.core.utils.astrbot_path import get_astrbot_plugin_path from astrbot.core.utils.io import ensure_dir, remove_dir -from astrbot.core.zip_updator import normalize_archive_root_dir from ..star.star import StarMetadata from ..updator import RepoZipUpdator @@ -74,7 +73,8 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: ensure_dir(target_dir) logger.info(f"Extracting archive: {zip_path}") with zipfile.ZipFile(zip_path, "r") as z: - update_dir = normalize_archive_root_dir(z.namelist()[0]) + first_entry = os.path.normpath(z.namelist()[0]) + update_dir = "" if first_entry == "." else first_entry z.extractall(target_dir) self._finalize_extracted_archive(zip_path, target_dir, update_dir) diff --git a/astrbot/core/zip_updator.py b/astrbot/core/zip_updator.py index 0f763177bc..74d50052c1 100644 --- a/astrbot/core/zip_updator.py +++ b/astrbot/core/zip_updator.py @@ -13,26 +13,6 @@ from astrbot.core.utils.version_comparator import VersionComparator -def normalize_archive_root_dir(path: str) -> str: - normalized = os.path.normpath(path) - return "" if normalized == "." else normalized - - -def _is_within_directory(directory: str, path: str) -> bool: - try: - return os.path.commonpath([directory, path]) == directory - except ValueError: - return False - - -def _safe_join_within(root: str, *parts: str) -> str: - root_norm = os.path.normpath(root) - path = os.path.normpath(os.path.join(root_norm, *parts)) - if not _is_within_directory(root_norm, path): - raise ValueError("path escapes root directory") - return path - - class ReleaseInfo: version: str published_at: str @@ -255,7 +235,8 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: """解压缩文件, 并将压缩包内**第一个**文件夹内的文件移动到 target_dir""" ensure_dir(target_dir) with zipfile.ZipFile(zip_path, "r") as z: - update_dir = normalize_archive_root_dir(z.namelist()[0]) + first_entry = os.path.normpath(z.namelist()[0]) + update_dir = "" if first_entry == "." else first_entry z.extractall(target_dir) logger.debug(f"解压文件完成: {zip_path}") @@ -269,6 +250,15 @@ def _finalize_extracted_archive( ) -> None: target_root_path = os.path.normpath(target_dir) + def _join_under_root(root: str, *parts: str) -> str: + path = os.path.normpath(os.path.join(root, *parts)) + try: + if os.path.commonpath([root, path]) != root: + raise ValueError("path escapes root directory") + except ValueError as exc: + raise ValueError("path escapes root directory") from exc + return path + if not update_dir: try: os.remove(zip_path) @@ -276,12 +266,12 @@ def _finalize_extracted_archive( logger.warning(f"删除更新文件失败,可以手动删除 {zip_path}") return - update_root_path = _safe_join_within(target_root_path, update_dir) + update_root_path = _join_under_root(target_root_path, update_dir) files = os.listdir(update_root_path) for f in files: - update_item_path = _safe_join_within(update_root_path, f) - target_item_path = _safe_join_within(target_root_path, f) + update_item_path = _join_under_root(update_root_path, f) + target_item_path = _join_under_root(target_root_path, f) if os.path.isdir(update_item_path): if os.path.exists(target_item_path): shutil.rmtree(target_item_path, onerror=on_error) From 448c4d4baa87a9acb438c603fdd9729ffd8e1a44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E6=B0=B8=E8=B5=AB?= <1259085392@qq.com> Date: Thu, 7 May 2026 09:37:30 +0900 Subject: [PATCH 8/8] fix: infer archive root from zip entries --- astrbot/core/star/updator.py | 3 +- astrbot/core/zip_updator.py | 33 ++++++++++++++++-- tests/test_updator_socks.py | 66 ++++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 4 deletions(-) diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index f4f9f205b9..c647779069 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -73,8 +73,7 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: ensure_dir(target_dir) logger.info(f"Extracting archive: {zip_path}") with zipfile.ZipFile(zip_path, "r") as z: - first_entry = os.path.normpath(z.namelist()[0]) - update_dir = "" if first_entry == "." else first_entry + update_dir = self._resolve_archive_root_dir(z.namelist()) z.extractall(target_dir) self._finalize_extracted_archive(zip_path, target_dir, update_dir) diff --git a/astrbot/core/zip_updator.py b/astrbot/core/zip_updator.py index 74d50052c1..15096cc81e 100644 --- a/astrbot/core/zip_updator.py +++ b/astrbot/core/zip_updator.py @@ -235,13 +235,42 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: """解压缩文件, 并将压缩包内**第一个**文件夹内的文件移动到 target_dir""" ensure_dir(target_dir) with zipfile.ZipFile(zip_path, "r") as z: - first_entry = os.path.normpath(z.namelist()[0]) - update_dir = "" if first_entry == "." else first_entry + update_dir = self._resolve_archive_root_dir(z.namelist()) z.extractall(target_dir) logger.debug(f"解压文件完成: {zip_path}") self._finalize_extracted_archive(zip_path, target_dir, update_dir) + @staticmethod + def _resolve_archive_root_dir(entries: list[str]) -> str: + normalized_entries = [os.path.normpath(entry) for entry in entries] + portable_entries = [entry.replace("\\", "/") for entry in normalized_entries] + root_candidates: list[str] = [] + + for raw_entry, normalized_entry, portable_entry in zip( + entries, normalized_entries, portable_entries + ): + if normalized_entry == ".": + continue + + has_children = any( + other_entry != portable_entry + and other_entry.startswith(f"{portable_entry}/") + for other_entry in portable_entries + ) + if raw_entry.endswith(("/", "\\")) or has_children: + root_candidates.append(normalized_entry) + continue + + parent_portable, _, _ = portable_entry.rpartition("/") + if not parent_portable: + return "" + root_candidates.append(parent_portable.replace("/", os.sep)) + + if not root_candidates: + return "" + return os.path.commonpath(root_candidates) + def _finalize_extracted_archive( self, zip_path: str, diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index 86a4793a76..dc4c6e2502 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -131,6 +131,10 @@ def _build_fake_archive_entries(archive_root: str) -> list[str]: return [archive_root, posixpath.join(archive_root, ".dockerignore")] +def _build_fake_archive_entries_with_first_file(root_dir: str) -> list[str]: + return [f"{root_dir}/README.md", f"{root_dir}/src/app.py"] + + def _exercise_unzip_file_windows_path_normalization( monkeypatch: pytest.MonkeyPatch, *, @@ -598,3 +602,65 @@ def test_repo_unzip_file_rejects_archive_roots_outside_target_dir( with pytest.raises(ValueError, match=expected_error): RepoZipUpdator().unzip_file("temp.zip", r"\\?\C:\Users\admin\target") + + +def test_repo_unzip_file_handles_archives_without_explicit_root_dir_entry( + monkeypatch: pytest.MonkeyPatch, +) -> None: + import astrbot.core.zip_updator as zip_updator_module + + target_dir = r"\\?\C:\Users\admin\AppData\Local\AstrBot\backend\app" + archive_root = "repo-root" + expected_root = ntpath.join(target_dir, archive_root) + expected_file = ntpath.join(expected_root, "README.md") + captured: dict[str, object | None] = { + "listdir": None, + "move": None, + "cleanup": None, + "removed": None, + } + + def fake_listdir(path: str) -> list[str]: + captured["listdir"] = path + return ["README.md"] + + monkeypatch.setattr( + zip_updator_module.os, "makedirs", lambda path, exist_ok=True: None + ) + monkeypatch.setattr(zip_updator_module.os.path, "join", ntpath.join) + monkeypatch.setattr(zip_updator_module.os.path, "normpath", ntpath.normpath) + monkeypatch.setattr(zip_updator_module.os.path, "commonpath", ntpath.commonpath) + monkeypatch.setattr(zip_updator_module.os.path, "isdir", lambda path: False) + monkeypatch.setattr(zip_updator_module.os.path, "exists", lambda path: False) + monkeypatch.setattr( + zip_updator_module.zipfile, + "ZipFile", + lambda path, mode: _FakeZipArchive( + _build_fake_archive_entries_with_first_file(archive_root) + ), + ) + monkeypatch.setattr(zip_updator_module.logger, "debug", lambda message: None) + monkeypatch.setattr(zip_updator_module.logger, "warning", lambda message: None) + monkeypatch.setattr(zip_updator_module.os, "listdir", fake_listdir) + monkeypatch.setattr( + zip_updator_module.shutil, + "move", + lambda src, dst: captured.__setitem__("move", (src, dst)), + ) + monkeypatch.setattr( + zip_updator_module.shutil, + "rmtree", + lambda path, onerror=None: captured.__setitem__("cleanup", path), + ) + monkeypatch.setattr( + zip_updator_module.os, + "remove", + lambda path: captured.__setitem__("removed", path), + ) + + RepoZipUpdator().unzip_file("temp.zip", target_dir) + + assert captured["listdir"] == expected_root + assert captured["move"] == (expected_file, target_dir) + assert captured["cleanup"] == expected_root + assert captured["removed"] == "temp.zip"