Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions astrbot/core/star/updator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from astrbot.core import logger
from astrbot.core.utils.astrbot_path import get_astrbot_plugin_path
from astrbot.core.utils.io import ensure_dir, on_error, remove_dir
from astrbot.core.zip_updator import normalize_archive_root_dir

from ..star.star import StarMetadata
from ..updator import RepoZipUpdator
Expand Down Expand Up @@ -75,25 +76,37 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None:
update_dir = ""
logger.info(f"Extracting archive: {zip_path}")
with zipfile.ZipFile(zip_path, "r") as z:
update_dir = z.namelist()[0]
update_dir = normalize_archive_root_dir(z.namelist()[0])
z.extractall(target_dir)

files = os.listdir(os.path.join(target_dir, update_dir))
if not update_dir:
try:
os.remove(zip_path)
except BaseException:
logger.warning(
f"Failed to remove update files; you can manually delete {zip_path}",
)
return

update_root_path = os.path.normpath(os.path.join(target_dir, update_dir))
files = os.listdir(update_root_path)
for f in files:
if os.path.isdir(os.path.join(target_dir, update_dir, f)):
if os.path.exists(os.path.join(target_dir, f)):
shutil.rmtree(os.path.join(target_dir, f), onerror=on_error)
elif os.path.exists(os.path.join(target_dir, f)):
os.remove(os.path.join(target_dir, f))
shutil.move(os.path.join(target_dir, update_dir, f), target_dir)
update_item_path = os.path.normpath(os.path.join(update_root_path, f))
target_item_path = os.path.normpath(os.path.join(target_dir, f))
if os.path.isdir(update_item_path):
if os.path.exists(target_item_path):
shutil.rmtree(target_item_path, onerror=on_error)
elif os.path.exists(target_item_path):
os.remove(target_item_path)
shutil.move(update_item_path, target_dir)

try:
logger.info(
f"Removing temporary files: {zip_path} and {os.path.join(target_dir, update_dir)}",
f"Removing temporary files: {zip_path} and {update_root_path}",
)
shutil.rmtree(os.path.join(target_dir, update_dir), onerror=on_error)
shutil.rmtree(update_root_path, onerror=on_error)
os.remove(zip_path)
except BaseException:
logger.warning(
f"Failed to remove update files; you can manually delete {zip_path} and {os.path.join(target_dir, update_dir)}",
f"Failed to remove update files; you can manually delete {zip_path} and {update_root_path}",
)
37 changes: 26 additions & 11 deletions astrbot/core/zip_updator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
from astrbot.core.utils.version_comparator import VersionComparator


def normalize_archive_root_dir(path: str) -> str:
normalized = os.path.normpath(path)
return "" if normalized == "." else normalized


class ReleaseInfo:
version: str
published_at: str
Expand Down Expand Up @@ -236,28 +241,38 @@ def unzip_file(self, zip_path: str, target_dir: str) -> None:
ensure_dir(target_dir)
update_dir = ""
with zipfile.ZipFile(zip_path, "r") as z:
update_dir = z.namelist()[0]
update_dir = normalize_archive_root_dir(z.namelist()[0])
z.extractall(target_dir)
logger.debug(f"解压文件完成: {zip_path}")

files = os.listdir(os.path.join(target_dir, update_dir))
if not update_dir:
try:
os.remove(zip_path)
except BaseException:
logger.warning(f"删除更新文件失败,可以手动删除 {zip_path}")
return

update_root_path = os.path.normpath(os.path.join(target_dir, update_dir))
files = os.listdir(update_root_path)
for f in files:
if os.path.isdir(os.path.join(target_dir, update_dir, f)):
if os.path.exists(os.path.join(target_dir, f)):
shutil.rmtree(os.path.join(target_dir, f), onerror=on_error)
elif os.path.exists(os.path.join(target_dir, f)):
os.remove(os.path.join(target_dir, f))
shutil.move(os.path.join(target_dir, update_dir, f), target_dir)
update_item_path = os.path.normpath(os.path.join(update_root_path, f))
target_item_path = os.path.normpath(os.path.join(target_dir, f))
if os.path.isdir(update_item_path):
if os.path.exists(target_item_path):
shutil.rmtree(target_item_path, onerror=on_error)
elif os.path.exists(target_item_path):
os.remove(target_item_path)
shutil.move(update_item_path, target_dir)

