diff --git a/.gitignore b/.gitignore index 7f8f940..1aefd01 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ __pycache__ +*.egg-info +/.coverage +/.hypothesis/ /src/bsv/_version.py /venv -*.egg-info diff --git a/pyproject.toml b/pyproject.toml index dade943..215d763 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,15 +12,18 @@ classifiers = [ ] dynamic = ["version"] dependencies = [ + "click", "fastcdc", "rich", "tomlkit", + "typing-extensions" ] [project.optional-dependencies] -test = [ - "pytest", +dev = [ "hypothesis", + "pytest", + "pytest-cov" ] [project.urls] @@ -28,7 +31,7 @@ test = [ "Bug Tracker" = "https://git.draklia.net/draklaw/pybsv/issues" [project.scripts] -bsv = "bsv.main:main" +bsv = "bsv.cli:cli" [tool.ruff] @@ -57,14 +60,31 @@ select = [ "UP", # pyupgrade "W", # pycodestyle ] +ignore = [ + "UP038", # Deprecated rule; bad idea. +] + +[tool.ruff.lint.per-file-ignores] +"**/tests/*" = [ + "D103", # Missing docstring in public function + "S101", # Use of assert detected +] [tool.ruff.lint.isort] +force-sort-within-sections = true +lines-after-imports = 2 required-imports = ["from __future__ import annotations"] [tool.ruff.lint.pydocstyle] convention = "google" +[tool.coverage.report] +exclude_also = [ + "if TYPE_CHECKING:", +] + + [build-system] requires = ["setuptools", "setuptools-scm"] build-backend = "setuptools.build_meta" diff --git a/src/bsv/__init__.py b/src/bsv/__init__.py index 18276bf..4c55921 100644 --- a/src/bsv/__init__.py +++ b/src/bsv/__init__.py @@ -1,4 +1,4 @@ -# pybsv +# pybsv - Backup, Synchronization, Versioning. # Copyright (C) 2025 Simon Boyé # # This program is free software: you can redistribute it and/or modify @@ -14,3 +14,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """pybsv - A Backup, Synchronization and Versioning tool.""" + +from __future__ import annotations + +from bsv._version import __version__, __version_tuple__ + + +__all__ = ["__version__", "__version_tuple__"] diff --git a/src/bsv/__main__.py b/src/bsv/__main__.py new file mode 100644 index 0000000..b6cc351 --- /dev/null +++ b/src/bsv/__main__.py @@ -0,0 +1,23 @@ +# pybsv - Backup, Synchronization, Versioning. +# Copyright (C) 2025 Simon Boyé +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +"""Main entry-point. Allow to use bsv module as a command.""" + +from __future__ import annotations + +from bsv.cli import cli + + +exit(cli()) diff --git a/src/bsv/cli.py b/src/bsv/cli.py new file mode 100644 index 0000000..3757187 --- /dev/null +++ b/src/bsv/cli.py @@ -0,0 +1,330 @@ +# pybsv - Backup, Synchronization, Versioning. +# Copyright (C) 2025 Simon Boyé +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +"""Command-line interface. This is where all bsv commands are defined.""" + +from __future__ import annotations + +from dataclasses import dataclass +import math +from pathlib import Path, PurePosixPath +import platform +import sys +from typing import Any, ClassVar, Literal + +import click + +from bsv.repo import default_repository_path +from bsv.vfs import ( + AlreadyExistError, + FileMetadata, + NotFoundError, + Permissions, + VirtualFileSystem, +) + + +@dataclass +class RepositoryParams: + """Global parameters shared by all commands.""" + + path: Path + + def as_filesystem(self) -> VirtualFileSystem: + return VirtualFileSystem(self.path) + + +class PermissionsType(click.ParamType): + """Converter for permissions given on the command line.""" + + name: ClassVar[str] = "permissions" + + def convert( + self, value: Any, param: click.Parameter | None, ctx: click.Context | None + ) -> Permissions: + """Convert an argument to a `Permissions` object.""" + if isinstance(value, Permissions): + return value + + try: + return Permissions(value) + except ValueError as err: + self.fail(str(err), param, ctx) + + +class BsvPathType(click.ParamType): + """Converter for bsv paths given on the command line.""" + + name: ClassVar[str] = "bsv_path" + + def convert( + self, value: Any, param: click.Parameter | None, ctx: click.Context | None + ) -> PurePosixPath: + """Convert an argument to a bsv path (absolute `PurePosixPath`).""" + if isinstance(value, PurePosixPath): + return value + + try: + path = PurePosixPath(value) + except ValueError as err: + self.fail(str(err), param, ctx) + + if not path.is_absolute(): + self.fail(f"{value} is not an absolute path", param, ctx) + + return path + + +class AnyPathType(click.ParamType): + """Converter for bsv or fs paths given on the command line.""" + + name: ClassVar[str] = "any_path" + + default: Literal["bsv", "fs"] + + def __init__(self, default: Literal["bsv", "fs"] = "fs"): + self.default = default + + def convert( + self, value: Any, param: click.Parameter | None, ctx: click.Context | None + ) -> PurePosixPath | Path: + """Convert an argument to a bsv or fs path.""" + if isinstance(value, (PurePosixPath, Path)): + return value + + if not isinstance(value, str): + self.fail(f"{value} is not a string") + + path_type = self.default + if value.startswith("bsv:"): + path_type = "bsv" + value = value.removeprefix("bsv:") + elif value.startswith("fs:"): + path_type = "fs" + value = value.removeprefix("fs:") + + if path_type == "bsv": + return BsvPathType().convert(value, param, ctx) + else: + return Path(value) + + +PERMISSIONS_TYPE = PermissionsType() +ANY_OBJECT_TYPE = BsvPathType() # TODO: accept bsv path and object id. + + +@click.group() +@click.version_option() +@click.option( + "--repo", envvar="BSV_REPO", type=click.Path(resolve_path=True, path_type=Path) +) +@click.pass_context +def cli(ctx: click.Context, repo: Path): + """Backup, Synchronization and Versioning (bsv) tool. + + bsv manages synchronization of several "devices" with history. This makes it + suitable for different tasks: + + * Backup: Synchronize your data with remote devices that serve as backup. The + remotes should be configured to keep previous versions of the files (using + configurable rules) so even if a file is deleted/corrupted, a valid version can + be found in the backup devices. + * Synchronization: Synchronize your data among several devices you are working with. + In case of conflict, the conflicting versions of a file are stored in each + devices so it is possible to inspect and merge them to resolve the conflict. + * Versioning: A local device can be used to store different versions of the same + directory structure. + """ + ctx.obj = RepositoryParams( + path=repo or default_repository_path(), + ) + + +@cli.command() +@click.pass_obj +def info(params: RepositoryParams): + """Print information on the current repository.""" + print(f"Repository: {params.path}") + + +@cli.command() +@click.option("-d", "--device-name", default=platform.node, prompt=True) +@click.pass_obj +def init(params: RepositoryParams, device_name: str): + """Initialize a bsv repository.""" + print(f"device_name: {device_name!r}") + + +@cli.command() +@click.argument("directories", nargs=-1, type=BsvPathType()) +@click.option("-m", "--mode", type=PERMISSIONS_TYPE, default=Permissions(0o770)) +@click.option("-p", "--parents", is_flag=True) +@click.option("-v", "--verbose", is_flag=True) +@click.pass_obj +def mkdir( + params: RepositoryParams, + directories: list[PurePosixPath], + mode: Permissions, + parents: bool = False, + verbose: bool = False, +): + """Make a directory in the current repository.""" + fs = params.as_filesystem() + + return_code = 0 + for dir in directories: + try: + fs.mkdir(dir, mode=mode, parents=parents) + except AlreadyExistError as error: + click.echo(error, file=sys.stderr) + except NotFoundError as error: + return_code = 1 + click.echo(error, file=sys.stderr) + else: + if verbose: + click.echo(f"Created {dir}") + + sys.exit(return_code) + + +SI_PREFIXES = ["", "k", "M", "G", "T", "P", "E", "Z", "Y", "R", "Q"] + + +def human_byte_size(byte_size: int) -> str: + if byte_size == 0: + return "0" + + prefix_index = int(math.log2(byte_size) / 10) + display_size: float = byte_size / 1024**prefix_index + size = ( + str(math.ceil(display_size)) + if prefix_index == 0 or display_size >= 10 + else repr(math.ceil(display_size * 10) / 10) + ) + return f"{size}{SI_PREFIXES[prefix_index]}" + + +@cli.command() +@click.argument("files", nargs=-1, type=BsvPathType()) +@click.option("--filter", flag_value="hidden", default=True, hidden=True) +@click.option("-a", "--all", "filter", flag_value="all") +@click.option("-A", "--almost-all", "filter", flag_value="implied") +@click.option("-h", "--human-readable", is_flag=True) +@click.option("-l", "--list", is_flag=True) +@click.pass_obj +def ls( + params: RepositoryParams, + files: tuple[PurePosixPath], + filter: Literal["hidden", "implied", "all"], + human_readable: bool, + list: bool, +): + """List information about files.""" + fs = params.as_filesystem() + + if not files: + files = (PurePosixPath("/"),) + + filter_md = FileMetadata.is_hidden_files if filter == "hidden" else lambda _: False + + for file_index, file in enumerate(files): + if len(files) > 1: + if file_index: + click.echo() + click.echo(f"{file}:") + + items = [(md.path.name, md) for md in fs.iter_dir(file) if not filter_md(md)] + items.sort() + + if filter == "all": + items[0:0] = [ + (".", fs.metadata(file)), + ("..", fs.metadata(file.parent)), + ] + + if list: + rows: list[tuple[str, str, str, str]] = [] + rows_width: list[int] = [0, 0, 0, 0] + for name, md in items: + mode = str(md.unix_mode) + size = ( + human_byte_size(md.byte_size) + if human_readable + else str(md.byte_size) + ) + local_time = md.modification_time.astimezone().replace(tzinfo=None) + time = local_time.isoformat(" ", "seconds") + row = (mode, size, time, name) + rows.append(row) + for index, field in enumerate(row): + rows_width[index] = max(rows_width[index], len(field)) + + for mode, size, time, name in rows: + click.echo( + " ".join( + [ + mode.ljust(rows_width[0]), + size.rjust(rows_width[1]), + time.ljust(rows_width[2]), + name, + ] + ) + ) + + else: + for name, _ in items: + click.echo(name) + + +@cli.command() +@click.argument("object", type=ANY_OBJECT_TYPE) +@click.pass_obj +def show( + params: RepositoryParams, + object: PurePosixPath, +): + """Show a bsv object.""" + print(f"object: {object!r}") + + +@cli.command() +@click.argument("srcs", nargs=-1, type=AnyPathType(default="bsv")) +@click.argument("dst", type=AnyPathType(default="bsv")) +@click.option("-r", "--recursive", is_flag=True) +@click.pass_obj +def cp( + params: RepositoryParams, + srcs: list[PurePosixPath | Path], + dst: PurePosixPath | Path, + recursive: bool, +): + """Copy files or directories.""" + print(f"srcs: {srcs!r}") + print(f"dst: {dst!r}") + print(f"recursive: {recursive!r}") + + +@cli.command() +@click.argument("targets", nargs=-1, type=BsvPathType()) +@click.option("-r", "--recursive", is_flag=True) +@click.pass_obj +def rm( + params: RepositoryParams, + targets: list[PurePosixPath], + recursive: bool, +): + """Remove files or directories.""" + print(f"targets: {targets}") + print(f"recursive: {recursive}") diff --git a/src/bsv/py.typed b/src/bsv/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/bsv/repo.py b/src/bsv/repo.py new file mode 100644 index 0000000..55e95ea --- /dev/null +++ b/src/bsv/repo.py @@ -0,0 +1,41 @@ +# pybsv - Backup, Synchronization, Versioning. +# Copyright (C) 2025 Simon Boyé +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from __future__ import annotations + +import os +from pathlib import Path +import platform + + +def default_repository_path() -> Path: + """Return the system-dependent default repository path.""" + if platform.system() in ("Windows", "Darwin", "Java"): + msg = f"default_repository_path does not support {platform.system()} system" + raise NotImplementedError(msg) + else: # Assume Unix + # See https://specifications.freedesktop.org/basedir-spec/latest/ + data_home = os.environ.get("XDG_DATA_HOME", "") + if data_home: + path = Path(data_home) + if not path.is_absolute() or not path.exists(): + msg = ( + f"invalid XDG_DATA_HOME ({path}): path is relative or does not " + "exists" + ) + raise RuntimeError(msg) + else: + path = Path.home() / ".local/share" + return path / "bsv/repo" diff --git a/src/bsv/vfs.py b/src/bsv/vfs.py new file mode 100644 index 0000000..b45d3f4 --- /dev/null +++ b/src/bsv/vfs.py @@ -0,0 +1,328 @@ +# pybsv - Backup, Synchronization, Versioning. +# Copyright (C) 2025 Simon Boyé +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +"""Provide a virtual file system interface alongside associated tools.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from functools import total_ordering +import os +from pathlib import Path, PurePosixPath +from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_IMODE, filemode +from typing import TYPE_CHECKING, Any, BinaryIO, Literal, Self, cast + +from typing_extensions import Buffer + + +if TYPE_CHECKING: + from collections.abc import Iterator + from os import stat_result + + +AnyBsvPath = PurePosixPath | str + + +class FsError(RuntimeError): + """Error type raised by `FileSystem` objects.""" + + +class AlreadyExistError(FsError): + """Raise when trying to create an item that already exists.""" + + +class NotFoundError(FsError): + """Raise when trying to access an item that do not exist.""" + + +class Permissions: + """Represent the permissions of an object in a filesystem.""" + + unix_perms: int + + def __init__(self, unix_perms: int | str = 0o640): + """Create a `Permissions` object from `unix_perms`.""" + if isinstance(unix_perms, str): + unix_perms = int(unix_perms, 8) + self.unix_perms = unix_perms + + def __eq__(self, rhs: Any) -> bool: + """Test if two `Permission` are the same.""" + return ( + rhs.unix_perms == self.unix_perms + if isinstance(rhs, Permissions) + else NotImplemented + ) + + def __repr__(self) -> str: + """Return a representation of the permissions as valid python code.""" + return f"Permissions(0o{oct(self.unix_perms)[2:].rjust(4, '0')})" + + def __str__(self) -> str: + """Return a representation of the permissions of the form 'rwxrwxrwx'.""" + return filemode(self.unix_perms)[1:] + + +DEFAULT_DIR_PERMS = Permissions(0o770) +DEFAULT_FILE_PERMS = Permissions(0o640) + + +class VirtualFileSystem: + """Represent a file system, with common file system operations.""" + + path: Path + + def __init__(self, path: Path): + """Initialize the file system to point to `path`.""" + self.path = path + + def exists(self, path: AnyBsvPath) -> bool: + """Test if the `path` point to an existing item.""" + path = self._make_path(path) + return self._real_path(path).exists() + + def is_file(self, path: AnyBsvPath) -> bool: + """Test if `path` is a file.""" + path = self._make_path(path) + return self._real_path(path).is_file() + + def is_dir(self, path: AnyBsvPath) -> bool: + """Test if `path` is a directory.""" + path = self._make_path(path) + return self._real_path(path).is_dir() + + def metadata(self, path: AnyBsvPath) -> FileMetadata: + """Return the metadata of a given object.""" + path = self._make_path(path) + return FileMetadata.from_stat( + self, path, self._real_path(path).stat(follow_symlinks=False) + ) + + def iter_dir(self, path: AnyBsvPath) -> Iterator[FileMetadata]: + """Return the metadata of all items in the directory `path`.""" + path = self._make_path(path) + real_path = self._real_path(path) + try: + for entry in os.scandir(real_path): + yield FileMetadata.from_stat( + self, path / entry.name, entry.stat(follow_symlinks=False) + ) + except OSError as err: + msg = f"failed to read directory {path}" + raise FsError(msg) from err + + def read_bytes(self, path: AnyBsvPath) -> bytes: + """Return the content of `path` as `bytes`.""" + path = self._make_path(path) + try: + return self._real_path(path).read_bytes() + except OSError as err: + msg = f"failed to read {path}" + raise FsError(msg) from err + + def write_bytes(self, path: AnyBsvPath, data: Buffer | BinaryIO) -> int: + """Create or replace a file at `path`, setting its content to `data`.""" + path = self._make_path(path) + real_path = self._real_path(path) + written = 0 + + try: + stream = real_path.open("wb") + except OSError as err: + msg = f"failed to write {path}" + raise FsError(msg) from err + + with stream: + if isinstance(data, Buffer): + written += stream.write(data) + else: + while chunk := data.read(65536): + written += stream.write(chunk) + return written + + def open_read(self, path: AnyBsvPath) -> BinaryIO: + """Return a read-only binary stream that read the content of `path`.""" + path = self._make_path(path) + try: + return self._real_path(path).open("rb") + except OSError as err: + msg = f"failed to read {path}" + raise FsError(msg) from err + + def open_write(self, path: AnyBsvPath) -> BinaryIO: + """Return a write-only binary stream write to `path`.""" + path = self._make_path(path) + try: + return self._real_path(path).open("wb") + except OSError as err: + msg = f"failed to read {path}" + raise FsError(msg) from err + + def mkdir( + self, + path: AnyBsvPath, + mode: Permissions = DEFAULT_DIR_PERMS, + parents: bool = False, + exist_ok: bool = False, + ): + """Create a directory at `path`. + + Args: + path: The directory to create. + mode: The permissions of the new directory. + parents: If `True`, create parent directories if they don't exists. + exist_ok: If `False` and `path` already exist, raise an error. + + Raises: + FsError: If something goes wrong. + """ + path = self._make_path(path) + try: + self._real_path(path).mkdir( + mode=mode.unix_perms, parents=parents, exist_ok=exist_ok + ) + except FileExistsError as err: + msg = f"{path} already exists" + raise AlreadyExistError(msg) from err + except FileNotFoundError as err: + msg = f"{path.parent} does not exist" + raise NotFoundError(msg) from err + + def make_link(self, path: AnyBsvPath, target: AnyBsvPath) -> None: + """Creates a symbolic link from `path` to `target`.""" + path = self._make_path(path) + target = self._make_path(path) + self._real_path(path).symlink_to(self._real_path(target)) + + def set_permissions(self, path: AnyBsvPath, permissions: Permissions) -> None: + """Set the permissions of `path` to `permissions`.""" + path = self._make_path(path) + self._real_path(path).chmod(permissions.unix_perms) + + def set_modification_time(self, path: AnyBsvPath, mod_time: datetime) -> None: + """Set the modification time of `path` to `mod_time`.""" + path = self._make_path(path) + ts = mod_time.timestamp() + os.utime(self._real_path(path), (ts, ts)) + + def _make_path(self, path: AnyBsvPath) -> PurePosixPath: + if not isinstance(path, PurePosixPath): + path = PurePosixPath(path) + if not path.is_absolute(): + msg = f"{path} is not absolute" + raise FsError(msg) + return path + + def _real_path(self, path: PurePosixPath) -> Path: + return self.path / path.relative_to("/") + + +FileType = Literal["dir", "file", "link", "other"] + +_IFMT_MAP = { + S_IFDIR: "dir", + S_IFREG: "file", + S_IFLNK: "link", +} + + +@total_ordering +class FileMetadata: + """Metadata associated with vfs files: file type, permissions, etc.""" + + vfs: VirtualFileSystem + path: PurePosixPath + type: FileType + permissions: Permissions + modification_time: datetime + byte_size: int + _stat: stat_result + + def __init__( + self, + vfs: VirtualFileSystem, + path: PurePosixPath, + *, + type: FileType, + permissions: Permissions, + modification_time: datetime, + byte_size: int, + ): + """Create a `FileMetadata`.""" + self.vfs = vfs + self.path = path + self.type = type + self.permissions = permissions + self.modification_time = modification_time + self.byte_size = byte_size + + @classmethod + def from_stat( + cls, + vfs: VirtualFileSystem, + path: PurePosixPath, + stat: stat_result, + ) -> Self: + """Create a `FileMetadata` from a `stat_result`.""" + return cls( + vfs, + path, + type=cast("FileType", _IFMT_MAP.get(S_IFMT(stat.st_mode), "other")), + permissions=Permissions(S_IMODE(stat.st_mode)), + modification_time=datetime.fromtimestamp(stat.st_mtime, UTC), + byte_size=stat.st_size, + ) + + @property + def unix_mode(self) -> str: + """Return unix-like mode in the form '-rwxrwxrwx'.""" + return UNIX_MODE_FILE_TYPE[self.type] + str(self.permissions) + + @property + def is_hidden_files(self) -> bool: + """Return true if the file starts with a '.'.""" + return self.path.name.startswith(".") + + def _as_tuple( + self, + ) -> tuple[VirtualFileSystem, PurePosixPath, FileType, Permissions, datetime, int]: + return ( + self.vfs, + self.path, + self.type, + self.permissions, + self.modification_time, + self.byte_size, + ) + + def __eq__(self, rhs: Any) -> bool: + """Test if two `Metadata` are the same.""" + return ( + self._as_tuple() == rhs._as_tuple() + if isinstance(rhs, FileMetadata) + else NotImplemented + ) + + def __lt__(self, rhs: Any) -> bool: + """Compare `rhs.path` with `self.path`.""" + return self.path < rhs.path if isinstance(rhs, FileMetadata) else NotImplemented + + +UNIX_MODE_FILE_TYPE = { + "dir": "d", + "file": "-", + "other": "o", + "link": "l", +} diff --git a/tests/test_bsv/__init__.py b/tests/test_bsv/__init__.py new file mode 100644 index 0000000..ca34cef --- /dev/null +++ b/tests/test_bsv/__init__.py @@ -0,0 +1,18 @@ +# pybsv - Backup, Synchronization, Versioning. +# Copyright (C) 2025 Simon Boyé +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +"""pybsv test module.""" + +from __future__ import annotations diff --git a/tests/test_bsv/test_cli.py b/tests/test_bsv/test_cli.py new file mode 100644 index 0000000..8cda5e7 --- /dev/null +++ b/tests/test_bsv/test_cli.py @@ -0,0 +1,270 @@ +# pybsv - Backup, Synchronization, Versioning. +# Copyright (C) 2025 Simon Boyé +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from __future__ import annotations + +from contextlib import contextmanager +from datetime import UTC, datetime +import os +from pathlib import Path +import re +from tempfile import TemporaryDirectory +from typing import TYPE_CHECKING, Literal, NamedTuple + +from click.testing import CliRunner +from hypothesis import given +import hypothesis.strategies as st +import pytest + +from bsv import cli +from bsv.vfs import Permissions + + +if TYPE_CHECKING: + from collections.abc import Generator + + +@pytest.fixture +def runner(tmp_path: Path) -> CliRunner: + runner = CliRunner(env={"BSV_REPO": str(tmp_path)}) + return runner + + +@contextmanager +def make_runner() -> Generator[CliRunner, None, None]: + with TemporaryDirectory(prefix="test_vfs_") as tmp: + runner = CliRunner(env={"BSV_REPO": tmp}) + yield runner + + +######################################################################################## +# mkdir + + +def test_mkdir_fails_with_relative_path(tmp_path: Path, runner: CliRunner): + assert not (tmp_path / "test").exists() + result = runner.invoke(cli.cli, ["mkdir", "test"]) + assert result.exit_code == 2 + assert "test is not an absolute path" in result.stderr + + +def test_mkdir_default(tmp_path: Path, runner: CliRunner): + assert not (tmp_path / "test").exists() + result = runner.invoke(cli.cli, ["mkdir", "/test"]) + assert result.exit_code == 0 + assert result.stderr == "" + assert result.stdout == "" + assert (tmp_path / "test").is_dir() + + +def test_mkdir_multiple_dirs(tmp_path: Path, runner: CliRunner): + assert not (tmp_path / "foo").exists() + assert not (tmp_path / "bar").exists() + result = runner.invoke(cli.cli, ["mkdir", "/foo", "/bar"]) + assert result.exit_code == 0 + assert result.stderr == "" + assert result.stdout == "" + assert (tmp_path / "foo").is_dir() + assert (tmp_path / "bar").is_dir() + + +def test_mkdir_nested_fails_without_parents(tmp_path: Path, runner: CliRunner): + assert not (tmp_path / "foo").exists() + result = runner.invoke(cli.cli, ["mkdir", "/foo/bar"]) + assert result.exit_code == 1 + assert result.stderr == "/foo does not exist\n" + assert result.stdout == "" + + +def test_mkdir_nested(tmp_path: Path, runner: CliRunner): + assert not (tmp_path / "foo/bar").exists() + result = runner.invoke(cli.cli, ["mkdir", "--parents", "/foo/bar"]) + assert result.exit_code == 0 + assert result.stderr == "" + assert result.stdout == "" + assert (tmp_path / "foo/bar").is_dir() + + +def test_mkdir_message_if_exists(tmp_path: Path, runner: CliRunner): + result = runner.invoke(cli.cli, ["mkdir", "/test"]) + assert result.exit_code == 0 + assert result.stderr == "" + assert result.stdout == "" + assert (tmp_path / "test").is_dir() + + result = runner.invoke(cli.cli, ["mkdir", "/test"]) + assert result.exit_code == 0 + assert result.stderr == "/test already exists\n" + assert result.stdout == "" + assert (tmp_path / "test").is_dir() + + +def test_mkdir_mode(tmp_path: Path, runner: CliRunner): + assert not (tmp_path / "test").exists() + result = runner.invoke(cli.cli, ["mkdir", "/test", "--mode=741"]) + assert result.exit_code == 0 + assert result.stderr == "" + assert result.stdout == "" + assert (tmp_path / "test").is_dir() + assert (tmp_path / "test").stat().st_mode & 0o7777 == 0o741 + + +def test_mkdir_verbose(tmp_path: Path, runner: CliRunner): + result = runner.invoke(cli.cli, ["mkdir", "/foo"]) + assert result.exit_code == 0 + assert result.stderr == "" + assert result.stdout == "" + assert (tmp_path / "foo").is_dir() + assert not (tmp_path / "bar").exists() + result = runner.invoke(cli.cli, ["mkdir", "--verbose", "/foo", "/bar"]) + assert result.exit_code == 0 + assert result.stderr == "/foo already exists\n" + assert result.stdout == "Created /bar\n" + assert (tmp_path / "foo").is_dir() + assert (tmp_path / "bar").is_dir() + + +######################################################################################## +# ls + + +def permissions(target: Literal["file", "dir"] = "file"): + return st.builds( + Permissions, + st.sampled_from( + [ + 0o0400, + 0o0440, + 0o0444, + 0o0600, + 0o0640, + 0o0644, + 0o0664, + 0o0750, + 0o0755, + 0o0777, + ] + if target == "file" + else [ + 0o0400, + 0o0600, + 0o0640, + ] + ), + ) + + +class Tree(NamedTuple): + type: Literal["file", "dir"] + name: str + perms: Permissions + time: datetime + content: bytes | list[Tree] + + @property + def type_prefix(self) -> str: + if self.type == "dir": + return "d" + return "-" + + def build(self, parent: Path) -> None: + path = parent / self.name + if isinstance(self.content, list): + path.mkdir(mode=self.perms.unix_perms) + for child in self.content: + child.build(path) + else: + path.write_bytes(self.content) + path.chmod(self.perms.unix_perms) + ts = self.time.timestamp() + os.utime(path, (ts, ts)) + + +def filenames() -> st.SearchStrategy: + return st.text( + st.characters(exclude_categories=["Cc", "Cs"], exclude_characters='<>:"/\\|!*'), + min_size=1, + max_size=255, + ).filter(lambda t: len(t.encode()) < 256 and t not in (".", "..")) + + +@st.composite +def trees(draw: st.DrawFn, max_depth: int = 3) -> Tree: + file_type = draw(st.sampled_from(["file", "dir"])) + content = ( + st.binary() + if file_type == "file" + else st.lists( + trees(max_depth - 1), + unique_by=lambda t: t.name, + max_size=0 if max_depth == 0 else 10, + ) + ) + return Tree( + file_type, # type: ignore + draw(filenames()), + draw(permissions(file_type)), # type: ignore + draw( + st.datetimes( + min_value=datetime(1902, 1, 1), + max_value=datetime(2100, 1, 1), + timezones=st.just(UTC), + ) + ), + draw(content), + ) + + +def trees_lists(max_depth: int = 3) -> st.SearchStrategy: + return st.lists(trees(max_depth=max_depth), unique_by=lambda t: t.name) + + +@given(trees=trees_lists(max_depth=0)) +def test_ls(trees: list[Tree]): + with make_runner() as runner: + path = Path(runner.env["BSV_REPO"] or "") + for tree in trees: + tree.build(path) + + result = runner.invoke(cli.cli, ["ls", "-lA"]) + + assert result.exit_code == 0 + assert result.stderr == "" + + trees.sort(key=lambda t: t.name) + lines = [line for line in result.stdout.splitlines() if line != "\n"] + + for line, tree in zip(lines, trees, strict=True): + match = re.fullmatch( + r""" + ([dl-])([r-][w-][x-][r-][w-][x-][r-][w-][x-]) + \ +(\d+) + \ (\d{4}-\d{2}-\d{2}\ \d{2}:\d{2}:\d{2}) + \ ([^\n]+) + """, + line, + re.VERBOSE, + ) + assert match + assert match[1] == tree.type_prefix + assert match[2] == str(tree.perms) + if tree.type_prefix != "d": + assert match[3] == str(len(tree.content)) + assert match[4] == tree.time.astimezone().replace(tzinfo=None).isoformat( + " ", "seconds" + ) + assert match[5] == tree.name + + pass diff --git a/tests/test_bsv/test_vfs.py b/tests/test_bsv/test_vfs.py new file mode 100644 index 0000000..1eaca6a --- /dev/null +++ b/tests/test_bsv/test_vfs.py @@ -0,0 +1,224 @@ +# pybsv - Backup, Synchronization, Versioning. +# Copyright (C) 2025 Simon Boyé +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +"""Tests for the `VirtualFileSystem` class and related stuff.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from io import BytesIO +from pathlib import Path, PurePosixPath + +import pytest + +from bsv.vfs import FsError, Permissions, VirtualFileSystem + + +@pytest.fixture +def fs(tmp_path: Path) -> VirtualFileSystem: + """Fixture that returns a `VirtualFileSystem`.""" + return VirtualFileSystem(tmp_path) + + +######################################################################################## +# Permissions + + +def test_permissions(): + perm0 = Permissions(0o1234) + assert perm0.unix_perms == 0o1234 + + perm1 = Permissions("752") + assert perm1.unix_perms == 0o752 + + assert perm0 == perm0 + assert perm0 != perm1 + + assert repr(perm0) == "Permissions(0o1234)" + assert repr(perm1) == "Permissions(0o0752)" + + assert str(perm0) == "-w--wxr-T" + assert str(perm1) == "rwxr-x-w-" + + +######################################################################################## +# mkdir + + +def test_mkdir_fails_with_relative_path(fs: VirtualFileSystem): + with pytest.raises(FsError): + fs.mkdir("test") + + +def test_mkdir_default(fs: VirtualFileSystem): + assert not fs.exists("/test") + fs.mkdir("/test") + assert fs.is_dir("/test") + + +def test_mkdir_nested_fails_without_parents(fs: VirtualFileSystem): + assert not fs.exists("/foo") + with pytest.raises(FsError): + fs.mkdir("/foo/bar") + + +def test_mkdir_nested(fs: VirtualFileSystem): + assert not fs.exists("/test") + fs.mkdir("/test/foobar", parents=True) + assert fs.is_dir("/test/foobar") + + +def test_mkdir_fails_if_exists(fs: VirtualFileSystem): + assert not fs.exists("/foo") + fs.mkdir("/foo") + assert fs.is_dir("/foo") + with pytest.raises(FsError): + fs.mkdir("/foo") + + +def test_mkdir_exists_ok(fs: VirtualFileSystem): + assert not fs.exists("/test") + fs.mkdir("/test") + assert fs.is_dir("/test") + fs.mkdir("/test", exist_ok=True) + + +def test_mkdir_exists_ok_fail_if_file(fs: VirtualFileSystem): + fs.write_bytes("/test", b"test") + assert fs.is_file("/test") + with pytest.raises(FsError): + fs.mkdir("/test", exist_ok=True) + + +def test_mkdir_mode(fs: VirtualFileSystem): + assert not fs.exists("/test") + permissions = Permissions(0o741) + fs.mkdir("/test", mode=permissions) + assert fs.is_dir("/test") + assert fs.metadata("/test").permissions == permissions + + +######################################################################################## +# read_bytes / write_bytes + + +def test_read_write_bytes(fs: VirtualFileSystem): + assert not fs.exists("/test") + + fs.write_bytes("/test", b"This is a test.") + assert fs.read_bytes("/test") == b"This is a test." + + stream = BytesIO(b"Another test.") + fs.write_bytes("/test", stream) + assert fs.read_bytes("/test") == b"Another test." + + with pytest.raises(FsError): + fs.read_bytes("/does_not_exist") + + with pytest.raises(FsError): + fs.write_bytes("/does_not_exist/foobar", b"") + + +def test_open_read_write(fs: VirtualFileSystem): + assert not fs.exists("/test") + + with fs.open_write("/test") as stream: + stream.write(b"foo") + stream.write(b"bar") + + assert fs.exists("/test") + with fs.open_read("/test") as stream: + assert stream.read(3) == b"foo" + assert stream.read(3) == b"bar" + assert stream.read() == b"" + + # Test overwrite + with fs.open_write("/test") as stream: + stream.write(b"baz") + + with fs.open_read("/test") as stream: + assert stream.read() == b"baz" + + with pytest.raises(FsError): + fs.open_read("/does_not_exist") + + with pytest.raises(FsError): + fs.open_write("/does_not_exist/foobar") + + +######################################################################################## +# metadata + + +def test_metadata(fs: VirtualFileSystem): + file_permissions = Permissions(0o754) + file_time = datetime(2025, 5, 17, 13, 57, 32, tzinfo=UTC) + file_content = b"This is a test\n" + + fs.write_bytes("/test_file", file_content) + fs.set_permissions("/test_file", file_permissions) + fs.set_modification_time("/test_file", file_time) + + md = fs.metadata("/test_file") + assert md.path == PurePosixPath("/test_file") + assert md.permissions == file_permissions + assert md.type == "file" + assert md.modification_time == file_time + assert md.byte_size == len(file_content) + assert not md.is_hidden_files + assert fs.metadata("/test_file") == md + + fs.set_permissions("/test_file", Permissions(0o644)) + assert fs.metadata("/test_file") != md + + fs.mkdir("/.test_dir") + assert fs.metadata("/.test_dir").type == "dir" + assert fs.metadata("/.test_dir").is_hidden_files + + fs.make_link("/test_link", "/link_target") + assert fs.metadata("/test_link").type == "link" + + +######################################################################################## +# iter_dir + + +def test_iter_dir(fs: VirtualFileSystem): + expected = [ + (PurePosixPath("/dir"), "dir"), + (PurePosixPath("/file"), "file"), + (PurePosixPath("/link"), "link"), + ] + for path, file_type in expected: + if file_type == "dir": + fs.mkdir(path) + elif file_type == "file": + fs.write_bytes(path, b"") + elif file_type == "link": + fs.make_link(path, "/foobar") + + items_metadata = sorted(fs.iter_dir("/")) + for md, [path, file_type] in zip(items_metadata, expected, strict=True): + assert md.path == path + assert md.type == file_type + + +def test_iter_dir_failure(fs: VirtualFileSystem): + with pytest.raises(FsError): + list(fs.iter_dir("/test")) + + fs.write_bytes("/test", b"") + with pytest.raises(FsError): + list(fs.iter_dir("/test"))