diff --git a/astrbot/core/star/updator.py b/astrbot/core/star/updator.py index cb462d8b2c..90529732e0 100644 --- a/astrbot/core/star/updator.py +++ b/astrbot/core/star/updator.py @@ -5,6 +5,7 @@ from astrbot.core import logger from astrbot.core.utils.astrbot_path import get_astrbot_plugin_path from astrbot.core.utils.io import ensure_dir, on_error, remove_dir +from astrbot.core.zip_updator import normalize_archive_root_dir from ..star.star import StarMetadata from ..updator import RepoZipUpdator @@ -75,25 +76,37 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: update_dir = "" logger.info(f"Extracting archive: {zip_path}") with zipfile.ZipFile(zip_path, "r") as z: - update_dir = z.namelist()[0] + update_dir = normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) - files = os.listdir(os.path.join(target_dir, update_dir)) + if not update_dir: + try: + os.remove(zip_path) + except BaseException: + logger.warning( + f"Failed to remove update files; you can manually delete {zip_path}", + ) + return + + update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) + files = os.listdir(update_root_path) for f in files: - if os.path.isdir(os.path.join(target_dir, update_dir, f)): - if os.path.exists(os.path.join(target_dir, f)): - shutil.rmtree(os.path.join(target_dir, f), onerror=on_error) - elif os.path.exists(os.path.join(target_dir, f)): - os.remove(os.path.join(target_dir, f)) - shutil.move(os.path.join(target_dir, update_dir, f), target_dir) + update_item_path = os.path.normpath(os.path.join(update_root_path, f)) + target_item_path = os.path.normpath(os.path.join(target_dir, f)) + if os.path.isdir(update_item_path): + if os.path.exists(target_item_path): + shutil.rmtree(target_item_path, onerror=on_error) + elif os.path.exists(target_item_path): + os.remove(target_item_path) + shutil.move(update_item_path, target_dir) try: logger.info( - f"Removing temporary files: {zip_path} and {os.path.join(target_dir, update_dir)}", + f"Removing temporary files: {zip_path} and {update_root_path}", ) - shutil.rmtree(os.path.join(target_dir, update_dir), onerror=on_error) + shutil.rmtree(update_root_path, onerror=on_error) os.remove(zip_path) except BaseException: logger.warning( - f"Failed to remove update files; you can manually delete {zip_path} and {os.path.join(target_dir, update_dir)}", + f"Failed to remove update files; you can manually delete {zip_path} and {update_root_path}", ) diff --git a/astrbot/core/zip_updator.py b/astrbot/core/zip_updator.py index 1408a1c454..9795de5e80 100644 --- a/astrbot/core/zip_updator.py +++ b/astrbot/core/zip_updator.py @@ -13,6 +13,11 @@ from astrbot.core.utils.version_comparator import VersionComparator +def normalize_archive_root_dir(path: str) -> str: + normalized = os.path.normpath(path) + return "" if normalized == "." else normalized + + class ReleaseInfo: version: str published_at: str @@ -236,28 +241,38 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None: ensure_dir(target_dir) update_dir = "" with zipfile.ZipFile(zip_path, "r") as z: - update_dir = z.namelist()[0] + update_dir = normalize_archive_root_dir(z.namelist()[0]) z.extractall(target_dir) logger.debug(f"解压文件完成: {zip_path}") - files = os.listdir(os.path.join(target_dir, update_dir)) + if not update_dir: + try: + os.remove(zip_path) + except BaseException: + logger.warning(f"删除更新文件失败,可以手动删除 {zip_path}") + return + + update_root_path = os.path.normpath(os.path.join(target_dir, update_dir)) + files = os.listdir(update_root_path) for f in files: - if os.path.isdir(os.path.join(target_dir, update_dir, f)): - if os.path.exists(os.path.join(target_dir, f)): - shutil.rmtree(os.path.join(target_dir, f), onerror=on_error) - elif os.path.exists(os.path.join(target_dir, f)): - os.remove(os.path.join(target_dir, f)) - shutil.move(os.path.join(target_dir, update_dir, f), target_dir) + update_item_path = os.path.normpath(os.path.join(update_root_path, f)) + target_item_path = os.path.normpath(os.path.join(target_dir, f)) + if os.path.isdir(update_item_path): + if os.path.exists(target_item_path): + shutil.rmtree(target_item_path, onerror=on_error) + elif os.path.exists(target_item_path): + os.remove(target_item_path) + shutil.move(update_item_path, target_dir) try: logger.debug( - f"删除临时更新文件: {zip_path} 和 {os.path.join(target_dir, update_dir)}", + f"删除临时更新文件: {zip_path} 和 {update_root_path}", ) - shutil.rmtree(os.path.join(target_dir, update_dir), onerror=on_error) + shutil.rmtree(update_root_path, onerror=on_error) os.remove(zip_path) except BaseException: logger.warning( - f"删除更新文件失败,可以手动删除 {zip_path} 和 {os.path.join(target_dir, update_dir)}", + f"删除更新文件失败,可以手动删除 {zip_path} 和 {update_root_path}", ) def format_name(self, name: str) -> str: diff --git a/tests/test_updator_socks.py b/tests/test_updator_socks.py index 11009ff85c..bba3110b82 100644 --- a/tests/test_updator_socks.py +++ b/tests/test_updator_socks.py @@ -1,3 +1,5 @@ +import ntpath +import posixpath from dataclasses import dataclass, field from pathlib import Path from types import SimpleNamespace @@ -108,6 +110,27 @@ def stream(self, method: str, url: str): # noqa: ARG002 return _FakeFailingStreamResponse() +class _FakeZipArchive: + def __init__(self, names: list[str]): + self._names = names + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb) -> None: + return None + + def namelist(self) -> list[str]: + return self._names + + def extractall(self, target_dir: str) -> None: # noqa: ARG002 + return None + + +def _build_fake_archive_entries(archive_root: str) -> list[str]: + return [archive_root, posixpath.join(archive_root, ".dockerignore")] + + def _build_fake_httpx_module(state: _FakeAsyncClientState) -> SimpleNamespace: class _FakeAsyncClient: def __init__(self, **kwargs): @@ -400,3 +423,161 @@ async def test_download_file_logs_url_and_target_path_on_failure( assert any(url in message for message in log_messages) assert any(str(target_path) in message for message in log_messages) + + +@pytest.mark.parametrize( + "archive_root", + [ + "AstrBotDevs-AstrBot-39386ee/", + "AstrBotDevs-AstrBot-39386ee", + "owner-repo-branch/subdir/", + ".", + ], +) +def test_repo_unzip_file_normalizes_windows_extended_length_paths( + monkeypatch: pytest.MonkeyPatch, + archive_root: str, +) -> None: + import astrbot.core.zip_updator as zip_updator_module + + target_dir = r"\\?\C:\Users\admin\AppData\Local\AstrBot\backend\app" + normalized_root = ntpath.normpath(archive_root) + expected_root = ( + target_dir + if normalized_root == "." + else ntpath.join(target_dir, normalized_root) + ) + expected_file = ntpath.join(expected_root, ".dockerignore") + captured: dict[str, object | None] = { + "listdir": None, + "move": None, + "cleanup": None, + "removed": None, + } + + def fake_listdir(path: str) -> list[str]: + captured["listdir"] = path + return [".dockerignore"] + + monkeypatch.setattr( + zip_updator_module.os, "makedirs", lambda path, exist_ok=True: None + ) + monkeypatch.setattr(zip_updator_module.os.path, "join", ntpath.join) + monkeypatch.setattr(zip_updator_module.os.path, "normpath", ntpath.normpath) + monkeypatch.setattr(zip_updator_module.os.path, "isdir", lambda path: False) + monkeypatch.setattr(zip_updator_module.os.path, "exists", lambda path: False) + monkeypatch.setattr( + zip_updator_module.zipfile, + "ZipFile", + lambda path, mode: _FakeZipArchive(_build_fake_archive_entries(archive_root)), + ) + monkeypatch.setattr(zip_updator_module.logger, "debug", lambda message: None) + monkeypatch.setattr(zip_updator_module.logger, "warning", lambda message: None) + monkeypatch.setattr(zip_updator_module.os, "listdir", fake_listdir) + monkeypatch.setattr( + zip_updator_module.shutil, + "move", + lambda src, dst: captured.__setitem__("move", (src, dst)), + ) + monkeypatch.setattr( + zip_updator_module.shutil, + "rmtree", + lambda path, onerror=None: captured.__setitem__("cleanup", path), + ) + monkeypatch.setattr( + zip_updator_module.os, + "remove", + lambda path: captured.__setitem__("removed", path), + ) + + RepoZipUpdator().unzip_file("temp.zip", target_dir) + + assert captured["removed"] == "temp.zip" + if normalized_root == ".": + assert captured["listdir"] is None + assert captured["move"] is None + assert captured["cleanup"] is None + return + + assert captured["listdir"] == expected_root + assert captured["move"] == (expected_file, target_dir) + assert captured["cleanup"] == expected_root + + +@pytest.mark.parametrize( + "archive_root", + [ + "AstrBotDevs-demo-39386ee/", + "AstrBotDevs-demo-39386ee", + "owner-repo-branch/subdir/", + ".", + ], +) +def test_plugin_unzip_file_normalizes_windows_extended_length_paths( + monkeypatch: pytest.MonkeyPatch, + archive_root: str, +) -> None: + import astrbot.core.star.updator as plugin_updator_module + + target_dir = r"\\?\C:\Users\admin\AppData\Local\AstrBot\data\plugins\demo" + normalized_root = ntpath.normpath(archive_root) + expected_root = ( + target_dir + if normalized_root == "." + else ntpath.join(target_dir, normalized_root) + ) + expected_file = ntpath.join(expected_root, ".dockerignore") + captured: dict[str, object | None] = { + "listdir": None, + "move": None, + "cleanup": None, + "removed": None, + } + + def fake_listdir(path: str) -> list[str]: + captured["listdir"] = path + return [".dockerignore"] + + monkeypatch.setattr( + plugin_updator_module.os, "makedirs", lambda path, exist_ok=True: None + ) + monkeypatch.setattr(plugin_updator_module.os.path, "join", ntpath.join) + monkeypatch.setattr(plugin_updator_module.os.path, "normpath", ntpath.normpath) + monkeypatch.setattr(plugin_updator_module.os.path, "isdir", lambda path: False) + monkeypatch.setattr(plugin_updator_module.os.path, "exists", lambda path: False) + monkeypatch.setattr( + plugin_updator_module.zipfile, + "ZipFile", + lambda path, mode: _FakeZipArchive(_build_fake_archive_entries(archive_root)), + ) + monkeypatch.setattr(plugin_updator_module.logger, "info", lambda message: None) + monkeypatch.setattr(plugin_updator_module.logger, "warning", lambda message: None) + monkeypatch.setattr(plugin_updator_module.os, "listdir", fake_listdir) + monkeypatch.setattr( + plugin_updator_module.shutil, + "move", + lambda src, dst: captured.__setitem__("move", (src, dst)), + ) + monkeypatch.setattr( + plugin_updator_module.shutil, + "rmtree", + lambda path, onerror=None: captured.__setitem__("cleanup", path), + ) + monkeypatch.setattr( + plugin_updator_module.os, + "remove", + lambda path: captured.__setitem__("removed", path), + ) + + PluginUpdator.__new__(PluginUpdator).unzip_file("temp.zip", target_dir) + + assert captured["removed"] == "temp.zip" + if normalized_root == ".": + assert captured["listdir"] is None + assert captured["move"] is None + assert captured["cleanup"] is None + return + + assert captured["listdir"] == expected_root + assert captured["move"] == (expected_file, target_dir) + assert captured["cleanup"] == expected_root