try:
logger.debug(
f"删除临时更新文件: {zip_path} 和 {os.path.join(target_dir, update_dir)}",
f"删除临时更新文件: {zip_path} 和 {update_root_path}",
)
shutil.rmtree(os.path.join(target_dir, update_dir), onerror=on_error)
shutil.rmtree(update_root_path, onerror=on_error)
os.remove(zip_path)
except BaseException:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Catching BaseException is overly broad as it includes system-exiting exceptions like SystemExit and KeyboardInterrupt. It is recommended to catch Exception instead to handle standard runtime errors during cleanup.

Suggested change
except BaseException:
except Exception:

logger.warning(
f"删除更新文件失败,可以手动删除 {zip_path} 和 {os.path.join(target_dir, update_dir)}",
f"删除更新文件失败,可以手动删除 {zip_path} 和 {update_root_path}",
)

def format_name(self, name: str) -> str:
Expand Down
181 changes: 181 additions & 0 deletions tests/test_updator_socks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import ntpath
import posixpath
from dataclasses import dataclass, field
from pathlib import Path
from types import SimpleNamespace
Expand Down Expand Up @@ -108,6 +110,27 @@ def stream(self, method: str, url: str): # noqa: ARG002
return _FakeFailingStreamResponse()


class _FakeZipArchive:
def __init__(self, names: list[str]):
self._names = names

def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb) -> None:
return None

def namelist(self) -> list[str]:
return self._names

def extractall(self, target_dir: str) -> None: # noqa: ARG002
return None


def _build_fake_archive_entries(archive_root: str) -> list[str]:
return [archive_root, posixpath.join(archive_root, ".dockerignore")]


