Compare commits

...

5 Commits
master ... dev

  1. 4
      .gitignore
  2. 60
      pyproject.toml
  3. 18
      src/bsv.bak/__init__.py
  4. 21
      src/bsv.bak/__main__.py
  5. 16
      src/bsv.bak/_version.py
  6. 126
      src/bsv.bak/cli.py
  7. 0
      src/bsv.bak/command/__init__.py
  8. 0
      src/bsv.bak/command/info.py
  9. 0
      src/bsv.bak/command/init.py
  10. 0
      src/bsv.bak/exception.py
  11. 0
      src/bsv.bak/main.py
  12. 0
      src/bsv.bak/object.py
  13. 0
      src/bsv.bak/path_map.py
  14. 0
      src/bsv.bak/repository.py
  15. 0
      src/bsv.bak/simple_cas/__init__.py
  16. 0
      src/bsv.bak/simple_cas/cas.py
  17. 0
      src/bsv.bak/tree_walker.py
  18. 0
      src/bsv.bak/util.py
  19. 9
      src/bsv/__init__.py
  20. 10
      src/bsv/__main__.py
  21. 369
      src/bsv/cli.py
  22. 48
      src/bsv/cli_utils.py
  23. 0
      src/bsv/py.typed
  24. 41
      src/bsv/repo.py
  25. 348
      src/bsv/vfs.py
  26. 0
      tests.bak/test_repository.py
  27. 0
      tests.bak/test_simple_cas.py
  28. 18
      tests/test_bsv/__init__.py
  29. 270
      tests/test_bsv/test_cli.py
  30. 56
      tests/test_bsv/test_cli_utils.py
  31. 394
      tests/test_bsv/test_vfs.py

4
.gitignore

@ -1,4 +1,6 @@
__pycache__ __pycache__
*.egg-info
/.coverage
/.hypothesis/
/src/bsv/_version.py /src/bsv/_version.py
/venv /venv
*.egg-info

60
pyproject.toml

