Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 22 additions & 109 deletions src/borg/helpers/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@

import platformdirs

try:
import pwd # POSIX only
except ImportError:
pwd = None # win32?

from .errors import Error

from .process import prepare_subprocess_env
Expand Down Expand Up @@ -50,140 +45,68 @@ def ensure_dir(path, mode=stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO, pretty_dea
raise


def get_base_dir(*, legacy=False):
"""Get home directory / base directory for Borg:

Legacy:

Preference order (while being robust against misleading environment when invoked via mount helpers):
def get_base_dir():
"""Get base directory for Borg.

- BORG_BASE_DIR, if set.
- HOME, if it refers to the current (effective) user's home.
- ~$USER, if USER is set.
- The home directory of the current (effective) user from the password database (POSIX).
- ~ (platform default expansion).

Not legacy:

Just return BORG_BASE_DIR.
We only care for BORG_BASE_DIR here, as it can be used to override the base dir
and not use any more or less platform-specific way to determine the base dir.
The borg 1.x logic (HOME/USER/~ guessing) lives in borg.legacy.fs.
"""
if legacy:
# 1. Explicit override always wins.
base_dir = os.environ.get("BORG_BASE_DIR")
if base_dir:
return base_dir

# 2. Prefer HOME, but be robust against mount helpers that set HOME to root's home for non-root users.
home_env = os.environ.get("HOME")
if home_env and pwd is not None: # POSIX only
try:
# If HOME points to root's home but we are not root, prefer the invoking user's home.
root_home = pwd.getpwuid(0).pw_dir
uid = getattr(os, "geteuid", os.getuid)()
if uid != 0 and os.path.abspath(home_env) == os.path.abspath(root_home):
try:
user_home = pwd.getpwuid(uid).pw_dir
except Exception:
user_home = None
if user_home:
return user_home
# if we couldn't figure out the user's home, ignore HOME and continue with fallbacks
home_env = None
except Exception: # nosec B110
# If anything goes wrong determining root's home, keep HOME as-is.
pass

if home_env:
return home_env

# 3. Fall back to ~$USER if set (keeps previous behavior and existing tests).
user = os.environ.get("USER")
if user:
return os.path.expanduser("~%s" % user)

# 4. POSIX: use pw_home for the current uid; otherwise finally fallback to ~.
if pwd is not None:
try:
uid = getattr(os, "geteuid", os.getuid)()
return pwd.getpwuid(uid).pw_dir
except Exception: # nosec B110
pass

return os.path.expanduser("~")
else:
# we only care for BORG_BASE_DIR here, as it can be used to override the base dir
# and not use any more or less platform-specific way to determine the base dir.
return os.environ.get("BORG_BASE_DIR")
return os.environ.get("BORG_BASE_DIR")


def join_base_dir(*paths, **kw):
legacy = kw.get("legacy", True)
base_dir = get_base_dir(legacy=legacy)
def join_base_dir(*paths):
base_dir = get_base_dir()
return None if base_dir is None else str(Path(base_dir).joinpath(*paths))


def get_keys_dir(*, legacy=False, create=True):
def get_keys_dir(*, create=True):
"""Determine where to store repository keys."""
keys_dir = os.environ.get("BORG_KEYS_DIR")
if keys_dir is None:
# note: do not just give this as default to the environment.get(), see issue #5979.
keys_dir = str(Path(get_config_dir(legacy=legacy)) / "keys")
keys_dir = str(Path(get_config_dir()) / "keys")
if create:
ensure_dir(keys_dir)
return keys_dir


def get_security_dir(repository_id=None, *, legacy=False, create=True):
def get_security_dir(repository_id=None, *, create=True):
"""Determine where to store local security information."""
security_dir = os.environ.get("BORG_SECURITY_DIR")
if security_dir is None:
get_dir = get_config_dir if legacy else get_data_dir
# note: do not just give this as default to the environment.get(), see issue #5979.
security_dir = str(Path(get_dir(legacy=legacy)) / "security")
security_dir = str(Path(get_data_dir()) / "security")
if repository_id:
security_dir = str(Path(security_dir) / repository_id)
if create:
ensure_dir(security_dir)
return security_dir


def get_data_dir(*, legacy=False, create=True):
def get_data_dir(*, create=True):
"""Determine where to store borg changing data on the client"""
assert legacy is False, "there is no legacy variant of the borg data dir"
data_dir = os.environ.get(
"BORG_DATA_DIR", join_base_dir(".local", "share", "borg", legacy=legacy) or platformdirs.user_data_dir("borg")
"BORG_DATA_DIR", join_base_dir(".local", "share", "borg") or platformdirs.user_data_dir("borg")
)
if create:
ensure_dir(data_dir)
return data_dir


def get_runtime_dir(*, legacy=False, create=True):
def get_runtime_dir(*, create=True):
"""Determine where to store runtime files, like sockets, PID files, ..."""
assert legacy is False, "there is no legacy variant of the borg runtime dir"
runtime_dir = os.environ.get(
"BORG_RUNTIME_DIR", join_base_dir(".cache", "borg", legacy=legacy) or platformdirs.user_runtime_dir("borg")
"BORG_RUNTIME_DIR", join_base_dir(".cache", "borg") or platformdirs.user_runtime_dir("borg")
)
if create:
ensure_dir(runtime_dir)
return runtime_dir


def get_cache_dir(*, legacy=False, create=True):
def get_cache_dir(*, create=True):
"""Determine where to store Borg cache data."""

if legacy:
# Get cache home path
cache_home = join_base_dir(".cache", legacy=legacy)
# Try to use XDG_CACHE_HOME instead if BORG_BASE_DIR isn't explicitly set
if not os.environ.get("BORG_BASE_DIR"):
cache_home = os.environ.get("XDG_CACHE_HOME", cache_home)
# Use BORG_CACHE_DIR if set, otherwise assemble final path from cache home path
cache_dir = os.environ.get("BORG_CACHE_DIR", str(Path(cache_home) / "borg"))
else:
cache_dir = os.environ.get(
"BORG_CACHE_DIR", join_base_dir(".cache", "borg", legacy=legacy) or platformdirs.user_cache_dir("borg")
)
cache_dir = os.environ.get("BORG_CACHE_DIR", join_base_dir(".cache", "borg") or platformdirs.user_cache_dir("borg"))
if create:
ensure_dir(cache_dir)
cache_tag_fn = Path(cache_dir) / CACHE_TAG_NAME
Expand All @@ -205,21 +128,11 @@ def get_cache_dir(*, legacy=False, create=True):
return cache_dir


def get_config_dir(*, legacy=False, create=True):
def get_config_dir(*, create=True):
"""Determine where to store the configuration."""

# Get config home path
if legacy:
config_home = join_base_dir(".config", legacy=legacy)
# Try to use XDG_CONFIG_HOME instead if BORG_BASE_DIR isn't explicitly set
if not os.environ.get("BORG_BASE_DIR"):
config_home = os.environ.get("XDG_CONFIG_HOME", config_home)
# Use BORG_CONFIG_DIR if set, otherwise assemble final path from config home path
config_dir = os.environ.get("BORG_CONFIG_DIR", str(Path(config_home) / "borg"))
else:
config_dir = os.environ.get(
"BORG_CONFIG_DIR", join_base_dir(".config", "borg", legacy=legacy) or platformdirs.user_config_dir("borg")
)
config_dir = os.environ.get(
"BORG_CONFIG_DIR", join_base_dir(".config", "borg") or platformdirs.user_config_dir("borg")
)
if create:
ensure_dir(config_dir)
return config_dir
Expand Down
145 changes: 145 additions & 0 deletions src/borg/legacy/fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""borg 1.x directory layout, kept only so borg transfer can read v1 repos.

The borg2 directory functions live in helpers.fs. This module keeps the old
home/base, config, cache, keys and security dir resolution. borg.security is
the only caller, and only when repository.version == 1. It goes away with the
rest of the borg 1.x transfer support.
"""

import os
import textwrap
from pathlib import Path

from ..constants import * # NOQA
from ..helpers.fs import ensure_dir

try:
import pwd # POSIX only
except ImportError:
pwd = None # win32?


def get_base_dir():
"""Get home directory / base directory for borg 1.x.

Preference order (while being robust against misleading environment when invoked via mount helpers):

- BORG_BASE_DIR, if set.
- HOME, if it refers to the current (effective) user's home.
- ~$USER, if USER is set.
- The home directory of the current (effective) user from the password database (POSIX).
- ~ (platform default expansion).
"""
# 1. Explicit override always wins.
base_dir = os.environ.get("BORG_BASE_DIR")
if base_dir:
return base_dir

# 2. Prefer HOME, but be robust against mount helpers that set HOME to root's home for non-root users.
home_env = os.environ.get("HOME")
if home_env and pwd is not None: # POSIX only
try:
# If HOME points to root's home but we are not root, prefer the invoking user's home.
root_home = pwd.getpwuid(0).pw_dir
uid = getattr(os, "geteuid", os.getuid)()
if uid != 0 and os.path.abspath(home_env) == os.path.abspath(root_home):
try:
user_home = pwd.getpwuid(uid).pw_dir
except Exception:
user_home = None
if user_home:
return user_home
# if we couldn't figure out the user's home, ignore HOME and continue with fallbacks
home_env = None
except Exception: # nosec B110
# If anything goes wrong determining root's home, keep HOME as-is.
pass

if home_env:
return home_env

# 3. Fall back to ~$USER if set (keeps previous behavior and existing tests).
user = os.environ.get("USER")
if user:
return os.path.expanduser("~%s" % user)

# 4. POSIX: use pw_home for the current uid; otherwise finally fallback to ~.
if pwd is not None:
try:
uid = getattr(os, "geteuid", os.getuid)()
return pwd.getpwuid(uid).pw_dir
except Exception: # nosec B110
pass

return os.path.expanduser("~")


def join_base_dir(*paths):
base_dir = get_base_dir()
return None if base_dir is None else str(Path(base_dir).joinpath(*paths))


def get_config_dir(*, create=True):
"""Determine where borg 1.x stored the configuration."""
config_home = join_base_dir(".config")
# Try to use XDG_CONFIG_HOME instead if BORG_BASE_DIR isn't explicitly set
if not os.environ.get("BORG_BASE_DIR"):
config_home = os.environ.get("XDG_CONFIG_HOME", config_home)
# Use BORG_CONFIG_DIR if set, otherwise assemble final path from config home path
config_dir = os.environ.get("BORG_CONFIG_DIR", str(Path(config_home) / "borg"))
if create:
ensure_dir(config_dir)
return config_dir


def get_cache_dir(*, create=True):
"""Determine where borg 1.x stored cache data."""
cache_home = join_base_dir(".cache")
# Try to use XDG_CACHE_HOME instead if BORG_BASE_DIR isn't explicitly set
if not os.environ.get("BORG_BASE_DIR"):
cache_home = os.environ.get("XDG_CACHE_HOME", cache_home)
# Use BORG_CACHE_DIR if set, otherwise assemble final path from cache home path
cache_dir = os.environ.get("BORG_CACHE_DIR", str(Path(cache_home) / "borg"))
if create:
ensure_dir(cache_dir)
cache_tag_fn = Path(cache_dir) / CACHE_TAG_NAME
if not cache_tag_fn.exists():
cache_tag_contents = (
CACHE_TAG_CONTENTS
+ textwrap.dedent(
"""
# This file is a cache directory tag created by Borg.
# For information about cache directory tags, see:
# https://www.bford.info/cachedir/spec.html
"""
).encode("ascii")
)
from ..platform import SaveFile

with SaveFile(cache_tag_fn, binary=True) as fd:
fd.write(cache_tag_contents)
return cache_dir


def get_keys_dir(*, create=True):
"""Determine where borg 1.x stored repository keys."""
keys_dir = os.environ.get("BORG_KEYS_DIR")
if keys_dir is None:
# note: do not just give this as default to the environment.get(), see issue #5979.
keys_dir = str(Path(get_config_dir()) / "keys")
if create:
ensure_dir(keys_dir)
return keys_dir


def get_security_dir(repository_id=None, *, create=True):
"""Determine where borg 1.x stored local security information."""
security_dir = os.environ.get("BORG_SECURITY_DIR")
if security_dir is None:
# note: do not just give this as default to the environment.get(), see issue #5979.
security_dir = str(Path(get_config_dir()) / "security")
if repository_id:
security_dir = str(Path(security_dir) / repository_id)
if create:
ensure_dir(security_dir)
return security_dir
13 changes: 11 additions & 2 deletions src/borg/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
logger = create_logger()


def _security_dir(repository, *, create=True):
"""Security dir path for repository, using the borg 1.x layout for v1 repos."""
if repository.version == 1:
from .legacy.fs import get_security_dir as get_security_dir_legacy

return get_security_dir_legacy(repository.id_str, create=create)
return get_security_dir(repository.id_str, create=create)


class CacheInitAbortedError(Error):
"""Cache initialization aborted"""

Expand Down Expand Up @@ -63,15 +72,15 @@ class SecurityManager:

def __init__(self, repository):
self.repository = repository
self.dir = Path(get_security_dir(repository.id_str, legacy=(repository.version == 1)))
self.dir = Path(_security_dir(repository))
self.key_type_file = self.dir / "key-type"
self.location_file = self.dir / "location"
self.manifest_ts_file = self.dir / "manifest-timestamp"

@staticmethod
def destroy(repository, path=None):
"""Destroys the security directory for ``repository`` or at ``path``."""
path = path or get_security_dir(repository.id_str, legacy=(repository.version == 1))
path = path or _security_dir(repository)
if Path(path).exists():
shutil.rmtree(path)

Expand Down
Loading
Loading