Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f95c68ee41 | |||
| 2b961baa5b | |||
| 69bb85af01 | |||
| a97395370a | |||
| b1d2fe7717 |
4
.gitignore
vendored
4
.gitignore
vendored
@@ -1,4 +1,6 @@
|
|||||||
__pycache__
|
__pycache__
|
||||||
|
*.egg-info
|
||||||
|
/.coverage
|
||||||
|
/.hypothesis/
|
||||||
/src/bsv/_version.py
|
/src/bsv/_version.py
|
||||||
/venv
|
/venv
|
||||||
*.egg-info
|
|
||||||
|
|||||||
@@ -12,14 +12,18 @@ classifiers = [
|
|||||||
]
|
]
|
||||||
dynamic = ["version"]
|
dynamic = ["version"]
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"click",
|
||||||
"fastcdc",
|
"fastcdc",
|
||||||
"rich",
|
"rich",
|
||||||
"tomlkit",
|
"tomlkit",
|
||||||
|
"typing-extensions"
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
test = [
|
dev = [
|
||||||
|
"hypothesis",
|
||||||
"pytest",
|
"pytest",
|
||||||
|
"pytest-cov"
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.urls]
|
[project.urls]
|
||||||
@@ -27,7 +31,59 @@ test = [
|
|||||||
"Bug Tracker" = "https://git.draklia.net/draklaw/pybsv/issues"
|
"Bug Tracker" = "https://git.draklia.net/draklaw/pybsv/issues"
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
bsv = "bsv.main:main"
|
bsv = "bsv.cli:cli"
|
||||||
|
|
||||||
|
|
||||||
|
[tool.ruff]
|
||||||
|
target-version = "py311"
|
||||||
|
|
||||||
|
[tool.ruff.lint]
|
||||||
|
select = [
|
||||||
|
"B", # flake8-bugbear
|
||||||
|
"D", # pydocstyle
|
||||||
|
"DOC", # pydoclint
|
||||||
|
"E", # pycodestyle
|
||||||
|
"EM", # flake8-errmsg
|
||||||
|
"F", # Pyflakes
|
||||||
|
"FURB", # refurb
|
||||||
|
"G", # flake8-logging-format
|
||||||
|
"I", # isort
|
||||||
|
"ISC", # flake8-implicit-str-concat
|
||||||
|
"LOG", # flake8-logging
|
||||||
|
"N", # pep8-naming
|
||||||
|
"PERF", # Perflint
|
||||||
|
"PT", # flake8-pytest-style
|
||||||
|
"PTH", # flake8-use-pathlib
|
||||||
|
"S", # flake8-bandit
|
||||||
|
"SIM", # flake8-simplify
|
||||||
|
"TC", # flake8-type-checking
|
||||||
|
"UP", # pyupgrade
|
||||||
|
"W", # pycodestyle
|
||||||
|
]
|
||||||
|
ignore = [
|
||||||
|
"UP038", # Deprecated rule; bad idea.
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.ruff.lint.per-file-ignores]
|
||||||
|
"**/tests/*" = [
|
||||||
|
"D103", # Missing docstring in public function
|
||||||
|
"S101", # Use of assert detected
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.ruff.lint.isort]
|
||||||
|
force-sort-within-sections = true
|
||||||
|
lines-after-imports = 2
|
||||||
|
required-imports = ["from __future__ import annotations"]
|
||||||
|
|
||||||
|
[tool.ruff.lint.pydocstyle]
|
||||||
|
convention = "google"
|
||||||
|
|
||||||
|
|
||||||
|
[tool.coverage.report]
|
||||||
|
exclude_also = [
|
||||||
|
"if TYPE_CHECKING:",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["setuptools", "setuptools-scm"]
|
requires = ["setuptools", "setuptools-scm"]
|
||||||
|
|||||||
18
src/bsv.bak/__init__.py
Normal file
18
src/bsv.bak/__init__.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
# bsv - Backup, Synchronization, Versioning
|
||||||
|
# Copyright (C) 2023 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from bsv._version import __version__, __version_tuple__
|
||||||
21
src/bsv.bak/__main__.py
Normal file
21
src/bsv.bak/__main__.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
# bsv - Backup, Synchronization, Versioning
|
||||||
|
# Copyright (C) 2023 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from bsv.main import main
|
||||||
|
|
||||||
|
|
||||||
|
exit(main())
|
||||||
16
src/bsv.bak/_version.py
Normal file
16
src/bsv.bak/_version.py
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
# file generated by setuptools_scm
|
||||||
|
# don't change, don't track in version control
|
||||||
|
TYPE_CHECKING = False
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from typing import Tuple, Union
|
||||||
|
VERSION_TUPLE = Tuple[Union[int, str], ...]
|
||||||
|
else:
|
||||||
|
VERSION_TUPLE = object
|
||||||
|
|
||||||
|
version: str
|
||||||
|
__version__: str
|
||||||
|
__version_tuple__: VERSION_TUPLE
|
||||||
|
version_tuple: VERSION_TUPLE
|
||||||
|
|
||||||
|
__version__ = version = '0.0.1.dev8+g52a553d.d20231127'
|
||||||
|
__version_tuple__ = version_tuple = (0, 0, 1, 'dev8', 'g52a553d.d20231127')
|
||||||
126
src/bsv.bak/cli.py
Normal file
126
src/bsv.bak/cli.py
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
# bsv - Backup, Synchronization, Versioning
|
||||||
|
# Copyright (C) 2023 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, Callable, TypeVar
|
||||||
|
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.text import Text
|
||||||
|
|
||||||
|
|
||||||
|
_console: Console | None = None
|
||||||
|
def get_console() -> Console:
|
||||||
|
assert _console is not None
|
||||||
|
return _console
|
||||||
|
|
||||||
|
_error_console: Console | None = None
|
||||||
|
def get_error_console() -> Console:
|
||||||
|
assert _error_console is not None
|
||||||
|
return _error_console
|
||||||
|
|
||||||
|
|
||||||
|
def init_consoles(color: str="auto"):
|
||||||
|
global _console
|
||||||
|
global _error_console
|
||||||
|
|
||||||
|
assert _console is None
|
||||||
|
assert _error_console is None
|
||||||
|
|
||||||
|
kwargs: dict[str, Any] = {
|
||||||
|
"tab_size": 4,
|
||||||
|
}
|
||||||
|
match color:
|
||||||
|
case "always":
|
||||||
|
kwargs["force_terminal"] = True
|
||||||
|
case "auto":
|
||||||
|
pass
|
||||||
|
case "never":
|
||||||
|
kwargs["no_color"] = True
|
||||||
|
|
||||||
|
_console = Console(
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
_error_console = Console(
|
||||||
|
stderr = True,
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
PromptType = TypeVar("PromptType")
|
||||||
|
|
||||||
|
class NoDefaultType:
|
||||||
|
def __repr__(self):
|
||||||
|
return "NoDefault"
|
||||||
|
NoDefault = NoDefaultType()
|
||||||
|
|
||||||
|
def prompt(
|
||||||
|
prompt: str,
|
||||||
|
factory: Callable[[str], PromptType],
|
||||||
|
*,
|
||||||
|
console: Console | None = None,
|
||||||
|
default: PromptType | NoDefaultType = NoDefault,
|
||||||
|
show_default: bool = True,
|
||||||
|
) -> PromptType:
|
||||||
|
if console is None:
|
||||||
|
console = get_console()
|
||||||
|
|
||||||
|
prompt_text = Text(prompt, style="prompt")
|
||||||
|
prompt_text.end = ""
|
||||||
|
if show_default and default is not NoDefault:
|
||||||
|
prompt_text.append(" ")
|
||||||
|
prompt_text.append(f"({default})", style="prompt.default")
|
||||||
|
prompt_text.append(": ")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
value = console.input(prompt_text)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
console.print("")
|
||||||
|
raise
|
||||||
|
|
||||||
|
if not value and not isinstance(default, NoDefaultType):
|
||||||
|
return default
|
||||||
|
try:
|
||||||
|
return factory(value)
|
||||||
|
except ValueError as err:
|
||||||
|
console.print(err)
|
||||||
|
|
||||||
|
def prompt_confirmation(prompt: str, *, console: Console | None=None, default: bool=True) -> bool:
|
||||||
|
if console is None:
|
||||||
|
console = get_console()
|
||||||
|
|
||||||
|
prompt_text = Text(prompt, style="prompt")
|
||||||
|
prompt_text.end = ""
|
||||||
|
prompt_text.append(" ")
|
||||||
|
if default:
|
||||||
|
prompt_text.append("(Y/n)", style="prompt.default")
|
||||||
|
else:
|
||||||
|
prompt_text.append("(y/N)", style="prompt.default")
|
||||||
|
prompt_text.append(": ")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
value = console.input(prompt_text).strip().lower()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
console.print("")
|
||||||
|
raise
|
||||||
|
|
||||||
|
if not value and not isinstance(default, NoDefaultType):
|
||||||
|
return default
|
||||||
|
if value not in "yn":
|
||||||
|
console.print("Please answer 'y' or 'n'.")
|
||||||
|
else:
|
||||||
|
return value == "y"
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
# bsv - Backup, Synchronization, Versioning
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
# Copyright (C) 2023 Simon Boyé
|
# Copyright (C) 2025 Simon Boyé
|
||||||
#
|
#
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
@@ -13,6 +13,11 @@
|
|||||||
#
|
#
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""pybsv - A Backup, Synchronization and Versioning tool."""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from bsv._version import __version__, __version_tuple__
|
from bsv._version import __version__, __version_tuple__
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["__version__", "__version_tuple__"]
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
# bsv - Backup, Synchronization, Versioning
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
# Copyright (C) 2023 Simon Boyé
|
# Copyright (C) 2025 Simon Boyé
|
||||||
#
|
#
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
@@ -13,9 +13,11 @@
|
|||||||
#
|
#
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""Main entry-point. Allow to use bsv module as a command."""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from bsv.main import main
|
from bsv.cli import cli
|
||||||
|
|
||||||
|
|
||||||
exit(main())
|
exit(cli())
|
||||||
|
|||||||
377
src/bsv/cli.py
377
src/bsv/cli.py
@@ -1,5 +1,5 @@
|
|||||||
# bsv - Backup, Synchronization, Versioning
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
# Copyright (C) 2023 Simon Boyé
|
# Copyright (C) 2025 Simon Boyé
|
||||||
#
|
#
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
@@ -13,114 +13,303 @@
|
|||||||
#
|
#
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""Command-line interface. This is where all bsv commands are defined."""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Any, Callable, TypeVar
|
from dataclasses import dataclass
|
||||||
|
import math
|
||||||
|
from pathlib import Path, PurePosixPath
|
||||||
|
import platform
|
||||||
|
import sys
|
||||||
|
from typing import Any, ClassVar, Literal
|
||||||
|
|
||||||
from rich.console import Console
|
import click
|
||||||
from rich.text import Text
|
|
||||||
|
|
||||||
|
from bsv.cli_utils import format_human_byte_size
|
||||||
_console: Console | None = None
|
from bsv.repo import default_repository_path
|
||||||
def get_console() -> Console:
|
from bsv.vfs import (
|
||||||
assert _console is not None
|
AlreadyExistError,
|
||||||
return _console
|
FileMetadata,
|
||||||
|
NotFoundError,
|
||||||
_error_console: Console | None = None
|
Permissions,
|
||||||
def get_error_console() -> Console:
|
VirtualFileSystem,
|
||||||
assert _error_console is not None
|
|
||||||
return _error_console
|
|
||||||
|
|
||||||
|
|
||||||
def init_consoles(color: str="auto"):
|
|
||||||
global _console
|
|
||||||
global _error_console
|
|
||||||
|
|
||||||
assert _console is None
|
|
||||||
assert _error_console is None
|
|
||||||
|
|
||||||
kwargs: dict[str, Any] = {
|
|
||||||
"tab_size": 4,
|
|
||||||
}
|
|
||||||
match color:
|
|
||||||
case "always":
|
|
||||||
kwargs["force_terminal"] = True
|
|
||||||
case "auto":
|
|
||||||
pass
|
|
||||||
case "never":
|
|
||||||
kwargs["no_color"] = True
|
|
||||||
|
|
||||||
_console = Console(
|
|
||||||
**kwargs,
|
|
||||||
)
|
|
||||||
_error_console = Console(
|
|
||||||
stderr = True,
|
|
||||||
**kwargs,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
PromptType = TypeVar("PromptType")
|
@dataclass
|
||||||
|
class RepositoryParams:
|
||||||
|
"""Global parameters shared by all commands."""
|
||||||
|
|
||||||
class NoDefaultType:
|
path: Path
|
||||||
def __repr__(self):
|
|
||||||
return "NoDefault"
|
|
||||||
NoDefault = NoDefaultType()
|
|
||||||
|
|
||||||
def prompt(
|
def as_filesystem(self) -> VirtualFileSystem:
|
||||||
prompt: str,
|
return VirtualFileSystem(self.path)
|
||||||
factory: Callable[[str], PromptType],
|
|
||||||
*,
|
|
||||||
console: Console | None = None,
|
|
||||||
default: PromptType | NoDefaultType = NoDefault,
|
|
||||||
show_default: bool = True,
|
|
||||||
) -> PromptType:
|
|
||||||
if console is None:
|
|
||||||
console = get_console()
|
|
||||||
|
|
||||||
prompt_text = Text(prompt, style="prompt")
|
|
||||||
prompt_text.end = ""
|
|
||||||
if show_default and default is not NoDefault:
|
|
||||||
prompt_text.append(" ")
|
|
||||||
prompt_text.append(f"({default})", style="prompt.default")
|
|
||||||
prompt_text.append(": ")
|
|
||||||
|
|
||||||
while True:
|
class PermissionsType(click.ParamType):
|
||||||
|
"""Converter for permissions given on the command line."""
|
||||||
|
|
||||||
|
name: ClassVar[str] = "permissions"
|
||||||
|
|
||||||
|
def convert(
|
||||||
|
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
|
||||||
|
) -> Permissions:
|
||||||
|
"""Convert an argument to a `Permissions` object."""
|
||||||
|
if isinstance(value, Permissions):
|
||||||
|
return value
|
||||||
|
|
||||||
try:
|
try:
|
||||||
value = console.input(prompt_text)
|
return Permissions(value)
|
||||||
except KeyboardInterrupt:
|
|
||||||
console.print("")
|
|
||||||
raise
|
|
||||||
|
|
||||||
if not value and not isinstance(default, NoDefaultType):
|
|
||||||
return default
|
|
||||||
try:
|
|
||||||
return factory(value)
|
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
console.print(err)
|
self.fail(str(err), param, ctx)
|
||||||
|
|
||||||
def prompt_confirmation(prompt: str, *, console: Console | None=None, default: bool=True) -> bool:
|
|
||||||
if console is None:
|
|
||||||
console = get_console()
|
|
||||||
|
|
||||||
prompt_text = Text(prompt, style="prompt")
|
class BsvPathType(click.ParamType):
|
||||||
prompt_text.end = ""
|
"""Converter for bsv paths given on the command line."""
|
||||||
prompt_text.append(" ")
|
|
||||||
if default:
|
name: ClassVar[str] = "bsv_path"
|
||||||
prompt_text.append("(Y/n)", style="prompt.default")
|
|
||||||
else:
|
def convert(
|
||||||
prompt_text.append("(y/N)", style="prompt.default")
|
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
|
||||||
prompt_text.append(": ")
|
) -> PurePosixPath:
|
||||||
|
"""Convert an argument to a bsv path (absolute `PurePosixPath`)."""
|
||||||
|
if isinstance(value, PurePosixPath):
|
||||||
|
return value
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
try:
|
||||||
value = console.input(prompt_text).strip().lower()
|
path = PurePosixPath(value)
|
||||||
except KeyboardInterrupt:
|
except ValueError as err:
|
||||||
console.print("")
|
self.fail(str(err), param, ctx)
|
||||||
raise
|
|
||||||
|
|
||||||
if not value and not isinstance(default, NoDefaultType):
|
if not path.is_absolute():
|
||||||
return default
|
self.fail(f"{value} is not an absolute path", param, ctx)
|
||||||
if value not in "yn":
|
|
||||||
console.print("Please answer 'y' or 'n'.")
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
class AnyPathType(click.ParamType):
|
||||||
|
"""Converter for bsv or fs paths given on the command line."""
|
||||||
|
|
||||||
|
name: str = "any_path"
|
||||||
|
|
||||||
|
default: Literal["bsv", "fs"]
|
||||||
|
|
||||||
|
def __init__(self, default: Literal["bsv", "fs"] = "fs"):
|
||||||
|
self.default = default
|
||||||
|
|
||||||
|
def convert(
|
||||||
|
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
|
||||||
|
) -> PurePosixPath | Path:
|
||||||
|
"""Convert an argument to a bsv or fs path."""
|
||||||
|
if isinstance(value, (PurePosixPath, Path)):
|
||||||
|
return value
|
||||||
|
|
||||||
|
if not isinstance(value, str):
|
||||||
|
self.fail(f"{value} is not a string")
|
||||||
|
|
||||||
|
path_type = self.default
|
||||||
|
if value.startswith("bsv:"):
|
||||||
|
path_type = "bsv"
|
||||||
|
value = value.removeprefix("bsv:")
|
||||||
|
elif value.startswith("fs:"):
|
||||||
|
path_type = "fs"
|
||||||
|
value = value.removeprefix("fs:")
|
||||||
|
|
||||||
|
if path_type == "bsv":
|
||||||
|
return BsvPathType().convert(value, param, ctx)
|
||||||
else:
|
else:
|
||||||
return value == "y"
|
return Path(value)
|
||||||
|
|
||||||
|
|
||||||
|
PERMISSIONS_TYPE = PermissionsType()
|
||||||
|
ANY_OBJECT_TYPE = BsvPathType() # TODO: accept bsv path and object id.
|
||||||
|
|
||||||
|
|
||||||
|
@click.group()
|
||||||
|
@click.version_option()
|
||||||
|
@click.option(
|
||||||
|
"--repo", envvar="BSV_REPO", type=click.Path(resolve_path=True, path_type=Path)
|
||||||
|
)
|
||||||
|
@click.pass_context
|
||||||
|
def cli(ctx: click.Context, repo: Path):
|
||||||
|
"""Backup, Synchronization and Versioning (bsv) tool.
|
||||||
|
|
||||||
|
bsv manages synchronization of several "devices" with history. This makes it
|
||||||
|
suitable for different tasks:
|
||||||
|
|
||||||
|
* Backup: Synchronize your data with remote devices that serve as backup. The
|
||||||
|
remotes should be configured to keep previous versions of the files (using
|
||||||
|
configurable rules) so even if a file is deleted/corrupted, a valid version can
|
||||||
|
be found in the backup devices.
|
||||||
|
* Synchronization: Synchronize your data among several devices you are working with.
|
||||||
|
In case of conflict, the conflicting versions of a file are stored in each
|
||||||
|
devices so it is possible to inspect and merge them to resolve the conflict.
|
||||||
|
* Versioning: A local device can be used to store different versions of the same
|
||||||
|
directory structure.
|
||||||
|
"""
|
||||||
|
ctx.obj = RepositoryParams(
|
||||||
|
path=repo or default_repository_path(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.pass_obj
|
||||||
|
def info(params: RepositoryParams):
|
||||||
|
"""Print information on the current repository."""
|
||||||
|
print(f"Repository: {params.path}")
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.option("-d", "--device-name", default=platform.node, prompt=True)
|
||||||
|
@click.pass_obj
|
||||||
|
def init(params: RepositoryParams, device_name: str):
|
||||||
|
"""Initialize a bsv repository."""
|
||||||
|
print(f"Repository path: {params.path!r}")
|
||||||
|
print(f"Device name: {device_name!r}")
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("directories", nargs=-1, type=BsvPathType())
|
||||||
|
@click.option("-m", "--mode", type=PERMISSIONS_TYPE, default=Permissions(0o770))
|
||||||
|
@click.option("-p", "--parents", is_flag=True)
|
||||||
|
@click.option("-v", "--verbose", is_flag=True)
|
||||||
|
@click.pass_obj
|
||||||
|
def mkdir(
|
||||||
|
params: RepositoryParams,
|
||||||
|
directories: list[PurePosixPath],
|
||||||
|
mode: Permissions,
|
||||||
|
parents: bool = False,
|
||||||
|
verbose: bool = False,
|
||||||
|
):
|
||||||
|
"""Make a directory in the current repository."""
|
||||||
|
fs = params.as_filesystem()
|
||||||
|
|
||||||
|
return_code = 0
|
||||||
|
for dir in directories:
|
||||||
|
try:
|
||||||
|
fs.mkdir(dir, mode=mode, parents=parents)
|
||||||
|
except AlreadyExistError as error:
|
||||||
|
click.echo(error, file=sys.stderr)
|
||||||
|
except NotFoundError as error:
|
||||||
|
return_code = 1
|
||||||
|
click.echo(error, file=sys.stderr)
|
||||||
|
else:
|
||||||
|
if verbose:
|
||||||
|
click.echo(f"Created {dir}")
|
||||||
|
|
||||||
|
sys.exit(return_code)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("files", nargs=-1, type=BsvPathType())
|
||||||
|
@click.option("--filter", flag_value="hidden", default=True, hidden=True)
|
||||||
|
@click.option("-a", "--all", "filter", flag_value="all")
|
||||||
|
@click.option("-A", "--almost-all", "filter", flag_value="implied")
|
||||||
|
@click.option("-h", "--human-readable", is_flag=True)
|
||||||
|
@click.option("-l", "--list", is_flag=True)
|
||||||
|
@click.pass_obj
|
||||||
|
def ls(
|
||||||
|
params: RepositoryParams,
|
||||||
|
files: tuple[PurePosixPath],
|
||||||
|
filter: Literal["hidden", "implied", "all"],
|
||||||
|
human_readable: bool,
|
||||||
|
list: bool,
|
||||||
|
):
|
||||||
|
"""List information about files."""
|
||||||
|
fs = params.as_filesystem()
|
||||||
|
|
||||||
|
if not files:
|
||||||
|
files = (PurePosixPath("/"),)
|
||||||
|
|
||||||
|
filter_md = FileMetadata.is_hidden if filter == "hidden" else lambda _: False
|
||||||
|
|
||||||
|
for file_index, file in enumerate(files):
|
||||||
|
if len(files) > 1:
|
||||||
|
if file_index:
|
||||||
|
click.echo()
|
||||||
|
click.echo(f"{file}:")
|
||||||
|
|
||||||
|
items = [(md.path.name, md) for md in fs.iter_dir(file) if not filter_md(md)]
|
||||||
|
items.sort()
|
||||||
|
|
||||||
|
if filter == "all":
|
||||||
|
items[0:0] = [
|
||||||
|
(".", fs.metadata(file)),
|
||||||
|
("..", fs.metadata(file.parent)),
|
||||||
|
]
|
||||||
|
|
||||||
|
if list:
|
||||||
|
rows: list[tuple[str, str, str, str]] = []
|
||||||
|
rows_width: list[int] = [0, 0, 0, 0]
|
||||||
|
for name, md in items:
|
||||||
|
mode = str(md.unix_mode)
|
||||||
|
size = (
|
||||||
|
format_human_byte_size(md.byte_size)
|
||||||
|
if human_readable
|
||||||
|
else str(md.byte_size)
|
||||||
|
)
|
||||||
|
local_time = md.modification_time.astimezone().replace(tzinfo=None)
|
||||||
|
time = local_time.isoformat(" ", "seconds")
|
||||||
|
row = (mode, size, time, name)
|
||||||
|
rows.append(row)
|
||||||
|
for index, field in enumerate(row):
|
||||||
|
rows_width[index] = max(rows_width[index], len(field))
|
||||||
|
|
||||||
|
for mode, size, time, name in rows:
|
||||||
|
click.echo(
|
||||||
|
" ".join(
|
||||||
|
[
|
||||||
|
mode.ljust(rows_width[0]),
|
||||||
|
size.rjust(rows_width[1]),
|
||||||
|
time.ljust(rows_width[2]),
|
||||||
|
name,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
for name, _ in items:
|
||||||
|
click.echo(name)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("object", type=ANY_OBJECT_TYPE)
|
||||||
|
@click.pass_obj
|
||||||
|
def show(
|
||||||
|
params: RepositoryParams,
|
||||||
|
object: PurePosixPath,
|
||||||
|
):
|
||||||
|
"""Show a bsv object."""
|
||||||
|
print(f"object: {object!r}")
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("srcs", nargs=-1, type=AnyPathType(default="bsv"))
|
||||||
|
@click.argument("dst", type=AnyPathType(default="bsv"))
|
||||||
|
@click.option("-r", "--recursive", is_flag=True)
|
||||||
|
@click.pass_obj
|
||||||
|
def cp(
|
||||||
|
params: RepositoryParams,
|
||||||
|
srcs: list[PurePosixPath | Path],
|
||||||
|
dst: PurePosixPath | Path,
|
||||||
|
recursive: bool,
|
||||||
|
):
|
||||||
|
"""Copy files or directories."""
|
||||||
|
print(f"srcs: {srcs!r}")
|
||||||
|
print(f"dst: {dst!r}")
|
||||||
|
print(f"recursive: {recursive!r}")
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("targets", nargs=-1, type=BsvPathType())
|
||||||
|
@click.option("-r", "--recursive", is_flag=True)
|
||||||
|
@click.pass_obj
|
||||||
|
def rm(
|
||||||
|
params: RepositoryParams,
|
||||||
|
targets: list[PurePosixPath],
|
||||||
|
recursive: bool,
|
||||||
|
):
|
||||||
|
"""Remove files or directories."""
|
||||||
|
print(f"targets: {targets}")
|
||||||
|
print(f"recursive: {recursive}")
|
||||||
|
|||||||
48
src/bsv/cli_utils.py
Normal file
48
src/bsv/cli_utils.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
|
# Copyright (C) 2025 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""Tools and utilities to build the command-line interface."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
|
||||||
|
BINARY_PREFIXES: Final[list[str]] = [
|
||||||
|
"",
|
||||||
|
"Ki",
|
||||||
|
"Mi",
|
||||||
|
"Gi",
|
||||||
|
"Ti",
|
||||||
|
"Pi",
|
||||||
|
"Ei",
|
||||||
|
"Zi",
|
||||||
|
"Yi",
|
||||||
|
"Ri",
|
||||||
|
"Qi",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def format_human_byte_size(byte_size: int) -> str:
|
||||||
|
"""Format the given `byte_size` as a human-readable string."""
|
||||||
|
index = min(max((byte_size.bit_length() - 1) // 10, 0), len(BINARY_PREFIXES) - 1)
|
||||||
|
size = byte_size / 1024**index
|
||||||
|
num_digits = len(str(int(size)))
|
||||||
|
decimals = max(0, 3 - num_digits)
|
||||||
|
rounded = round(size, decimals)
|
||||||
|
if rounded == 1024 and index + 1 < len(BINARY_PREFIXES):
|
||||||
|
rounded = 1
|
||||||
|
index += 1
|
||||||
|
return f"{rounded:.16g}{BINARY_PREFIXES[index]}B"
|
||||||
0
src/bsv/py.typed
Normal file
0
src/bsv/py.typed
Normal file
41
src/bsv/repo.py
Normal file
41
src/bsv/repo.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
|
# Copyright (C) 2025 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
import platform
|
||||||
|
|
||||||
|
|
||||||
|
def default_repository_path() -> Path:
|
||||||
|
"""Return the system-dependent default repository path."""
|
||||||
|
if platform.system() in ("Windows", "Darwin", "Java"):
|
||||||
|
msg = f"default_repository_path does not support {platform.system()} system"
|
||||||
|
raise NotImplementedError(msg)
|
||||||
|
else: # Assume Unix
|
||||||
|
# See https://specifications.freedesktop.org/basedir-spec/latest/
|
||||||
|
data_home = os.environ.get("XDG_DATA_HOME", "")
|
||||||
|
if data_home:
|
||||||
|
path = Path(data_home)
|
||||||
|
if not path.is_absolute() or not path.exists():
|
||||||
|
msg = (
|
||||||
|
f"invalid XDG_DATA_HOME ({path}): path is relative or does not "
|
||||||
|
"exists"
|
||||||
|
)
|
||||||
|
raise RuntimeError(msg)
|
||||||
|
else:
|
||||||
|
path = Path.home() / ".local/share"
|
||||||
|
return path / "bsv/repo"
|
||||||
348
src/bsv/vfs.py
Normal file
348
src/bsv/vfs.py
Normal file
@@ -0,0 +1,348 @@
|
|||||||
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
|
# Copyright (C) 2025 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""Provide a virtual file system interface alongside associated tools."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
from functools import total_ordering
|
||||||
|
import os
|
||||||
|
from pathlib import Path, PurePosixPath
|
||||||
|
from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_IMODE, filemode
|
||||||
|
from typing import TYPE_CHECKING, Any, BinaryIO, Literal, Self
|
||||||
|
|
||||||
|
from typing_extensions import Buffer
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from collections.abc import Iterator
|
||||||
|
from os import stat_result
|
||||||
|
|
||||||
|
|
||||||
|
AnyBsvPath = PurePosixPath | str
|
||||||
|
|
||||||
|
|
||||||
|
class FsError(RuntimeError):
|
||||||
|
"""Error type raised by `FileSystem` objects."""
|
||||||
|
|
||||||
|
|
||||||
|
class AlreadyExistError(FsError):
|
||||||
|
"""Raise when trying to create an item that already exists."""
|
||||||
|
|
||||||
|
|
||||||
|
class NotFoundError(FsError):
|
||||||
|
"""Raise when trying to access an item that do not exist."""
|
||||||
|
|
||||||
|
|
||||||
|
class Permissions:
|
||||||
|
"""Represent the permissions of an object in a filesystem."""
|
||||||
|
|
||||||
|
unix_perms: int
|
||||||
|
|
||||||
|
def __init__(self, unix_perms: int | str = 0o640):
|
||||||
|
"""Create a `Permissions` object from `unix_perms`."""
|
||||||
|
if isinstance(unix_perms, str):
|
||||||
|
unix_perms = int(unix_perms, 8)
|
||||||
|
self.unix_perms = unix_perms
|
||||||
|
|
||||||
|
def __eq__(self, rhs: Any) -> bool:
|
||||||
|
"""Test if two `Permission` are the same."""
|
||||||
|
return (
|
||||||
|
rhs.unix_perms == self.unix_perms
|
||||||
|
if isinstance(rhs, Permissions)
|
||||||
|
else NotImplemented
|
||||||
|
)
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
"""Return a representation of the permissions as valid python code."""
|
||||||
|
return f"Permissions(0o{oct(self.unix_perms)[2:].rjust(4, '0')})"
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
"""Return a representation of the permissions of the form 'rwxrwxrwx'."""
|
||||||
|
return filemode(self.unix_perms)[1:]
|
||||||
|
|
||||||
|
|
||||||
|
DEFAULT_DIR_PERMS = Permissions(0o770)
|
||||||
|
DEFAULT_FILE_PERMS = Permissions(0o640)
|
||||||
|
|
||||||
|
|
||||||
|
FileType = Literal["dir", "file", "symlink", "other"]
|
||||||
|
|
||||||
|
_IFMT_MAP: dict[int, FileType] = {
|
||||||
|
S_IFDIR: "dir",
|
||||||
|
S_IFREG: "file",
|
||||||
|
S_IFLNK: "symlink",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@total_ordering
|
||||||
|
class FileMetadata:
|
||||||
|
"""Metadata associated with vfs files: file type, permissions, etc."""
|
||||||
|
|
||||||
|
path: PurePosixPath
|
||||||
|
type: FileType
|
||||||
|
permissions: Permissions
|
||||||
|
modification_time: datetime
|
||||||
|
byte_size: int
|
||||||
|
_stat: stat_result
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
path: PurePosixPath,
|
||||||
|
*,
|
||||||
|
type: FileType,
|
||||||
|
permissions: Permissions,
|
||||||
|
modification_time: datetime,
|
||||||
|
byte_size: int,
|
||||||
|
):
|
||||||
|
"""Create a `FileMetadata`."""
|
||||||
|
self.path = path
|
||||||
|
self.type = type
|
||||||
|
self.permissions = permissions
|
||||||
|
self.modification_time = modification_time
|
||||||
|
self.byte_size = byte_size
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_stat(
|
||||||
|
cls,
|
||||||
|
path: PurePosixPath,
|
||||||
|
stat: stat_result,
|
||||||
|
) -> Self:
|
||||||
|
"""Create a `FileMetadata` from a `stat_result`."""
|
||||||
|
return cls(
|
||||||
|
path,
|
||||||
|
type=_IFMT_MAP.get(S_IFMT(stat.st_mode), "other"),
|
||||||
|
permissions=Permissions(S_IMODE(stat.st_mode)),
|
||||||
|
modification_time=datetime.fromtimestamp(stat.st_mtime, UTC),
|
||||||
|
byte_size=stat.st_size,
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def unix_mode(self) -> str:
|
||||||
|
"""Return unix-like mode in the form '-rwxrwxrwx'."""
|
||||||
|
return UNIX_MODE_FILE_TYPE[self.type] + str(self.permissions)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_hidden(self) -> bool:
|
||||||
|
"""Return true if the file starts with a '.'."""
|
||||||
|
return self.path.name.startswith(".")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_file(self) -> bool:
|
||||||
|
"""Test if this is a file."""
|
||||||
|
return self.type == "file"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_dir(self) -> bool:
|
||||||
|
"""Test if this is a directory."""
|
||||||
|
return self.type == "dir"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_symlink(self) -> bool:
|
||||||
|
"""Test if this is a symbolic link."""
|
||||||
|
return self.type == "symlink"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_other(self) -> bool:
|
||||||
|
"""Test if this is a symbolic link."""
|
||||||
|
return self.type == "other"
|
||||||
|
|
||||||
|
def _as_tuple(
|
||||||
|
self,
|
||||||
|
) -> tuple[PurePosixPath, FileType, Permissions, datetime, int]:
|
||||||
|
return (
|
||||||
|
self.path,
|
||||||
|
self.type,
|
||||||
|
self.permissions,
|
||||||
|
self.modification_time,
|
||||||
|
self.byte_size,
|
||||||
|
)
|
||||||
|
|
||||||
|
def __eq__(self, rhs: Any) -> bool:
|
||||||
|
"""Test if two `Metadata` are the same."""
|
||||||
|
return (
|
||||||
|
self._as_tuple() == rhs._as_tuple()
|
||||||
|
if isinstance(rhs, FileMetadata)
|
||||||
|
else NotImplemented
|
||||||
|
)
|
||||||
|
|
||||||
|
def __lt__(self, rhs: Any) -> bool:
|
||||||
|
"""Compare `rhs.path` with `self.path`."""
|
||||||
|
return self.path < rhs.path if isinstance(rhs, FileMetadata) else NotImplemented
|
||||||
|
|
||||||
|
|
||||||
|
UNIX_MODE_FILE_TYPE = {
|
||||||
|
"dir": "d",
|
||||||
|
"file": "-",
|
||||||
|
"other": "o",
|
||||||
|
"link": "l",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class VirtualFileSystem:
|
||||||
|
"""Represent a file system, with common file system operations."""
|
||||||
|
|
||||||
|
path: Path
|
||||||
|
|
||||||
|
def __init__(self, path: Path):
|
||||||
|
"""Initialize the file system to point to `path`."""
|
||||||
|
self.path = path
|
||||||
|
|
||||||
|
def exists(self, path: AnyBsvPath) -> bool:
|
||||||
|
"""Test if the `path` point to an existing item."""
|
||||||
|
path = self._make_path(path)
|
||||||
|
return self._real_path(path).exists()
|
||||||
|
|
||||||
|
def is_file(self, path: AnyBsvPath) -> bool:
|
||||||
|
"""Test if `path` is a file."""
|
||||||
|
return self.metadata(path).is_file
|
||||||
|
|
||||||
|
def is_dir(self, path: AnyBsvPath) -> bool:
|
||||||
|
"""Test if `path` is a directory."""
|
||||||
|
return self.metadata(path).is_dir
|
||||||
|
|
||||||
|
def is_symlink(self, path: AnyBsvPath) -> bool:
|
||||||
|
"""Test if `path` is a symbolic link."""
|
||||||
|
return self.metadata(path).is_symlink
|
||||||
|
|
||||||
|
def is_other(self, path: AnyBsvPath) -> bool:
|
||||||
|
"""Test if `path` is not a file, directory or symbolic link."""
|
||||||
|
return self.metadata(path).is_other
|
||||||
|
|
||||||
|
def metadata(self, path: AnyBsvPath) -> FileMetadata:
|
||||||
|
"""Return the metadata of a given object."""
|
||||||
|
metadata = self.metadata_or_none(path)
|
||||||
|
if metadata is None:
|
||||||
|
msg = f"file '{path}' not found"
|
||||||
|
raise NotFoundError(msg)
|
||||||
|
return metadata
|
||||||
|
|
||||||
|
def metadata_or_none(self, path: AnyBsvPath) -> FileMetadata | None:
|
||||||
|
"""Return the metadata of a given object or `None` if it does not exists."""
|
||||||
|
path = self._make_path(path)
|
||||||
|
try:
|
||||||
|
stat = self._real_path(path).stat(follow_symlinks=False)
|
||||||
|
except FileNotFoundError:
|
||||||
|
return None
|
||||||
|
except OSError as err:
|
||||||
|
msg = f"failed to read '{path}' metadata"
|
||||||
|
raise FsError(msg) from err
|
||||||
|
return FileMetadata.from_stat(path, stat)
|
||||||
|
|
||||||
|
def iter_dir(self, path: AnyBsvPath) -> Iterator[FileMetadata]:
|
||||||
|
"""Return the metadata of all items in the directory `path`."""
|
||||||
|
path = self._make_path(path)
|
||||||
|
real_path = self._real_path(path)
|
||||||
|
try:
|
||||||
|
for entry in os.scandir(real_path):
|
||||||
|
yield FileMetadata.from_stat(
|
||||||
|
path / entry.name, entry.stat(follow_symlinks=False)
|
||||||
|
)
|
||||||
|
except OSError as err:
|
||||||
|
msg = f"failed to read directory {path}"
|
||||||
|
raise FsError(msg) from err
|
||||||
|
|
||||||
|
def read_bytes(self, path: AnyBsvPath) -> bytes:
|
||||||
|
"""Return the content of `path` as `bytes`."""
|
||||||
|
with self.open_read(path) as stream:
|
||||||
|
return stream.read()
|
||||||
|
|
||||||
|
def write_bytes(self, path: AnyBsvPath, data: Buffer | BinaryIO) -> int:
|
||||||
|
"""Create or replace a file at `path`, setting its content to `data`."""
|
||||||
|
written = 0
|
||||||
|
with self.open_write(path) as sout:
|
||||||
|
if isinstance(data, Buffer):
|
||||||
|
written += sout.write(data)
|
||||||
|
else:
|
||||||
|
while chunk := data.read(65536):
|
||||||
|
written += sout.write(chunk)
|
||||||
|
return written
|
||||||
|
|
||||||
|
def open_read(self, path: AnyBsvPath) -> BinaryIO:
|
||||||
|
"""Return a read-only binary stream that read the content of `path`."""
|
||||||
|
path = self._make_path(path)
|
||||||
|
try:
|
||||||
|
return self._real_path(path).open("rb")
|
||||||
|
except OSError as err:
|
||||||
|
msg = f"failed to read {path}"
|
||||||
|
raise FsError(msg) from err
|
||||||
|
|
||||||
|
def open_write(self, path: AnyBsvPath) -> BinaryIO:
|
||||||
|
"""Return a write-only binary stream write to `path`."""
|
||||||
|
path = self._make_path(path)
|
||||||
|
try:
|
||||||
|
return self._real_path(path).open("wb")
|
||||||
|
except OSError as err:
|
||||||
|
msg = f"failed to read {path}"
|
||||||
|
raise FsError(msg) from err
|
||||||
|
|
||||||
|
def mkdir(
|
||||||
|
self,
|
||||||
|
path: AnyBsvPath,
|
||||||
|
mode: Permissions = DEFAULT_DIR_PERMS,
|
||||||
|
parents: bool = False,
|
||||||
|
exist_ok: bool = False,
|
||||||
|
):
|
||||||
|
"""Create a directory at `path`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path: The directory to create.
|
||||||
|
mode: The permissions of the new directory.
|
||||||
|
parents: If `True`, create parent directories if they don't exists.
|
||||||
|
exist_ok: If `False` and `path` already exist, raise an error.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FsError: If something goes wrong.
|
||||||
|
"""
|
||||||
|
path = self._make_path(path)
|
||||||
|
try:
|
||||||
|
self._real_path(path).mkdir(
|
||||||
|
mode=mode.unix_perms, parents=parents, exist_ok=exist_ok
|
||||||
|
)
|
||||||
|
except FileExistsError as err:
|
||||||
|
msg = f"{path} already exists"
|
||||||
|
raise AlreadyExistError(msg) from err
|
||||||
|
except FileNotFoundError as err:
|
||||||
|
msg = f"{path.parent} does not exist"
|
||||||
|
raise NotFoundError(msg) from err
|
||||||
|
|
||||||
|
def make_link(self, path: AnyBsvPath, target: AnyBsvPath) -> None:
|
||||||
|
"""Creates a symbolic link from `path` to `target`."""
|
||||||
|
path = self._make_path(path)
|
||||||
|
target = self._make_path(path)
|
||||||
|
self._real_path(path).symlink_to(self._real_path(target))
|
||||||
|
|
||||||
|
def set_permissions(self, path: AnyBsvPath, permissions: Permissions) -> None:
|
||||||
|
"""Set the permissions of `path` to `permissions`."""
|
||||||
|
path = self._make_path(path)
|
||||||
|
self._real_path(path).chmod(permissions.unix_perms)
|
||||||
|
|
||||||
|
def set_modification_time(self, path: AnyBsvPath, mod_time: datetime) -> None:
|
||||||
|
"""Set the modification time of `path` to `mod_time`."""
|
||||||
|
path = self._make_path(path)
|
||||||
|
ts = mod_time.timestamp()
|
||||||
|
os.utime(self._real_path(path), (ts, ts))
|
||||||
|
|
||||||
|
def _make_path(self, path: AnyBsvPath) -> PurePosixPath:
|
||||||
|
if not isinstance(path, PurePosixPath):
|
||||||
|
path = PurePosixPath(path)
|
||||||
|
if not path.is_absolute():
|
||||||
|
msg = f"{path} is not absolute"
|
||||||
|
raise FsError(msg)
|
||||||
|
return path
|
||||||
|
|
||||||
|
def _real_path(self, path: PurePosixPath) -> Path:
|
||||||
|
return self.path / path.relative_to("/")
|
||||||
18
tests/test_bsv/__init__.py
Normal file
18
tests/test_bsv/__init__.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
|
# Copyright (C) 2025 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""pybsv test module."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
270
tests/test_bsv/test_cli.py
Normal file
270
tests/test_bsv/test_cli.py
Normal file
@@ -0,0 +1,270 @@
|
|||||||
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
|
# Copyright (C) 2025 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
import re
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
|
from typing import TYPE_CHECKING, Literal, NamedTuple
|
||||||
|
|
||||||
|
from click.testing import CliRunner
|
||||||
|
from hypothesis import given
|
||||||
|
import hypothesis.strategies as st
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from bsv import cli
|
||||||
|
from bsv.vfs import Permissions
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from collections.abc import Generator
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def runner(tmp_path: Path) -> CliRunner:
|
||||||
|
runner = CliRunner(env={"BSV_REPO": str(tmp_path)})
|
||||||
|
return runner
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def make_runner() -> Generator[CliRunner, None, None]:
|
||||||
|
with TemporaryDirectory(prefix="test_vfs_") as tmp:
|
||||||
|
runner = CliRunner(env={"BSV_REPO": tmp})
|
||||||
|
yield runner
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
# mkdir
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_fails_with_relative_path(tmp_path: Path, runner: CliRunner):
|
||||||
|
assert not (tmp_path / "test").exists()
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "test"])
|
||||||
|
assert result.exit_code == 2
|
||||||
|
assert "test is not an absolute path" in result.stderr
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_default(tmp_path: Path, runner: CliRunner):
|
||||||
|
assert not (tmp_path / "test").exists()
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "/test"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert result.stderr == ""
|
||||||
|
assert result.stdout == ""
|
||||||
|
assert (tmp_path / "test").is_dir()
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_multiple_dirs(tmp_path: Path, runner: CliRunner):
|
||||||
|
assert not (tmp_path / "foo").exists()
|
||||||
|
assert not (tmp_path / "bar").exists()
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "/foo", "/bar"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert result.stderr == ""
|
||||||
|
assert result.stdout == ""
|
||||||
|
assert (tmp_path / "foo").is_dir()
|
||||||
|
assert (tmp_path / "bar").is_dir()
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_nested_fails_without_parents(tmp_path: Path, runner: CliRunner):
|
||||||
|
assert not (tmp_path / "foo").exists()
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "/foo/bar"])
|
||||||
|
assert result.exit_code == 1
|
||||||
|
assert result.stderr == "/foo does not exist\n"
|
||||||
|
assert result.stdout == ""
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_nested(tmp_path: Path, runner: CliRunner):
|
||||||
|
assert not (tmp_path / "foo/bar").exists()
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "--parents", "/foo/bar"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert result.stderr == ""
|
||||||
|
assert result.stdout == ""
|
||||||
|
assert (tmp_path / "foo/bar").is_dir()
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_message_if_exists(tmp_path: Path, runner: CliRunner):
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "/test"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert result.stderr == ""
|
||||||
|
assert result.stdout == ""
|
||||||
|
assert (tmp_path / "test").is_dir()
|
||||||
|
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "/test"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert result.stderr == "/test already exists\n"
|
||||||
|
assert result.stdout == ""
|
||||||
|
assert (tmp_path / "test").is_dir()
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_mode(tmp_path: Path, runner: CliRunner):
|
||||||
|
assert not (tmp_path / "test").exists()
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "/test", "--mode=741"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert result.stderr == ""
|
||||||
|
assert result.stdout == ""
|
||||||
|
assert (tmp_path / "test").is_dir()
|
||||||
|
assert (tmp_path / "test").stat().st_mode & 0o7777 == 0o741
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_verbose(tmp_path: Path, runner: CliRunner):
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "/foo"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert result.stderr == ""
|
||||||
|
assert result.stdout == ""
|
||||||
|
assert (tmp_path / "foo").is_dir()
|
||||||
|
assert not (tmp_path / "bar").exists()
|
||||||
|
result = runner.invoke(cli.cli, ["mkdir", "--verbose", "/foo", "/bar"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert result.stderr == "/foo already exists\n"
|
||||||
|
assert result.stdout == "Created /bar\n"
|
||||||
|
assert (tmp_path / "foo").is_dir()
|
||||||
|
assert (tmp_path / "bar").is_dir()
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
# ls
|
||||||
|
|
||||||
|
|
||||||
|
def permissions(target: Literal["file", "dir"] = "file"):
|
||||||
|
return st.builds(
|
||||||
|
Permissions,
|
||||||
|
st.sampled_from(
|
||||||
|
[
|
||||||
|
0o0400,
|
||||||
|
0o0440,
|
||||||
|
0o0444,
|
||||||
|
0o0600,
|
||||||
|
0o0640,
|
||||||
|
0o0644,
|
||||||
|
0o0664,
|
||||||
|
0o0750,
|
||||||
|
0o0755,
|
||||||
|
0o0777,
|
||||||
|
]
|
||||||
|
if target == "file"
|
||||||
|
else [
|
||||||
|
0o0400,
|
||||||
|
0o0600,
|
||||||
|
0o0640,
|
||||||
|
]
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Tree(NamedTuple):
|
||||||
|
type: Literal["file", "dir"]
|
||||||
|
name: str
|
||||||
|
perms: Permissions
|
||||||
|
time: datetime
|
||||||
|
content: bytes | list[Tree]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def type_prefix(self) -> str:
|
||||||
|
if self.type == "dir":
|
||||||
|
return "d"
|
||||||
|
return "-"
|
||||||
|
|
||||||
|
def build(self, parent: Path) -> None:
|
||||||
|
path = parent / self.name
|
||||||
|
if isinstance(self.content, list):
|
||||||
|
path.mkdir(mode=self.perms.unix_perms)
|
||||||
|
for child in self.content:
|
||||||
|
child.build(path)
|
||||||
|
else:
|
||||||
|
path.write_bytes(self.content)
|
||||||
|
path.chmod(self.perms.unix_perms)
|
||||||
|
ts = self.time.timestamp()
|
||||||
|
os.utime(path, (ts, ts))
|
||||||
|
|
||||||
|
|
||||||
|
def filenames() -> st.SearchStrategy:
|
||||||
|
return st.text(
|
||||||
|
st.characters(exclude_categories=["Cc", "Cs"], exclude_characters='<>:"/\\|!*'),
|
||||||
|
min_size=1,
|
||||||
|
max_size=255,
|
||||||
|
).filter(lambda t: len(t.encode()) < 256 and t not in (".", ".."))
|
||||||
|
|
||||||
|
|
||||||
|
@st.composite
|
||||||
|
def trees(draw: st.DrawFn, max_depth: int = 3) -> Tree:
|
||||||
|
file_type = draw(st.sampled_from(["file", "dir"]))
|
||||||
|
content = (
|
||||||
|
st.binary()
|
||||||
|
if file_type == "file"
|
||||||
|
else st.lists(
|
||||||
|
trees(max_depth - 1),
|
||||||
|
unique_by=lambda t: t.name,
|
||||||
|
max_size=0 if max_depth == 0 else 10,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return Tree(
|
||||||
|
file_type, # type: ignore
|
||||||
|
draw(filenames()),
|
||||||
|
draw(permissions(file_type)), # type: ignore
|
||||||
|
draw(
|
||||||
|
st.datetimes(
|
||||||
|
min_value=datetime(1902, 1, 1),
|
||||||
|
max_value=datetime(2100, 1, 1),
|
||||||
|
timezones=st.just(UTC),
|
||||||
|
)
|
||||||
|
),
|
||||||
|
draw(content),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def trees_lists(max_depth: int = 3) -> st.SearchStrategy:
|
||||||
|
return st.lists(trees(max_depth=max_depth), unique_by=lambda t: t.name)
|
||||||
|
|
||||||
|
|
||||||
|
@given(trees=trees_lists(max_depth=0))
|
||||||
|
def test_ls(trees: list[Tree]):
|
||||||
|
with make_runner() as runner:
|
||||||
|
path = Path(runner.env["BSV_REPO"] or "")
|
||||||
|
for tree in trees:
|
||||||
|
tree.build(path)
|
||||||
|
|
||||||
|
result = runner.invoke(cli.cli, ["ls", "-lA"])
|
||||||
|
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert result.stderr == ""
|
||||||
|
|
||||||
|
trees.sort(key=lambda t: t.name)
|
||||||
|
lines = [line for line in result.stdout.splitlines() if line != "\n"]
|
||||||
|
|
||||||
|
for line, tree in zip(lines, trees, strict=True):
|
||||||
|
match = re.fullmatch(
|
||||||
|
r"""
|
||||||
|
([dl-])([r-][w-][x-][r-][w-][x-][r-][w-][x-])
|
||||||
|
\ +(\d+)
|
||||||
|
\ (\d{4}-\d{2}-\d{2}\ \d{2}:\d{2}:\d{2})
|
||||||
|
\ ([^\n]+)
|
||||||
|
""",
|
||||||
|
line,
|
||||||
|
re.VERBOSE,
|
||||||
|
)
|
||||||
|
assert match
|
||||||
|
assert match[1] == tree.type_prefix
|
||||||
|
assert match[2] == str(tree.perms)
|
||||||
|
if tree.type_prefix != "d":
|
||||||
|
assert match[3] == str(len(tree.content))
|
||||||
|
assert match[4] == tree.time.astimezone().replace(tzinfo=None).isoformat(
|
||||||
|
" ", "seconds"
|
||||||
|
)
|
||||||
|
assert match[5] == tree.name
|
||||||
|
|
||||||
|
pass
|
||||||
56
tests/test_bsv/test_cli_utils.py
Normal file
56
tests/test_bsv/test_cli_utils.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
|
# Copyright (C) 2025 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""Tests for cli_utils.py."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from bsv.cli_utils import format_human_byte_size
|
||||||
|
|
||||||
|
|
||||||
|
def test_format_human_byte_size():
|
||||||
|
assert format_human_byte_size(0) == "0B"
|
||||||
|
assert format_human_byte_size(1) == "1B"
|
||||||
|
assert format_human_byte_size(9) == "9B"
|
||||||
|
assert format_human_byte_size(10) == "10B"
|
||||||
|
assert format_human_byte_size(99) == "99B"
|
||||||
|
assert format_human_byte_size(100) == "100B"
|
||||||
|
assert format_human_byte_size(999) == "999B"
|
||||||
|
assert format_human_byte_size(1000) == "1000B"
|
||||||
|
assert format_human_byte_size(1023) == "1023B"
|
||||||
|
assert format_human_byte_size(2**10) == "1KiB"
|
||||||
|
assert format_human_byte_size(int(1.23456 * 2**10)) == "1.23KiB"
|
||||||
|
assert format_human_byte_size(9 * 2**10) == "9KiB"
|
||||||
|
assert format_human_byte_size(10 * 2**10 - 1) == "10KiB"
|
||||||
|
assert format_human_byte_size(int(98.76543 * 2**10)) == "98.8KiB"
|
||||||
|
assert format_human_byte_size(99 * 2**10 - 1) == "99KiB"
|
||||||
|
assert format_human_byte_size(100 * 2**10 - 1) == "100KiB"
|
||||||
|
assert format_human_byte_size(int(192.8374 * 2**10)) == "193KiB"
|
||||||
|
assert format_human_byte_size(999 * 2**10 - 1) == "999KiB"
|
||||||
|
assert format_human_byte_size(1000 * 2**10 - 1) == "1000KiB"
|
||||||
|
assert format_human_byte_size(2**20 - 1) == "1MiB"
|
||||||
|
assert format_human_byte_size(2**20) == "1MiB"
|
||||||
|
assert format_human_byte_size(2**30) == "1GiB"
|
||||||
|
assert format_human_byte_size(2**40) == "1TiB"
|
||||||
|
assert format_human_byte_size(2**50) == "1PiB"
|
||||||
|
assert format_human_byte_size(2**60) == "1EiB"
|
||||||
|
assert format_human_byte_size(2**70) == "1ZiB"
|
||||||
|
assert format_human_byte_size(2**80) == "1YiB"
|
||||||
|
assert format_human_byte_size(2**90) == "1RiB"
|
||||||
|
assert format_human_byte_size(2**100 - 2**80) == "1QiB"
|
||||||
|
assert format_human_byte_size(2**100) == "1QiB"
|
||||||
|
assert format_human_byte_size(2**110 - 2**90) == "1024QiB"
|
||||||
|
assert format_human_byte_size(2**110) == "1024QiB"
|
||||||
|
assert format_human_byte_size(2**120) == "1048576QiB"
|
||||||
394
tests/test_bsv/test_vfs.py
Normal file
394
tests/test_bsv/test_vfs.py
Normal file
@@ -0,0 +1,394 @@
|
|||||||
|
# pybsv - Backup, Synchronization, Versioning.
|
||||||
|
# Copyright (C) 2025 Simon Boyé
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""Tests for the `VirtualFileSystem` class and related stuff."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
from io import BytesIO
|
||||||
|
from pathlib import Path, PurePosixPath
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from bsv.vfs import FileMetadata, FsError, Permissions, VirtualFileSystem
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fs(tmp_path: Path) -> VirtualFileSystem:
|
||||||
|
"""Fixture that returns a `VirtualFileSystem`."""
|
||||||
|
return VirtualFileSystem(tmp_path)
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
# Permissions
|
||||||
|
|
||||||
|
|
||||||
|
def test_permissions():
|
||||||
|
perm0 = Permissions(0o1234)
|
||||||
|
assert perm0.unix_perms == 0o1234
|
||||||
|
|
||||||
|
perm1 = Permissions("752")
|
||||||
|
assert perm1.unix_perms == 0o752
|
||||||
|
|
||||||
|
assert perm0 == perm0
|
||||||
|
assert perm0 != perm1
|
||||||
|
|
||||||
|
assert repr(perm0) == "Permissions(0o1234)"
|
||||||
|
assert repr(perm1) == "Permissions(0o0752)"
|
||||||
|
|
||||||
|
assert str(perm0) == "-w--wxr-T"
|
||||||
|
assert str(perm1) == "rwxr-x-w-"
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
# FileMetadata
|
||||||
|
|
||||||
|
|
||||||
|
def test_file_metadata():
|
||||||
|
path = PurePosixPath("/some_dir/some_file")
|
||||||
|
permissions = Permissions(0o1234)
|
||||||
|
mod_time = datetime(2025, 7, 12, 12, 34, 56, tzinfo=UTC)
|
||||||
|
|
||||||
|
file_md = FileMetadata(
|
||||||
|
path,
|
||||||
|
type="file",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert file_md.path == path
|
||||||
|
assert file_md.type == "file"
|
||||||
|
assert file_md.permissions == permissions
|
||||||
|
assert file_md.modification_time == mod_time
|
||||||
|
assert file_md.byte_size == 123
|
||||||
|
assert file_md.unix_mode == "--w--wxr-T"
|
||||||
|
assert not file_md.is_hidden
|
||||||
|
assert file_md.is_file
|
||||||
|
assert not file_md.is_dir
|
||||||
|
assert not file_md.is_symlink
|
||||||
|
assert not file_md.is_other
|
||||||
|
|
||||||
|
dir_md = FileMetadata(
|
||||||
|
path,
|
||||||
|
type="dir",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
assert dir_md.type == "dir"
|
||||||
|
assert not dir_md.is_file
|
||||||
|
assert dir_md.is_dir
|
||||||
|
assert not dir_md.is_symlink
|
||||||
|
assert not dir_md.is_other
|
||||||
|
|
||||||
|
symlink_md = FileMetadata(
|
||||||
|
path,
|
||||||
|
type="symlink",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
assert symlink_md.type == "symlink"
|
||||||
|
assert not symlink_md.is_file
|
||||||
|
assert not symlink_md.is_dir
|
||||||
|
assert symlink_md.is_symlink
|
||||||
|
assert not symlink_md.is_other
|
||||||
|
|
||||||
|
other_md = FileMetadata(
|
||||||
|
path,
|
||||||
|
type="other",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
assert other_md.type == "other"
|
||||||
|
assert not other_md.is_file
|
||||||
|
assert not other_md.is_dir
|
||||||
|
assert not other_md.is_symlink
|
||||||
|
assert other_md.is_other
|
||||||
|
|
||||||
|
assert FileMetadata(
|
||||||
|
PurePosixPath("/some_dir/.some_file"),
|
||||||
|
type="file",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
).is_hidden
|
||||||
|
|
||||||
|
|
||||||
|
def test_file_metadata_eq():
|
||||||
|
path = PurePosixPath("/some_dir/some_file")
|
||||||
|
permissions = Permissions(0o1234)
|
||||||
|
mod_time = datetime(2025, 7, 12, 12, 34, 56, tzinfo=UTC)
|
||||||
|
|
||||||
|
md = FileMetadata(
|
||||||
|
path,
|
||||||
|
type="file",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
FileMetadata(
|
||||||
|
path,
|
||||||
|
type="file",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
== md
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
FileMetadata(
|
||||||
|
PurePosixPath("/some_dir/some_other_file"),
|
||||||
|
type="file",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
!= md
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
FileMetadata(
|
||||||
|
path,
|
||||||
|
type="dir",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
!= md
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
FileMetadata(
|
||||||
|
path,
|
||||||
|
type="file",
|
||||||
|
permissions=Permissions(0o0752),
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
!= md
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
FileMetadata(
|
||||||
|
path,
|
||||||
|
type="file",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=datetime(2025, 1, 2, 3, 4, 5),
|
||||||
|
byte_size=123,
|
||||||
|
)
|
||||||
|
!= md
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
FileMetadata(
|
||||||
|
path,
|
||||||
|
type="file",
|
||||||
|
permissions=permissions,
|
||||||
|
modification_time=mod_time,
|
||||||
|
byte_size=124,
|
||||||
|
)
|
||||||
|
!= md
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
# mkdir
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_fails_with_relative_path(fs: VirtualFileSystem):
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
fs.mkdir("test")
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_default(fs: VirtualFileSystem):
|
||||||
|
assert not fs.exists("/test")
|
||||||
|
fs.mkdir("/test")
|
||||||
|
assert fs.is_dir("/test")
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_nested_fails_without_parents(fs: VirtualFileSystem):
|
||||||
|
assert not fs.exists("/foo")
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
fs.mkdir("/foo/bar")
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_nested(fs: VirtualFileSystem):
|
||||||
|
assert not fs.exists("/test")
|
||||||
|
fs.mkdir("/test/foobar", parents=True)
|
||||||
|
assert fs.is_dir("/test/foobar")
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_fails_if_exists(fs: VirtualFileSystem):
|
||||||
|
assert not fs.exists("/foo")
|
||||||
|
fs.mkdir("/foo")
|
||||||
|
assert fs.is_dir("/foo")
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
fs.mkdir("/foo")
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_exists_ok(fs: VirtualFileSystem):
|
||||||
|
assert not fs.exists("/test")
|
||||||
|
fs.mkdir("/test")
|
||||||
|
assert fs.is_dir("/test")
|
||||||
|
fs.mkdir("/test", exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_exists_ok_fail_if_file(fs: VirtualFileSystem):
|
||||||
|
fs.write_bytes("/test", b"test")
|
||||||
|
assert fs.is_file("/test")
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
fs.mkdir("/test", exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_mode(fs: VirtualFileSystem):
|
||||||
|
assert not fs.exists("/test")
|
||||||
|
permissions = Permissions(0o741)
|
||||||
|
fs.mkdir("/test", mode=permissions)
|
||||||
|
assert fs.is_dir("/test")
|
||||||
|
assert fs.metadata("/test").permissions == permissions
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
# read_bytes / write_bytes
|
||||||
|
|
||||||
|
|
||||||
|
def test_read_write_bytes(fs: VirtualFileSystem):
|
||||||
|
assert not fs.exists("/test")
|
||||||
|
|
||||||
|
fs.write_bytes("/test", b"This is a test.")
|
||||||
|
assert fs.read_bytes("/test") == b"This is a test."
|
||||||
|
|
||||||
|
stream = BytesIO(b"Another test.")
|
||||||
|
fs.write_bytes("/test", stream)
|
||||||
|
assert fs.read_bytes("/test") == b"Another test."
|
||||||
|
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
fs.read_bytes("/does_not_exist")
|
||||||
|
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
fs.write_bytes("/does_not_exist/foobar", b"")
|
||||||
|
|
||||||
|
|
||||||
|
def test_open_read_write(fs: VirtualFileSystem):
|
||||||
|
assert not fs.exists("/test")
|
||||||
|
|
||||||
|
with fs.open_write("/test") as stream:
|
||||||
|
stream.write(b"foo")
|
||||||
|
stream.write(b"bar")
|
||||||
|
|
||||||
|
assert fs.exists("/test")
|
||||||
|
with fs.open_read("/test") as stream:
|
||||||
|
assert stream.read(3) == b"foo"
|
||||||
|
assert stream.read(3) == b"bar"
|
||||||
|
assert stream.read() == b""
|
||||||
|
|
||||||
|
# Test overwrite
|
||||||
|
with fs.open_write("/test") as stream:
|
||||||
|
stream.write(b"baz")
|
||||||
|
|
||||||
|
with fs.open_read("/test") as stream:
|
||||||
|
assert stream.read() == b"baz"
|
||||||
|
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
fs.open_read("/does_not_exist")
|
||||||
|
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
fs.open_write("/does_not_exist/foobar")
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
# metadata
|
||||||
|
|
||||||
|
|
||||||
|
def test_metadata(fs: VirtualFileSystem):
|
||||||
|
file_permissions = Permissions(0o754)
|
||||||
|
file_time = datetime(2025, 5, 17, 13, 57, 32, tzinfo=UTC)
|
||||||
|
file_content = b"This is a test\n"
|
||||||
|
|
||||||
|
fs.write_bytes("/test_file", file_content)
|
||||||
|
fs.set_permissions("/test_file", file_permissions)
|
||||||
|
fs.set_modification_time("/test_file", file_time)
|
||||||
|
|
||||||
|
md = fs.metadata("/test_file")
|
||||||
|
assert md.path == PurePosixPath("/test_file")
|
||||||
|
assert md.permissions == file_permissions
|
||||||
|
assert md.type == "file"
|
||||||
|
assert md.modification_time == file_time
|
||||||
|
assert md.byte_size == len(file_content)
|
||||||
|
assert not md.is_hidden
|
||||||
|
assert fs.metadata("/test_file") == md
|
||||||
|
assert fs.is_file("/test_file")
|
||||||
|
assert not fs.is_dir("/test_file")
|
||||||
|
assert not fs.is_symlink("/test_file")
|
||||||
|
assert not fs.is_other("/test_file")
|
||||||
|
|
||||||
|
fs.set_permissions("/test_file", Permissions(0o644))
|
||||||
|
assert fs.metadata("/test_file") != md
|
||||||
|
|
||||||
|
fs.mkdir("/.test_dir")
|
||||||
|
md = fs.metadata("/.test_dir")
|
||||||
|
assert md.type == "dir"
|
||||||
|
assert fs.metadata("/.test_dir").is_hidden
|
||||||
|
assert not fs.is_file("/.test_dir")
|
||||||
|
assert fs.is_dir("/.test_dir")
|
||||||
|
assert not fs.is_symlink("/.test_dir")
|
||||||
|
assert not fs.is_other("/.test_dir")
|
||||||
|
|
||||||
|
fs.make_link("/test_link", "/link_target")
|
||||||
|
md = fs.metadata("/test_link")
|
||||||
|
assert md.type == "symlink"
|
||||||
|
assert not fs.is_file("/test_link")
|
||||||
|
assert not fs.is_dir("/test_link")
|
||||||
|
assert fs.is_symlink("/test_link")
|
||||||
|
assert not fs.is_other("/test_link")
|
||||||
|
|
||||||
|
assert fs.metadata_or_none("/does_not_exist") is None
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
fs.metadata("/does_not_exist")
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
# iter_dir
|
||||||
|
|
||||||
|
|
||||||
|
def test_iter_dir(fs: VirtualFileSystem):
|
||||||
|
expected = [
|
||||||
|
(PurePosixPath("/dir"), "dir"),
|
||||||
|
(PurePosixPath("/file"), "file"),
|
||||||
|
(PurePosixPath("/link"), "symlink"),
|
||||||
|
]
|
||||||
|
for path, file_type in expected:
|
||||||
|
if file_type == "dir":
|
||||||
|
fs.mkdir(path)
|
||||||
|
elif file_type == "file":
|
||||||
|
fs.write_bytes(path, b"")
|
||||||
|
elif file_type == "symlink":
|
||||||
|
fs.make_link(path, "/foobar")
|
||||||
|
|
||||||
|
items_metadata = sorted(fs.iter_dir("/"))
|
||||||
|
for md, [path, file_type] in zip(items_metadata, expected, strict=True):
|
||||||
|
assert md.path == path
|
||||||
|
assert md.type == file_type
|
||||||
|
|
||||||
|
|
||||||
|
def test_iter_dir_failure(fs: VirtualFileSystem):
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
list(fs.iter_dir("/test"))
|
||||||
|
|
||||||
|
fs.write_bytes("/test", b"")
|
||||||
|
with pytest.raises(FsError):
|
||||||
|
list(fs.iter_dir("/test"))
|
||||||
Reference in New Issue
Block a user