@ -12,14 +12,18 @@ classifiers = [
] ]
dynamic = ["version"] dynamic = ["version"]
dependencies = [ dependencies = [
"click",
"fastcdc", "fastcdc",
"rich", "rich",
"tomlkit", "tomlkit",
"typing-extensions"
] ]
[project.optional-dependencies] [project.optional-dependencies]
test = [ dev = [
"hypothesis",
"pytest", "pytest",
"pytest-cov"
] ]
[project.urls] [project.urls]
@ -27,7 +31,59 @@ test = [
"Bug Tracker" = "https://git.draklia.net/draklaw/pybsv/issues" "Bug Tracker" = "https://git.draklia.net/draklaw/pybsv/issues"
[project.scripts] [project.scripts]
bsv = "bsv.main:main" bsv = "bsv.cli:cli"
[tool.ruff]
target-version = "py311"
[tool.ruff.lint]
select = [
"B", # flake8-bugbear
"D", # pydocstyle
"DOC", # pydoclint
"E", # pycodestyle
"EM", # flake8-errmsg
"F", # Pyflakes
"FURB", # refurb
"G", # flake8-logging-format
"I", # isort
"ISC", # flake8-implicit-str-concat
"LOG", # flake8-logging
"N", # pep8-naming
"PERF", # Perflint
"PT", # flake8-pytest-style
"PTH", # flake8-use-pathlib
"S", # flake8-bandit
"SIM", # flake8-simplify
"TC", # flake8-type-checking
"UP", # pyupgrade
"W", # pycodestyle
]
ignore = [
"UP038", # Deprecated rule; bad idea.
]
[tool.ruff.lint.per-file-ignores]
"**/tests/*" = [
"D103", # Missing docstring in public function
"S101", # Use of assert detected
]
[tool.ruff.lint.isort]
force-sort-within-sections = true
lines-after-imports = 2
required-imports = ["from __future__ import annotations"]
[tool.ruff.lint.pydocstyle]
convention = "google"
[tool.coverage.report]
exclude_also = [
"if TYPE_CHECKING:",
]
[build-system] [build-system]
requires = ["setuptools", "setuptools-scm"] requires = ["setuptools", "setuptools-scm"]

18
src/bsv.bak/__init__.py

@ -0,0 +1,18 @@
# bsv - Backup, Synchronization, Versioning
# Copyright (C) 2023 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from bsv._version import __version__, __version_tuple__

21
src/bsv.bak/__main__.py

@ -0,0 +1,21 @@
# bsv - Backup, Synchronization, Versioning
# Copyright (C) 2023 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from bsv.main import main
exit(main())

16
src/bsv.bak/_version.py

@ -0,0 +1,16 @@
# file generated by setuptools_scm
# don't change, don't track in version control
TYPE_CHECKING = False
if TYPE_CHECKING:
from typing import Tuple, Union
VERSION_TUPLE = Tuple[Union[int, str], ...]
else:
VERSION_TUPLE = object
version: str
__version__: str
__version_tuple__: VERSION_TUPLE
version_tuple: VERSION_TUPLE
__version__ = version = '0.0.1.dev8+g52a553d.d20231127'
__version_tuple__ = version_tuple = (0, 0, 1, 'dev8', 'g52a553d.d20231127')

126
src/bsv.bak/cli.py

@ -0,0 +1,126 @@
# bsv - Backup, Synchronization, Versioning
# Copyright (C) 2023 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import Any, Callable, TypeVar
from rich.console import Console
from rich.text import Text
_console: Console | None = None
def get_console() -> Console:
assert _console is not None
return _console
_error_console: Console | None = None
def get_error_console() -> Console:
assert _error_console is not None
return _error_console
def init_consoles(color: str="auto"):
global _console
global _error_console
assert _console is None
assert _error_console is None
kwargs: dict[str, Any] = {
"tab_size": 4,
}
match color:
case "always":
kwargs["force_terminal"] = True
case "auto":
pass
case "never":
kwargs["no_color"] = True
_console = Console(
**kwargs,
)
_error_console = Console(
stderr = True,
**kwargs,
)
PromptType = TypeVar("PromptType")
class NoDefaultType:
def __repr__(self):
return "NoDefault"
NoDefault = NoDefaultType()
def prompt(
prompt: str,
factory: Callable[[str], PromptType],
*,
console: Console | None = None,
default: PromptType | NoDefaultType = NoDefault,
show_default: bool = True,
) -> PromptType:
if console is None:
console = get_console()
prompt_text = Text(prompt, style="prompt")
prompt_text.end = ""
if show_default and default is not NoDefault:
prompt_text.append(" ")
prompt_text.append(f"({default})", style="prompt.default")
prompt_text.append(": ")
while True:
try:
value = console.input(prompt_text)
except KeyboardInterrupt:
console.print("")
raise
if not value and not isinstance(default, NoDefaultType):
return default
try:
return factory(value)
except ValueError as err:
console.print(err)
def prompt_confirmation(prompt: str, *, console: Console | None=None, default: bool=True) -> bool:
if console is None:
console = get_console()
prompt_text = Text(prompt, style="prompt")
prompt_text.end = ""
prompt_text.append(" ")
if default:
prompt_text.append("(Y/n)", style="prompt.default")
else:
prompt_text.append("(y/N)", style="prompt.default")
prompt_text.append(": ")
while True:
try:
value = console.input(prompt_text).strip().lower()
except KeyboardInterrupt:
console.print("")
raise
if not value and not isinstance(default, NoDefaultType):
return default
if value not in "yn":
console.print("Please answer 'y' or 'n'.")
else:
return value == "y"

0
src/bsv/command/__init__.py → src/bsv.bak/command/__init__.py

0
src/bsv/command/info.py → src/bsv.bak/command/info.py

0
src/bsv/command/init.py → src/bsv.bak/command/init.py

0
src/bsv/exception.py → src/bsv.bak/exception.py

0
src/bsv/main.py → src/bsv.bak/main.py

0
src/bsv/object.py → src/bsv.bak/object.py

0
src/bsv/path_map.py → src/bsv.bak/path_map.py

0
src/bsv/repository.py → src/bsv.bak/repository.py

0
src/bsv/simple_cas/__init__.py → src/bsv.bak/simple_cas/__init__.py

0
src/bsv/simple_cas/cas.py → src/bsv.bak/simple_cas/cas.py

0
src/bsv/tree_walker.py → src/bsv.bak/tree_walker.py

0
src/bsv/util.py → src/bsv.bak/util.py

9
src/bsv/__init__.py

@ -1,5 +1,5 @@
# bsv - Backup, Synchronization, Versioning # pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2023 Simon Boyé # Copyright (C) 2025 Simon Boyé
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -13,6 +13,11 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
"""pybsv - A Backup, Synchronization and Versioning tool."""
from __future__ import annotations from __future__ import annotations
from bsv._version import __version__, __version_tuple__ from bsv._version import __version__, __version_tuple__
__all__ = ["__version__", "__version_tuple__"]

10
src/bsv/__main__.py

@ -1,5 +1,5 @@
# bsv - Backup, Synchronization, Versioning # pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2023 Simon Boyé # Copyright (C) 2025 Simon Boyé
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -13,9 +13,11 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Main entry-point. Allow to use bsv module as a command."""
from __future__ import annotations from __future__ import annotations
from bsv.main import main from bsv.cli import cli
exit(main()) exit(cli())

369
src/bsv/cli.py

@ -1,5 +1,5 @@
# bsv - Backup, Synchronization, Versioning # pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2023 Simon Boyé # Copyright (C) 2025 Simon Boyé
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -13,114 +13,303 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Command-line interface. This is where all bsv commands are defined."""
from __future__ import annotations from __future__ import annotations
from typing import Any, Callable, TypeVar from dataclasses import dataclass
import math
from pathlib import Path, PurePosixPath
import platform
import sys
from typing import Any, ClassVar, Literal
from rich.console import Console import click
from rich.text import Text
from bsv.cli_utils import format_human_byte_size
from bsv.repo import default_repository_path
from bsv.vfs import (
AlreadyExistError,
FileMetadata,
NotFoundError,
Permissions,
VirtualFileSystem,
)
_console: Console | None = None
def get_console() -> Console:
assert _console is not None
return _console
_error_console: Console | None = None @dataclass
def get_error_console() -> Console: class RepositoryParams:
assert _error_console is not None """Global parameters shared by all commands."""
return _error_console
path: Path
def init_consoles(color: str="auto"): def as_filesystem(self) -> VirtualFileSystem:
global _console return VirtualFileSystem(self.path)
global _error_console
assert _console is None
assert _error_console is None
kwargs: dict[str, Any] = { class PermissionsType(click.ParamType):
"tab_size": 4, """Converter for permissions given on the command line."""
}
match color:
case "always":
kwargs["force_terminal"] = True
case "auto":
pass
case "never":
kwargs["no_color"] = True
_console = Console( name: ClassVar[str] = "permissions"
**kwargs,
)
_error_console = Console(
stderr = True,
**kwargs,
)
def convert(
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
) -> Permissions:
"""Convert an argument to a `Permissions` object."""
if isinstance(value, Permissions):
return value
PromptType = TypeVar("PromptType")
class NoDefaultType:
def __repr__(self):
return "NoDefault"
NoDefault = NoDefaultType()
def prompt(
prompt: str,
factory: Callable[[str], PromptType],
*,
console: Console | None = None,
default: PromptType | NoDefaultType = NoDefault,
show_default: bool = True,
) -> PromptType:
if console is None:
console = get_console()
prompt_text = Text(prompt, style="prompt")
prompt_text.end = ""
if show_default and default is not NoDefault:
prompt_text.append(" ")
prompt_text.append(f"({default})", style="prompt.default")
prompt_text.append(": ")
while True:
try: try:
value = console.input(prompt_text) return Permissions(value)
except KeyboardInterrupt: except ValueError as err:
console.print("") self.fail(str(err), param, ctx)
raise
class BsvPathType(click.ParamType):
"""Converter for bsv paths given on the command line."""
name: ClassVar[str] = "bsv_path"
def convert(
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
) -> PurePosixPath:
"""Convert an argument to a bsv path (absolute `PurePosixPath`)."""
if isinstance(value, PurePosixPath):
return value
if not value and not isinstance(default, NoDefaultType):
return default
try: try:
return factory(value) path = PurePosixPath(value)
except ValueError as err: except ValueError as err:
console.print(err) self.fail(str(err), param, ctx)
if not path.is_absolute():
self.fail(f"{value} is not an absolute path", param, ctx)
return path
class AnyPathType(click.ParamType):
"""Converter for bsv or fs paths given on the command line."""
name: str = "any_path"
def prompt_confirmation(prompt: str, *, console: Console | None=None, default: bool=True) -> bool: default: Literal["bsv", "fs"]
if console is None:
console = get_console()
prompt_text = Text(prompt, style="prompt") def __init__(self, default: Literal["bsv", "fs"] = "fs"):
prompt_text.end = "" self.default = default
prompt_text.append(" ")
if default: def convert(
prompt_text.append("(Y/n)", style="prompt.default") self, value: Any, param: click.Parameter | None, ctx: click.Context | None
) -> PurePosixPath | Path:
"""Convert an argument to a bsv or fs path."""
if isinstance(value, (PurePosixPath, Path)):
return value
if not isinstance(value, str):
self.fail(f"{value} is not a string")
path_type = self.default
if value.startswith("bsv:"):
path_type = "bsv"
value = value.removeprefix("bsv:")
elif value.startswith("fs:"):
path_type = "fs"
value = value.removeprefix("fs:")
if path_type == "bsv":
return BsvPathType().convert(value, param, ctx)
else: else:
prompt_text.append("(y/N)", style="prompt.default") return Path(value)
prompt_text.append(": ")
PERMISSIONS_TYPE = PermissionsType()
ANY_OBJECT_TYPE = BsvPathType() # TODO: accept bsv path and object id.
@click.group()
@click.version_option()
@click.option(
"--repo", envvar="BSV_REPO", type=click.Path(resolve_path=True, path_type=Path)
)
@click.pass_context
def cli(ctx: click.Context, repo: Path):
"""Backup, Synchronization and Versioning (bsv) tool.
bsv manages synchronization of several "devices" with history. This makes it
suitable for different tasks:
* Backup: Synchronize your data with remote devices that serve as backup. The
remotes should be configured to keep previous versions of the files (using
configurable rules) so even if a file is deleted/corrupted, a valid version can
be found in the backup devices.
* Synchronization: Synchronize your data among several devices you are working with.
In case of conflict, the conflicting versions of a file are stored in each
devices so it is possible to inspect and merge them to resolve the conflict.
* Versioning: A local device can be used to store different versions of the same
directory structure.
"""
ctx.obj = RepositoryParams(
path=repo or default_repository_path(),
)
@cli.command()
@click.pass_obj
def info(params: RepositoryParams):
"""Print information on the current repository."""
print(f"Repository: {params.path}")
while True:
@cli.command()
@click.option("-d", "--device-name", default=platform.node, prompt=True)
@click.pass_obj
def init(params: RepositoryParams, device_name: str):
"""Initialize a bsv repository."""
print(f"Repository path: {params.path!r}")
print(f"Device name: {device_name!r}")
@cli.command()
@click.argument("directories", nargs=-1, type=BsvPathType())
@click.option("-m", "--mode", type=PERMISSIONS_TYPE, default=Permissions(0o770))
@click.option("-p", "--parents", is_flag=True)
@click.option("-v", "--verbose", is_flag=True)
@click.pass_obj
def mkdir(
params: RepositoryParams,
directories: list[PurePosixPath],
mode: Permissions,
parents: bool = False,
verbose: bool = False,
):
"""Make a directory in the current repository."""
fs = params.as_filesystem()
return_code = 0
for dir in directories:
try: try:
value = console.input(prompt_text).strip().lower() fs.mkdir(dir, mode=mode, parents=parents)
except KeyboardInterrupt: except AlreadyExistError as error:
console.print("") click.echo(error, file=sys.stderr)
raise except NotFoundError as error:
return_code = 1
if not value and not isinstance(default, NoDefaultType): click.echo(error, file=sys.stderr)
return default
if value not in "yn":
console.print("Please answer 'y' or 'n'.")
else: else:
return value == "y" if verbose:
click.echo(f"Created {dir}")
sys.exit(return_code)
@cli.command()
@click.argument("files", nargs=-1, type=BsvPathType())
@click.option("--filter", flag_value="hidden", default=True, hidden=True)
@click.option("-a", "--all", "filter", flag_value="all")
@click.option("-A", "--almost-all", "filter", flag_value="implied")
@click.option("-h", "--human-readable", is_flag=True)
@click.option("-l", "--list", is_flag=True)
@click.pass_obj
def ls(
params: RepositoryParams,
files: tuple[PurePosixPath],
filter: Literal["hidden", "implied", "all"],
human_readable: bool,
list: bool,
):
"""List information about files."""
fs = params.as_filesystem()
if not files:
files = (PurePosixPath("/"),)
filter_md = FileMetadata.is_hidden if filter == "hidden" else lambda _: False
for file_index, file in enumerate(files):
if len(files) > 1:
if file_index:
click.echo()
click.echo(f"{file}:")
items = [(md.path.name, md) for md in fs.iter_dir(file) if not filter_md(md)]
items.sort()
if filter == "all":
items[0:0] = [
(".", fs.metadata(file)),
("..", fs.metadata(file.parent)),
]
if list:
rows: list[tuple[str, str, str, str]] = []
rows_width: list[int] = [0, 0, 0, 0]
for name, md in items:
mode = str(md.unix_mode)
size = (
format_human_byte_size(md.byte_size)
if human_readable
else str(md.byte_size)
)
local_time = md.modification_time.astimezone().replace(tzinfo=None)
time = local_time.isoformat(" ", "seconds")
row = (mode, size, time, name)
rows.append(row)
for index, field in enumerate(row):
rows_width[index] = max(rows_width[index], len(field))
for mode, size, time, name in rows:
click.echo(
" ".join(
[
mode.ljust(rows_width[0]),
size.rjust(rows_width[1]),
time.ljust(rows_width[2]),
name,
]
)
)
else:
for name, _ in items:
click.echo(name)
@cli.command()
@click.argument("object", type=ANY_OBJECT_TYPE)
@click.pass_obj
def show(
params: RepositoryParams,
object: PurePosixPath,
):
"""Show a bsv object."""
print(f"object: {object!r}")
@cli.command()
@click.argument("srcs", nargs=-1, type=AnyPathType(default="bsv"))
@click.argument("dst", type=AnyPathType(default="bsv"))
@click.option("-r", "--recursive", is_flag=True)
@click.pass_obj
def cp(
params: RepositoryParams,
srcs: list[PurePosixPath | Path],
dst: PurePosixPath | Path,
recursive: bool,
):
"""Copy files or directories."""
print(f"srcs: {srcs!r}")
print(f"dst: {dst!r}")
print(f"recursive: {recursive!r}")
@cli.command()
@click.argument("targets", nargs=-1, type=BsvPathType())
@click.option("-r", "--recursive", is_flag=True)
@click.pass_obj
def rm(
params: RepositoryParams,
targets: list[PurePosixPath],
recursive: bool,
):
"""Remove files or directories."""
print(f"targets: {targets}")
print(f"recursive: {recursive}")

48
src/bsv/cli_utils.py

@ -0,0 +1,48 @@
# pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2025 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Tools and utilities to build the command-line interface."""
from __future__ import annotations
from typing import Final
BINARY_PREFIXES: Final[list[str]] = [
"",
"Ki",
"Mi",
"Gi",
"Ti",
"Pi",
"Ei",
"Zi",
"Yi",
"Ri",
"Qi",
]
def format_human_byte_size(byte_size: int) -> str:
"""Format the given `byte_size` as a human-readable string."""
index = min(max((byte_size.bit_length() - 1) // 10, 0), len(BINARY_PREFIXES) - 1)
size = byte_size / 1024**index
num_digits = len(str(int(size)))
decimals = max(0, 3 - num_digits)
rounded = round(size, decimals)
if rounded == 1024 and index + 1 < len(BINARY_PREFIXES):
rounded = 1
index += 1
return f"{rounded:.16g}{BINARY_PREFIXES[index]}B"

0
src/bsv/py.typed

41
src/bsv/repo.py

@ -0,0 +1,41 @@
# pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2025 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import os
from pathlib import Path
import platform
def default_repository_path() -> Path:
"""Return the system-dependent default repository path."""
if platform.system() in ("Windows", "Darwin", "Java"):
msg = f"default_repository_path does not support {platform.system()} system"
raise NotImplementedError(msg)
else: # Assume Unix
# See https://specifications.freedesktop.org/basedir-spec/latest/
data_home = os.environ.get("XDG_DATA_HOME", "")
if data_home:
path = Path(data_home)
if not path.is_absolute() or not path.exists():
msg = (
f"invalid XDG_DATA_HOME ({path}): path is relative or does not "
"exists"
)
raise RuntimeError(msg)
else:
path = Path.home() / ".local/share"
return path / "bsv/repo"

348
src/bsv/vfs.py

@ -0,0 +1,348 @@
# pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2025 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Provide a virtual file system interface alongside associated tools."""
from __future__ import annotations
from datetime import UTC, datetime
from functools import total_ordering
import os
from pathlib import Path, PurePosixPath
from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_IMODE, filemode
from typing import TYPE_CHECKING, Any, BinaryIO, Literal, Self
from typing_extensions import Buffer
if TYPE_CHECKING:
from collections.abc import Iterator
from os import stat_result
AnyBsvPath = PurePosixPath | str
class FsError(RuntimeError):
"""Error type raised by `FileSystem` objects."""
class AlreadyExistError(FsError):
"""Raise when trying to create an item that already exists."""
class NotFoundError(FsError):
"""Raise when trying to access an item that do not exist."""
class Permissions:
"""Represent the permissions of an object in a filesystem."""
unix_perms: int
def __init__(self, unix_perms: int | str = 0o640):
"""Create a `Permissions` object from `unix_perms`."""
if isinstance(unix_perms, str):
unix_perms = int(unix_perms, 8)
self.unix_perms = unix_perms
def __eq__(self, rhs: Any) -> bool:
"""Test if two `Permission` are the same."""
return (
rhs.unix_perms == self.unix_perms
if isinstance(rhs, Permissions)
else NotImplemented
)
def __repr__(self) -> str:
"""Return a representation of the permissions as valid python code."""
return f"Permissions(0o{oct(self.unix_perms)[2:].rjust(4, '0')})"
def __str__(self) -> str:
"""Return a representation of the permissions of the form 'rwxrwxrwx'."""
return filemode(self.unix_perms)[1:]
DEFAULT_DIR_PERMS = Permissions(0o770)
DEFAULT_FILE_PERMS = Permissions(0o640)
FileType = Literal["dir", "file", "symlink", "other"]
_IFMT_MAP: dict[int, FileType] = {
S_IFDIR: "dir",
S_IFREG: "file",
S_IFLNK: "symlink",
}
@total_ordering
class FileMetadata:
"""Metadata associated with vfs files: file type, permissions, etc."""
path: PurePosixPath
type: FileType
permissions: Permissions
modification_time: datetime
byte_size: int
_stat: stat_result
def __init__(
self,
path: PurePosixPath,
*,
type: FileType,
permissions: Permissions,
modification_time: datetime,
byte_size: int,
):
"""Create a `FileMetadata`."""
self.path = path
self.type = type
self.permissions = permissions
self.modification_time = modification_time
self.byte_size = byte_size
@classmethod
def from_stat(
cls,
path: PurePosixPath,
stat: stat_result,
) -> Self:
"""Create a `FileMetadata` from a `stat_result`."""
return cls(
path,
type=_IFMT_MAP.get(S_IFMT(stat.st_mode), "other"),
permissions=Permissions(S_IMODE(stat.st_mode)),
modification_time=datetime.fromtimestamp(stat.st_mtime, UTC),
byte_size=stat.st_size,
)
@property
def unix_mode(self) -> str:
"""Return unix-like mode in the form '-rwxrwxrwx'."""
return UNIX_MODE_FILE_TYPE[self.type] + str(self.permissions)
@property
def is_hidden(self) -> bool:
"""Return true if the file starts with a '.'."""
return self.path.name.startswith(".")
@property
def is_file(self) -> bool:
"""Test if this is a file."""
return self.type == "file"
@property
def is_dir(self) -> bool:
"""Test if this is a directory."""
return self.type == "dir"
@property
def is_symlink(self) -> bool:
"""Test if this is a symbolic link."""
return self.type == "symlink"
@property
def is_other(self) -> bool:
"""Test if this is a symbolic link."""
return self.type == "other"
def _as_tuple(
self,
) -> tuple[PurePosixPath, FileType, Permissions, datetime, int]:
return (
self.path,
self.type,
self.permissions,
self.modification_time,
self.byte_size,
)
def __eq__(self, rhs: Any) -> bool:
"""Test if two `Metadata` are the same."""
return (
self._as_tuple() == rhs._as_tuple()
if isinstance(rhs, FileMetadata)
else NotImplemented
)
def __lt__(self, rhs: Any) -> bool:
"""Compare `rhs.path` with `self.path`."""
return self.path < rhs.path if isinstance(rhs, FileMetadata) else NotImplemented
UNIX_MODE_FILE_TYPE = {
"dir": "d",
"file": "-",
"other": "o",
"link": "l",
}
class VirtualFileSystem:
"""Represent a file system, with common file system operations."""
path: Path
def __init__(self, path: Path):
"""Initialize the file system to point to `path`."""
self.path = path
def exists(self, path: AnyBsvPath) -> bool:
"""Test if the `path` point to an existing item."""
path = self._make_path(path)
return self._real_path(path).exists()
def is_file(self, path: AnyBsvPath) -> bool:
"""Test if `path` is a file."""
return self.metadata(path).is_file
def is_dir(self, path: AnyBsvPath) -> bool:
"""Test if `path` is a directory."""
return self.metadata(path).is_dir
def is_symlink(self, path: AnyBsvPath) -> bool:
"""Test if `path` is a symbolic link."""
return self.metadata(path).is_symlink
def is_other(self, path: AnyBsvPath) -> bool:
"""Test if `path` is not a file, directory or symbolic link."""
return self.metadata(path).is_other
def metadata(self, path: AnyBsvPath) -> FileMetadata:
"""Return the metadata of a given object."""
metadata = self.metadata_or_none(path)
if metadata is None:
msg = f"file '{path}' not found"
raise NotFoundError(msg)
return metadata
def metadata_or_none(self, path: AnyBsvPath) -> FileMetadata | None:
"""Return the metadata of a given object or `None` if it does not exists."""
path = self._make_path(path)
try:
stat = self._real_path(path).stat(follow_symlinks=False)
except FileNotFoundError:
return None
except OSError as err:
msg = f"failed to read '{path}' metadata"
raise FsError(msg) from err
return FileMetadata.from_stat(path, stat)
def iter_dir(self, path: AnyBsvPath) -> Iterator[FileMetadata]:
"""Return the metadata of all items in the directory `path`."""
path = self._make_path(path)
real_path = self._real_path(path)
try:
for entry in os.scandir(real_path):
yield FileMetadata.from_stat(
path / entry.name, entry.stat(follow_symlinks=False)
)
except OSError as err:
msg = f"failed to read directory {path}"
raise FsError(msg) from err
def read_bytes(self, path: AnyBsvPath) -> bytes:
"""Return the content of `path` as `bytes`."""
with self.open_read(path) as stream:
return stream.read()
def write_bytes(self, path: AnyBsvPath, data: Buffer | BinaryIO) -> int:
"""Create or replace a file at `path`, setting its content to `data`."""
written = 0
with self.open_write(path) as sout:
if isinstance(data, Buffer):
written += sout.write(data)
else:
while chunk := data.read(65536):
written += sout.write(chunk)
return written
def open_read(self, path: AnyBsvPath) -> BinaryIO:
"""Return a read-only binary stream that read the content of `path`."""
path = self._make_path(path)
try:
return self._real_path(path).open("rb")
except OSError as err:
msg = f"failed to read {path}"
raise FsError(msg) from err
def open_write(self, path: AnyBsvPath) -> BinaryIO:
"""Return a write-only binary stream write to `path`."""
path = self._make_path(path)
try:
return self._real_path(path).open("wb")
except OSError as err:
msg = f"failed to read {path}"
raise FsError(msg) from err
def mkdir(
self,
path: AnyBsvPath,
mode: Permissions = DEFAULT_DIR_PERMS,
parents: bool = False,
exist_ok: bool = False,
):
"""Create a directory at `path`.
Args:
path: The directory to create.
mode: The permissions of the new directory.
parents: If `True`, create parent directories if they don't exists.
exist_ok: If `False` and `path` already exist, raise an error.
Raises:
FsError: If something goes wrong.
"""
path = self._make_path(path)
try:
self._real_path(path).mkdir(
mode=mode.unix_perms, parents=parents, exist_ok=exist_ok
)
except FileExistsError as err:
msg = f"{path} already exists"
raise AlreadyExistError(msg) from err
except FileNotFoundError as err:
msg = f"{path.parent} does not exist"
raise NotFoundError(msg) from err
def make_link(self, path: AnyBsvPath, target: AnyBsvPath) -> None:
"""Creates a symbolic link from `path` to `target`."""
path = self._make_path(path)
target = self._make_path(path)
self._real_path(path).symlink_to(self._real_path(target))
def set_permissions(self, path: AnyBsvPath, permissions: Permissions) -> None:
"""Set the permissions of `path` to `permissions`."""
path = self._make_path(path)
self._real_path(path).chmod(permissions.unix_perms)
def set_modification_time(self, path: AnyBsvPath, mod_time: datetime) -> None:
"""Set the modification time of `path` to `mod_time`."""
path = self._make_path(path)
ts = mod_time.timestamp()
os.utime(self._real_path(path), (ts, ts))
def _make_path(self, path: AnyBsvPath) -> PurePosixPath:
if not isinstance(path, PurePosixPath):
path = PurePosixPath(path)
if not path.is_absolute():
msg = f"{path} is not absolute"
raise FsError(msg)
return path
def _real_path(self, path: PurePosixPath) -> Path:
return self.path / path.relative_to("/")

0
tests/test_repository.py → tests.bak/test_repository.py

0
tests/test_simple_cas.py → tests.bak/test_simple_cas.py

18
tests/test_bsv/__init__.py

@ -0,0 +1,18 @@
# pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2025 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""pybsv test module."""
from __future__ import annotations

270
tests/test_bsv/test_cli.py

@ -0,0 +1,270 @@
# pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2025 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from contextlib import contextmanager
from datetime import UTC, datetime
import os
from pathlib import Path
import re
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Literal, NamedTuple
from click.testing import CliRunner
from hypothesis import given
import hypothesis.strategies as st
import pytest
from bsv import cli
from bsv.vfs import Permissions
if TYPE_CHECKING:
from collections.abc import Generator
@pytest.fixture
def runner(tmp_path: Path) -> CliRunner:
runner = CliRunner(env={"BSV_REPO": str(tmp_path)})
return runner
@contextmanager
def make_runner() -> Generator[CliRunner, None, None]:
with TemporaryDirectory(prefix="test_vfs_") as tmp:
runner = CliRunner(env={"BSV_REPO": tmp})
yield runner
########################################################################################
# mkdir
def test_mkdir_fails_with_relative_path(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "test").exists()
result = runner.invoke(cli.cli, ["mkdir", "test"])
assert result.exit_code == 2
assert "test is not an absolute path" in result.stderr
def test_mkdir_default(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "test").exists()
result = runner.invoke(cli.cli, ["mkdir", "/test"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "test").is_dir()
def test_mkdir_multiple_dirs(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "foo").exists()
assert not (tmp_path / "bar").exists()
result = runner.invoke(cli.cli, ["mkdir", "/foo", "/bar"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "foo").is_dir()
assert (tmp_path / "bar").is_dir()
def test_mkdir_nested_fails_without_parents(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "foo").exists()
result = runner.invoke(cli.cli, ["mkdir", "/foo/bar"])
assert result.exit_code == 1
assert result.stderr == "/foo does not exist\n"
assert result.stdout == ""
def test_mkdir_nested(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "foo/bar").exists()
result = runner.invoke(cli.cli, ["mkdir", "--parents", "/foo/bar"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "foo/bar").is_dir()
def test_mkdir_message_if_exists(tmp_path: Path, runner: CliRunner):
result = runner.invoke(cli.cli, ["mkdir", "/test"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "test").is_dir()
result = runner.invoke(cli.cli, ["mkdir", "/test"])
assert result.exit_code == 0
assert result.stderr == "/test already exists\n"
assert result.stdout == ""
assert (tmp_path / "test").is_dir()
def test_mkdir_mode(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "test").exists()
result = runner.invoke(cli.cli, ["mkdir", "/test", "--mode=741"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "test").is_dir()
assert (tmp_path / "test").stat().st_mode & 0o7777 == 0o741
def test_mkdir_verbose(tmp_path: Path, runner: CliRunner):
result = runner.invoke(cli.cli, ["mkdir", "/foo"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "foo").is_dir()
assert not (tmp_path / "bar").exists()
result = runner.invoke(cli.cli, ["mkdir", "--verbose", "/foo", "/bar"])
assert result.exit_code == 0
assert result.stderr == "/foo already exists\n"
assert result.stdout == "Created /bar\n"
assert (tmp_path / "foo").is_dir()
assert (tmp_path / "bar").is_dir()
########################################################################################
# ls
def permissions(target: Literal["file", "dir"] = "file"):
return st.builds(
Permissions,
st.sampled_from(
[
0o0400,
0o0440,
0o0444,
0o0600,
0o0640,
0o0644,
0o0664,
0o0750,
0o0755,
0o0777,
]
if target == "file"
else [
0o0400,
0o0600,
0o0640,
]
),
)
class Tree(NamedTuple):
type: Literal["file", "dir"]
name: str
perms: Permissions
time: datetime
content: bytes | list[Tree]
@property
def type_prefix(self) -> str:
if self.type == "dir":
return "d"
return "-"
def build(self, parent: Path) -> None:
path = parent / self.name
if isinstance(self.content, list):
path.mkdir(mode=self.perms.unix_perms)
for child in self.content:
child.build(path)
else:
path.write_bytes(self.content)
path.chmod(self.perms.unix_perms)
ts = self.time.timestamp()
os.utime(path, (ts, ts))
def filenames() -> st.SearchStrategy:
return st.text(
st.characters(exclude_categories=["Cc", "Cs"], exclude_characters='<>:"/\\|!*'),
min_size=1,
max_size=255,
).filter(lambda t: len(t.encode()) < 256 and t not in (".", ".."))
@st.composite
def trees(draw: st.DrawFn, max_depth: int = 3) -> Tree:
file_type = draw(st.sampled_from(["file", "dir"]))
content = (
st.binary()
if file_type == "file"
else st.lists(
trees(max_depth - 1),
unique_by=lambda t: t.name,
max_size=0 if max_depth == 0 else 10,
)
)
return Tree(
file_type, # type: ignore
draw(filenames()),
draw(permissions(file_type)), # type: ignore
draw(
st.datetimes(
min_value=datetime(1902, 1, 1),
max_value=datetime(2100, 1, 1),
timezones=st.just(UTC),
)
),
draw(content),
)
def trees_lists(max_depth: int = 3) -> st.SearchStrategy:
return st.lists(trees(max_depth=max_depth), unique_by=lambda t: t.name)
@given(trees=trees_lists(max_depth=0))
def test_ls(trees: list[Tree]):
with make_runner() as runner:
path = Path(runner.env["BSV_REPO"] or "")
for tree in trees:
tree.build(path)
result = runner.invoke(cli.cli, ["ls", "-lA"])
assert result.exit_code == 0
assert result.stderr == ""
trees.sort(key=lambda t: t.name)
lines = [line for line in result.stdout.splitlines() if line != "\n"]
for line, tree in zip(lines, trees, strict=True):
match = re.fullmatch(
r"""
([dl-])([r-][w-][x-][r-][w-][x-][r-][w-][x-])
\ +(\d+)
\ (\d{4}-\d{2}-\d{2}\ \d{2}:\d{2}:\d{2})
\ ([^\n]+)
""",
line,
re.VERBOSE,
)
assert match
assert match[1] == tree.type_prefix
assert match[2] == str(tree.perms)
if tree.type_prefix != "d":
assert match[3] == str(len(tree.content))
assert match[4] == tree.time.astimezone().replace(tzinfo=None).isoformat(
" ", "seconds"
)
assert match[5] == tree.name
pass

56
tests/test_bsv/test_cli_utils.py

@ -0,0 +1,56 @@
# pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2025 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Tests for cli_utils.py."""
from __future__ import annotations
from bsv.cli_utils import format_human_byte_size
def test_format_human_byte_size():
assert format_human_byte_size(0) == "0B"
assert format_human_byte_size(1) == "1B"
assert format_human_byte_size(9) == "9B"
assert format_human_byte_size(10) == "10B"
assert format_human_byte_size(99) == "99B"
assert format_human_byte_size(100) == "100B"
assert format_human_byte_size(999) == "999B"
assert format_human_byte_size(1000) == "1000B"
assert format_human_byte_size(1023) == "1023B"
assert format_human_byte_size(2**10) == "1KiB"
assert format_human_byte_size(int(1.23456 * 2**10)) == "1.23KiB"
assert format_human_byte_size(9 * 2**10) == "9KiB"
assert format_human_byte_size(10 * 2**10 - 1) == "10KiB"
assert format_human_byte_size(int(98.76543 * 2**10)) == "98.8KiB"
assert format_human_byte_size(99 * 2**10 - 1) == "99KiB"
assert format_human_byte_size(100 * 2**10 - 1) == "100KiB"
assert format_human_byte_size(int(192.8374 * 2**10)) == "193KiB"
assert format_human_byte_size(999 * 2**10 - 1) == "999KiB"
assert format_human_byte_size(1000 * 2**10 - 1) == "1000KiB"
assert format_human_byte_size(2**20 - 1) == "1MiB"
assert format_human_byte_size(2**20) == "1MiB"
assert format_human_byte_size(2**30) == "1GiB"
assert format_human_byte_size(2**40) == "1TiB"
assert format_human_byte_size(2**50) == "1PiB"
assert format_human_byte_size(2**60) == "1EiB"
assert format_human_byte_size(2**70) == "1ZiB"
assert format_human_byte_size(2**80) == "1YiB"
assert format_human_byte_size(2**90) == "1RiB"
assert format_human_byte_size(2**100 - 2**80) == "1QiB"
assert format_human_byte_size(2**100) == "1QiB"
assert format_human_byte_size(2**110 - 2**90) == "1024QiB"
assert format_human_byte_size(2**110) == "1024QiB"
assert format_human_byte_size(2**120) == "1048576QiB"

394
tests/test_bsv/test_vfs.py

@ -0,0 +1,394 @@
# pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2025 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Tests for the `VirtualFileSystem` class and related stuff."""
from __future__ import annotations
from datetime import UTC, datetime
from io import BytesIO
from pathlib import Path, PurePosixPath
import pytest
from bsv.vfs import FileMetadata, FsError, Permissions, VirtualFileSystem
@pytest.fixture
def fs(tmp_path: Path) -> VirtualFileSystem:
"""Fixture that returns a `VirtualFileSystem`."""
return VirtualFileSystem(tmp_path)
########################################################################################
# Permissions
def test_permissions():
perm0 = Permissions(0o1234)
assert perm0.unix_perms == 0o1234
perm1 = Permissions("752")
assert perm1.unix_perms == 0o752
assert perm0 == perm0
assert perm0 != perm1
assert repr(perm0) == "Permissions(0o1234)"
assert repr(perm1) == "Permissions(0o0752)"
assert str(perm0) == "-w--wxr-T"
assert str(perm1) == "rwxr-x-w-"
########################################################################################
# FileMetadata
def test_file_metadata():
path = PurePosixPath("/some_dir/some_file")
permissions = Permissions(0o1234)
mod_time = datetime(2025, 7, 12, 12, 34, 56, tzinfo=UTC)
file_md = FileMetadata(
path,
type="file",
permissions=permissions,
modification_time=mod_time,
byte_size=123,
)
assert file_md.path == path
assert file_md.type == "file"
assert file_md.permissions == permissions
assert file_md.modification_time == mod_time
assert file_md.byte_size == 123
assert file_md.unix_mode == "--w--wxr-T"
assert not file_md.is_hidden
assert file_md.is_file
assert not file_md.is_dir
assert not file_md.is_symlink
assert not file_md.is_other
dir_md = FileMetadata(
path,
type="dir",
permissions=permissions,
modification_time=mod_time,
byte_size=123,
)
assert dir_md.type == "dir"
assert not dir_md.is_file
assert dir_md.is_dir
assert not dir_md.is_symlink
assert not dir_md.is_other
symlink_md = FileMetadata(
path,
type="symlink",
permissions=permissions,
modification_time=mod_time,
byte_size=123,
)
assert symlink_md.type == "symlink"
assert not symlink_md.is_file
assert not symlink_md.is_dir
assert symlink_md.is_symlink
assert not symlink_md.is_other
other_md = FileMetadata(
path,
type="other",
permissions=permissions,
modification_time=mod_time,
byte_size=123,
)
assert other_md.type == "other"
assert not other_md.is_file
assert not other_md.is_dir
assert not other_md.is_symlink
assert other_md.is_other
assert FileMetadata(
PurePosixPath("/some_dir/.some_file"),
type="file",
permissions=permissions,
modification_time=mod_time,
byte_size=123,
).is_hidden
def test_file_metadata_eq():
path = PurePosixPath("/some_dir/some_file")
permissions = Permissions(0o1234)
mod_time = datetime(2025, 7, 12, 12, 34, 56, tzinfo=UTC)
md = FileMetadata(
path,
type="file",
permissions=permissions,
modification_time=mod_time,
byte_size=123,
)
assert (
FileMetadata(
path,
type="file",
permissions=permissions,
modification_time=mod_time,
byte_size=123,
)
== md
)
assert (
FileMetadata(
PurePosixPath("/some_dir/some_other_file"),
type="file",
permissions=permissions,
modification_time=mod_time,
byte_size=123,
)
!= md
)
assert (
FileMetadata(
path,
type="dir",
permissions=permissions,
modification_time=mod_time,
byte_size=123,
)
!= md
)
assert (
FileMetadata(
path,
type="file",
permissions=Permissions(0o0752),
modification_time=mod_time,
byte_size=123,
)
!= md
)
assert (
FileMetadata(
path,
type="file",
permissions=permissions,
modification_time=datetime(2025, 1, 2, 3, 4, 5),
byte_size=123,
)
!= md
)
assert (
FileMetadata(
path,
type="file",
permissions=permissions,
modification_time=mod_time,
byte_size=124,
)
!= md
)
########################################################################################
# mkdir
def test_mkdir_fails_with_relative_path(fs: VirtualFileSystem):
with pytest.raises(FsError):
fs.mkdir("test")
def test_mkdir_default(fs: VirtualFileSystem):
assert not fs.exists("/test")
fs.mkdir("/test")
assert fs.is_dir("/test")
def test_mkdir_nested_fails_without_parents(fs: VirtualFileSystem):
assert not fs.exists("/foo")
with pytest.raises(FsError):
fs.mkdir("/foo/bar")
def test_mkdir_nested(fs: VirtualFileSystem):
assert not fs.exists("/test")
fs.mkdir("/test/foobar", parents=True)
assert fs.is_dir("/test/foobar")
def test_mkdir_fails_if_exists(fs: VirtualFileSystem):
assert not fs.exists("/foo")
fs.mkdir("/foo")
assert fs.is_dir("/foo")
with pytest.raises(FsError):
fs.mkdir("/foo")
def test_mkdir_exists_ok(fs: VirtualFileSystem):
assert not fs.exists("/test")
fs.mkdir("/test")
assert fs.is_dir("/test")
fs.mkdir("/test", exist_ok=True)
def test_mkdir_exists_ok_fail_if_file(fs: VirtualFileSystem):
fs.write_bytes("/test", b"test")
assert fs.is_file("/test")
with pytest.raises(FsError):
fs.mkdir("/test", exist_ok=True)
def test_mkdir_mode(fs: VirtualFileSystem):
assert not fs.exists("/test")
permissions = Permissions(0o741)
fs.mkdir("/test", mode=permissions)
assert fs.is_dir("/test")
assert fs.metadata("/test").permissions == permissions
########################################################################################
# read_bytes / write_bytes
def test_read_write_bytes(fs: VirtualFileSystem):
assert not fs.exists("/test")
fs.write_bytes("/test", b"This is a test.")
assert fs.read_bytes("/test") == b"This is a test."
stream = BytesIO(b"Another test.")
fs.write_bytes("/test", stream)
assert fs.read_bytes("/test") == b"Another test."
with pytest.raises(FsError):
fs.read_bytes("/does_not_exist")
with pytest.raises(FsError):
fs.write_bytes("/does_not_exist/foobar", b"")
def test_open_read_write(fs: VirtualFileSystem):
assert not fs.exists("/test")
with fs.open_write("/test") as stream:
stream.write(b"foo")
stream.write(b"bar")
assert fs.exists("/test")
with fs.open_read("/test") as stream:
assert stream.read(3) == b"foo"
assert stream.read(3) == b"bar"
assert stream.read() == b""
# Test overwrite
with fs.open_write("/test") as stream:
stream.write(b"baz")
with fs.open_read("/test") as stream:
assert stream.read() == b"baz"
with pytest.raises(FsError):
fs.open_read("/does_not_exist")
with pytest.raises(FsError):
fs.open_write("/does_not_exist/foobar")
########################################################################################
# metadata
def test_metadata(fs: VirtualFileSystem):
file_permissions = Permissions(0o754)
file_time = datetime(2025, 5, 17, 13, 57, 32, tzinfo=UTC)
file_content = b"This is a test\n"
fs.write_bytes("/test_file", file_content)
fs.set_permissions("/test_file", file_permissions)
fs.set_modification_time("/test_file", file_time)
md = fs.metadata("/test_file")
assert md.path == PurePosixPath("/test_file")
assert md.permissions == file_permissions
assert md.type == "file"
assert md.modification_time == file_time
assert md.byte_size == len(file_content)
assert not md.is_hidden
assert fs.metadata("/test_file") == md
assert fs.is_file("/test_file")
assert not fs.is_dir("/test_file")
assert not fs.is_symlink("/test_file")
assert not fs.is_other("/test_file")
fs.set_permissions("/test_file", Permissions(0o644))
assert fs.metadata("/test_file") != md
fs.mkdir("/.test_dir")
md = fs.metadata("/.test_dir")
assert md.type == "dir"
assert fs.metadata("/.test_dir").is_hidden
assert not fs.is_file("/.test_dir")
assert fs.is_dir("/.test_dir")
assert not fs.is_symlink("/.test_dir")
assert not fs.is_other("/.test_dir")
fs.make_link("/test_link", "/link_target")
md = fs.metadata("/test_link")
assert md.type == "symlink"
assert not fs.is_file("/test_link")
assert not fs.is_dir("/test_link")
assert fs.is_symlink("/test_link")
assert not fs.is_other("/test_link")
assert fs.metadata_or_none("/does_not_exist") is None
with pytest.raises(FsError):
fs.metadata("/does_not_exist")
########################################################################################
# iter_dir
def test_iter_dir(fs: VirtualFileSystem):
expected = [
(PurePosixPath("/dir"), "dir"),
(PurePosixPath("/file"), "file"),
(PurePosixPath("/link"), "symlink"),
]
for path, file_type in expected:
if file_type == "dir":
fs.mkdir(path)
elif file_type == "file":
fs.write_bytes(path, b"")
elif file_type == "symlink":
fs.make_link(path, "/foobar")
items_metadata = sorted(fs.iter_dir("/"))
for md, [path, file_type] in zip(items_metadata, expected, strict=True):
assert md.path == path
assert md.type == file_type
def test_iter_dir_failure(fs: VirtualFileSystem):
with pytest.raises(FsError):
list(fs.iter_dir("/test"))
fs.write_bytes("/test", b"")
with pytest.raises(FsError):
list(fs.iter_dir("/test"))
Loading…
Cancel
Save