def _build_fake_httpx_module(state: _FakeAsyncClientState) -> SimpleNamespace:
class _FakeAsyncClient:
def __init__(self, **kwargs):
Expand Down Expand Up @@ -400,3 +423,161 @@ async def test_download_file_logs_url_and_target_path_on_failure(

assert any(url in message for message in log_messages)
assert any(str(target_path) in message for message in log_messages)


@pytest.mark.parametrize(
"archive_root",
[
"AstrBotDevs-AstrBot-39386ee/",
"AstrBotDevs-AstrBot-39386ee",
"owner-repo-branch/subdir/",
".",
],
)
def test_repo_unzip_file_normalizes_windows_extended_length_paths(
monkeypatch: pytest.MonkeyPatch,
archive_root: str,
) -> None:
import astrbot.core.zip_updator as zip_updator_module

target_dir = r"\\?\C:\Users\admin\AppData\Local\AstrBot\backend\app"
normalized_root = ntpath.normpath(archive_root)
expected_root = (
target_dir
if normalized_root == "."
else ntpath.join(target_dir, normalized_root)
)
expected_file = ntpath.join(expected_root, ".dockerignore")
captured: dict[str, object | None] = {
"listdir": None,
"move": None,
"cleanup": None,
"removed": None,
}

def fake_listdir(path: str) -> list[str]:
captured["listdir"] = path
return [".dockerignore"]

monkeypatch.setattr(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Assert that the zip file itself is removed, or avoid capturing it if you don’t use it

In test_repo_unzip_file_normalizes_windows_extended_length_paths, os.remove is monkeypatched to capture the "removed" path, but this value is never asserted. Please either assert that captured["removed"] == "temp.zip" to confirm the archive is cleaned up, or remove the monkeypatch and capture if it’s not needed.

Suggested implementation:

    expected_root = ntpath.normpath(ntpath.join(target_dir, archive_root))
    expected_file = ntpath.normpath(
        ntpath.join(target_dir, archive_root, ".dockerignore")
    )
    # used to capture side effects like listdir path and removed zip archive
    captured: dict[str, object] = {"removed": None}

In test_repo_unzip_file_normalizes_windows_extended_length_paths:

  1. Ensure os.remove is monkeypatched to populate captured["removed"], e.g.:

    monkeypatch.setattr(
        zip_updator_module.os,
        "remove",
        lambda path: captured.__setitem__("removed", path),
    )
  2. After the function under test has been invoked and any other assertions on expected_root / expected_file are made, add:

    assert captured["removed"] == "temp.zip"

If you prefer not to assert on removal, instead delete the os.remove monkeypatch and any use of captured["removed"] so the unnecessary capture is removed entirely.

zip_updator_module.os, "makedirs", lambda path, exist_ok=True: None
)
monkeypatch.setattr(zip_updator_module.os.path, "join", ntpath.join)
monkeypatch.setattr(zip_updator_module.os.path, "normpath", ntpath.normpath)
monkeypatch.setattr(zip_updator_module.os.path, "isdir", lambda path: False)
monkeypatch.setattr(zip_updator_module.os.path, "exists", lambda path: False)
monkeypatch.setattr(
zip_updator_module.zipfile,
"ZipFile",
lambda path, mode: _FakeZipArchive(_build_fake_archive_entries(archive_root)),
)
monkeypatch.setattr(zip_updator_module.logger, "debug", lambda message: None)
monkeypatch.setattr(zip_updator_module.logger, "warning", lambda message: None)
monkeypatch.setattr(zip_updator_module.os, "listdir", fake_listdir)
monkeypatch.setattr(
zip_updator_module.shutil,
"move",
lambda src, dst: captured.__setitem__("move", (src, dst)),
)
monkeypatch.setattr(
zip_updator_module.shutil,
"rmtree",
lambda path, onerror=None: captured.__setitem__("cleanup", path),
)
monkeypatch.setattr(
zip_updator_module.os,
"remove",
lambda path: captured.__setitem__("removed", path),
)

RepoZipUpdator().unzip_file("temp.zip", target_dir)

assert captured["removed"] == "temp.zip"
if normalized_root == ".":
assert captured["listdir"] is None
assert captured["move"] is None
assert captured["cleanup"] is None
return

assert captured["listdir"] == expected_root
assert captured["move"] == (expected_file, target_dir)
assert captured["cleanup"] == expected_root


@pytest.mark.parametrize(
"archive_root",
[
"AstrBotDevs-demo-39386ee/",
"AstrBotDevs-demo-39386ee",
"owner-repo-branch/subdir/",
".",
],
)
def test_plugin_unzip_file_normalizes_windows_extended_length_paths(
monkeypatch: pytest.MonkeyPatch,
archive_root: str,
) -> None:
import astrbot.core.star.updator as plugin_updator_module

target_dir = r"\\?\C:\Users\admin\AppData\Local\AstrBot\data\plugins\demo"
normalized_root = ntpath.normpath(archive_root)
expected_root = (
target_dir
if normalized_root == "."
else ntpath.join(target_dir, normalized_root)
)
expected_file = ntpath.join(expected_root, ".dockerignore")
captured: dict[str, object | None] = {
"listdir": None,
"move": None,
"cleanup": None,
"removed": None,
}

def fake_listdir(path: str) -> list[str]:
captured["listdir"] = path
return [".dockerignore"]

monkeypatch.setattr(
plugin_updator_module.os, "makedirs", lambda path, exist_ok=True: None
)
monkeypatch.setattr(plugin_updator_module.os.path, "join", ntpath.join)
monkeypatch.setattr(plugin_updator_module.os.path, "normpath", ntpath.normpath)
monkeypatch.setattr(plugin_updator_module.os.path, "isdir", lambda path: False)
monkeypatch.setattr(plugin_updator_module.os.path, "exists", lambda path: False)
monkeypatch.setattr(
plugin_updator_module.zipfile,
"ZipFile",
lambda path, mode: _FakeZipArchive(_build_fake_archive_entries(archive_root)),
)
monkeypatch.setattr(plugin_updator_module.logger, "info", lambda message: None)
monkeypatch.setattr(plugin_updator_module.logger, "warning", lambda message: None)
monkeypatch.setattr(plugin_updator_module.os, "listdir", fake_listdir)
monkeypatch.setattr(
plugin_updator_module.shutil,
"move",
lambda src, dst: captured.__setitem__("move", (src, dst)),
)
monkeypatch.setattr(
plugin_updator_module.shutil,
"rmtree",
lambda path, onerror=None: captured.__setitem__("cleanup", path),
)
monkeypatch.setattr(
plugin_updator_module.os,
"remove",
lambda path: captured.__setitem__("removed", path),
)

PluginUpdator.__new__(PluginUpdator).unzip_file("temp.zip", target_dir)

assert captured["removed"] == "temp.zip"
if normalized_root == ".":
assert captured["listdir"] is None
assert captured["move"] is None
assert captured["cleanup"] is None
return

assert captured["listdir"] == expected_root
assert captured["move"] == (expected_file, target_dir)
assert captured["cleanup"] == expected_root
Loading