Compare commits
5 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
f95c68ee41 | 5 months ago |
|
|
2b961baa5b | 6 months ago |
|
|
69bb85af01 | 6 months ago |
|
|
a97395370a | 6 months ago |
|
|
b1d2fe7717 | 8 months ago |
31 changed files with 1713 additions and 103 deletions
@ -0,0 +1,18 @@ |
|||||
|
# bsv - Backup, Synchronization, Versioning |
||||
|
# Copyright (C) 2023 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
from __future__ import annotations |
||||
|
|
||||
|
from bsv._version import __version__, __version_tuple__ |
||||
@ -0,0 +1,21 @@ |
|||||
|
# bsv - Backup, Synchronization, Versioning |
||||
|
# Copyright (C) 2023 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
from __future__ import annotations |
||||
|
|
||||
|
from bsv.main import main |
||||
|
|
||||
|
|
||||
|
exit(main()) |
||||
@ -0,0 +1,16 @@ |
|||||
|
# file generated by setuptools_scm |
||||
|
# don't change, don't track in version control |
||||
|
TYPE_CHECKING = False |
||||
|
if TYPE_CHECKING: |
||||
|
from typing import Tuple, Union |
||||
|
VERSION_TUPLE = Tuple[Union[int, str], ...] |
||||
|
else: |
||||
|
VERSION_TUPLE = object |
||||
|
|
||||
|
version: str |
||||
|
__version__: str |
||||
|
__version_tuple__: VERSION_TUPLE |
||||
|
version_tuple: VERSION_TUPLE |
||||
|
|
||||
|
__version__ = version = '0.0.1.dev8+g52a553d.d20231127' |
||||
|
__version_tuple__ = version_tuple = (0, 0, 1, 'dev8', 'g52a553d.d20231127') |
||||
@ -0,0 +1,126 @@ |
|||||
|
# bsv - Backup, Synchronization, Versioning |
||||
|
# Copyright (C) 2023 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
from __future__ import annotations |
||||
|
|
||||
|
from typing import Any, Callable, TypeVar |
||||
|
|
||||
|
from rich.console import Console |
||||
|
from rich.text import Text |
||||
|
|
||||
|
|
||||
|
_console: Console | None = None |
||||
|
def get_console() -> Console: |
||||
|
assert _console is not None |
||||
|
return _console |
||||
|
|
||||
|
_error_console: Console | None = None |
||||
|
def get_error_console() -> Console: |
||||
|
assert _error_console is not None |
||||
|
return _error_console |
||||
|
|
||||
|
|
||||
|
def init_consoles(color: str="auto"): |
||||
|
global _console |
||||
|
global _error_console |
||||
|
|
||||
|
assert _console is None |
||||
|
assert _error_console is None |
||||
|
|
||||
|
kwargs: dict[str, Any] = { |
||||
|
"tab_size": 4, |
||||
|
} |
||||
|
match color: |
||||
|
case "always": |
||||
|
kwargs["force_terminal"] = True |
||||
|
case "auto": |
||||
|
pass |
||||
|
case "never": |
||||
|
kwargs["no_color"] = True |
||||
|
|
||||
|
_console = Console( |
||||
|
**kwargs, |
||||
|
) |
||||
|
_error_console = Console( |
||||
|
stderr = True, |
||||
|
**kwargs, |
||||
|
) |
||||
|
|
||||
|
|
||||
|
PromptType = TypeVar("PromptType") |
||||
|
|
||||
|
class NoDefaultType: |
||||
|
def __repr__(self): |
||||
|
return "NoDefault" |
||||
|
NoDefault = NoDefaultType() |
||||
|
|
||||
|
def prompt( |
||||
|
prompt: str, |
||||
|
factory: Callable[[str], PromptType], |
||||
|
*, |
||||
|
console: Console | None = None, |
||||
|
default: PromptType | NoDefaultType = NoDefault, |
||||
|
show_default: bool = True, |
||||
|
) -> PromptType: |
||||
|
if console is None: |
||||
|
console = get_console() |
||||
|
|
||||
|
prompt_text = Text(prompt, style="prompt") |
||||
|
prompt_text.end = "" |
||||
|
if show_default and default is not NoDefault: |
||||
|
prompt_text.append(" ") |
||||
|
prompt_text.append(f"({default})", style="prompt.default") |
||||
|
prompt_text.append(": ") |
||||
|
|
||||
|
while True: |
||||
|
try: |
||||
|
value = console.input(prompt_text) |
||||
|
except KeyboardInterrupt: |
||||
|
console.print("") |
||||
|
raise |
||||
|
|
||||
|
if not value and not isinstance(default, NoDefaultType): |
||||
|
return default |
||||
|
try: |
||||
|
return factory(value) |
||||
|
except ValueError as err: |
||||
|
console.print(err) |
||||
|
|
||||
|
def prompt_confirmation(prompt: str, *, console: Console | None=None, default: bool=True) -> bool: |
||||
|
if console is None: |
||||
|
console = get_console() |
||||
|
|
||||
|
prompt_text = Text(prompt, style="prompt") |
||||
|
prompt_text.end = "" |
||||
|
prompt_text.append(" ") |
||||
|
if default: |
||||
|
prompt_text.append("(Y/n)", style="prompt.default") |
||||
|
else: |
||||
|
prompt_text.append("(y/N)", style="prompt.default") |
||||
|
prompt_text.append(": ") |
||||
|
|
||||
|
while True: |
||||
|
try: |
||||
|
value = console.input(prompt_text).strip().lower() |
||||
|
except KeyboardInterrupt: |
||||
|
console.print("") |
||||
|
raise |
||||
|
|
||||
|
if not value and not isinstance(default, NoDefaultType): |
||||
|
return default |
||||
|
if value not in "yn": |
||||
|
console.print("Please answer 'y' or 'n'.") |
||||
|
else: |
||||
|
return value == "y" |
||||
@ -0,0 +1,48 @@ |
|||||
|
# pybsv - Backup, Synchronization, Versioning. |
||||
|
# Copyright (C) 2025 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
"""Tools and utilities to build the command-line interface.""" |
||||
|
|
||||
|
from __future__ import annotations |
||||
|
|
||||
|
from typing import Final |
||||
|
|
||||
|
|
||||
|
BINARY_PREFIXES: Final[list[str]] = [ |
||||
|
"", |
||||
|
"Ki", |
||||
|
"Mi", |
||||
|
"Gi", |
||||
|
"Ti", |
||||
|
"Pi", |
||||
|
"Ei", |
||||
|
"Zi", |
||||
|
"Yi", |
||||
|
"Ri", |
||||
|
"Qi", |
||||
|
] |
||||
|
|
||||
|
|
||||
|
def format_human_byte_size(byte_size: int) -> str: |
||||
|
"""Format the given `byte_size` as a human-readable string.""" |
||||
|
index = min(max((byte_size.bit_length() - 1) // 10, 0), len(BINARY_PREFIXES) - 1) |
||||
|
size = byte_size / 1024**index |
||||
|
num_digits = len(str(int(size))) |
||||
|
decimals = max(0, 3 - num_digits) |
||||
|
rounded = round(size, decimals) |
||||
|
if rounded == 1024 and index + 1 < len(BINARY_PREFIXES): |
||||
|
rounded = 1 |
||||
|
index += 1 |
||||
|
return f"{rounded:.16g}{BINARY_PREFIXES[index]}B" |
||||
@ -0,0 +1,41 @@ |
|||||
|
# pybsv - Backup, Synchronization, Versioning. |
||||
|
# Copyright (C) 2025 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
from __future__ import annotations |
||||
|
|
||||
|
import os |
||||
|
from pathlib import Path |
||||
|
import platform |
||||
|
|
||||
|
|
||||
|
def default_repository_path() -> Path: |
||||
|
"""Return the system-dependent default repository path.""" |
||||
|
if platform.system() in ("Windows", "Darwin", "Java"): |
||||
|
msg = f"default_repository_path does not support {platform.system()} system" |
||||
|
raise NotImplementedError(msg) |
||||
|
else: # Assume Unix |
||||
|
# See https://specifications.freedesktop.org/basedir-spec/latest/ |
||||
|
data_home = os.environ.get("XDG_DATA_HOME", "") |
||||
|
if data_home: |
||||
|
path = Path(data_home) |
||||
|
if not path.is_absolute() or not path.exists(): |
||||
|
msg = ( |
||||
|
f"invalid XDG_DATA_HOME ({path}): path is relative or does not " |
||||
|
"exists" |
||||
|
) |
||||
|
raise RuntimeError(msg) |
||||
|
else: |
||||
|
path = Path.home() / ".local/share" |
||||
|
return path / "bsv/repo" |
||||
@ -0,0 +1,348 @@ |
|||||
|
# pybsv - Backup, Synchronization, Versioning. |
||||
|
# Copyright (C) 2025 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
"""Provide a virtual file system interface alongside associated tools.""" |
||||
|
|
||||
|
from __future__ import annotations |
||||
|
|
||||
|
from datetime import UTC, datetime |
||||
|
from functools import total_ordering |
||||
|
import os |
||||
|
from pathlib import Path, PurePosixPath |
||||
|
from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_IMODE, filemode |
||||
|
from typing import TYPE_CHECKING, Any, BinaryIO, Literal, Self |
||||
|
|
||||
|
from typing_extensions import Buffer |
||||
|
|
||||
|
|
||||
|
if TYPE_CHECKING: |
||||
|
from collections.abc import Iterator |
||||
|
from os import stat_result |
||||
|
|
||||
|
|
||||
|
AnyBsvPath = PurePosixPath | str |
||||
|
|
||||
|
|
||||
|
class FsError(RuntimeError): |
||||
|
"""Error type raised by `FileSystem` objects.""" |
||||
|
|
||||
|
|
||||
|
class AlreadyExistError(FsError): |
||||
|
"""Raise when trying to create an item that already exists.""" |
||||
|
|
||||
|
|
||||
|
class NotFoundError(FsError): |
||||
|
"""Raise when trying to access an item that do not exist.""" |
||||
|
|
||||
|
|
||||
|
class Permissions: |
||||
|
"""Represent the permissions of an object in a filesystem.""" |
||||
|
|
||||
|
unix_perms: int |
||||
|
|
||||
|
def __init__(self, unix_perms: int | str = 0o640): |
||||
|
"""Create a `Permissions` object from `unix_perms`.""" |
||||
|
if isinstance(unix_perms, str): |
||||
|
unix_perms = int(unix_perms, 8) |
||||
|
self.unix_perms = unix_perms |
||||
|
|
||||
|
def __eq__(self, rhs: Any) -> bool: |
||||
|
"""Test if two `Permission` are the same.""" |
||||
|
return ( |
||||
|
rhs.unix_perms == self.unix_perms |
||||
|
if isinstance(rhs, Permissions) |
||||
|
else NotImplemented |
||||
|
) |
||||
|
|
||||
|
def __repr__(self) -> str: |
||||
|
"""Return a representation of the permissions as valid python code.""" |
||||
|
return f"Permissions(0o{oct(self.unix_perms)[2:].rjust(4, '0')})" |
||||
|
|
||||
|
def __str__(self) -> str: |
||||
|
"""Return a representation of the permissions of the form 'rwxrwxrwx'.""" |
||||
|
return filemode(self.unix_perms)[1:] |
||||
|
|
||||
|
|
||||
|
DEFAULT_DIR_PERMS = Permissions(0o770) |
||||
|
DEFAULT_FILE_PERMS = Permissions(0o640) |
||||
|
|
||||
|
|
||||
|
FileType = Literal["dir", "file", "symlink", "other"] |
||||
|
|
||||
|
_IFMT_MAP: dict[int, FileType] = { |
||||
|
S_IFDIR: "dir", |
||||
|
S_IFREG: "file", |
||||
|
S_IFLNK: "symlink", |
||||
|
} |
||||
|
|
||||
|
|
||||
|
@total_ordering |
||||
|
class FileMetadata: |
||||
|
"""Metadata associated with vfs files: file type, permissions, etc.""" |
||||
|
|
||||
|
path: PurePosixPath |
||||
|
type: FileType |
||||
|
permissions: Permissions |
||||
|
modification_time: datetime |
||||
|
byte_size: int |
||||
|
_stat: stat_result |
||||
|
|
||||
|
def __init__( |
||||
|
self, |
||||
|
path: PurePosixPath, |
||||
|
*, |
||||
|
type: FileType, |
||||
|
permissions: Permissions, |
||||
|
modification_time: datetime, |
||||
|
byte_size: int, |
||||
|
): |
||||
|
"""Create a `FileMetadata`.""" |
||||
|
self.path = path |
||||
|
self.type = type |
||||
|
self.permissions = permissions |
||||
|
self.modification_time = modification_time |
||||
|
self.byte_size = byte_size |
||||
|
|
||||
|
@classmethod |
||||
|
def from_stat( |
||||
|
cls, |
||||
|
path: PurePosixPath, |
||||
|
stat: stat_result, |
||||
|
) -> Self: |
||||
|
"""Create a `FileMetadata` from a `stat_result`.""" |
||||
|
return cls( |
||||
|
path, |
||||
|
type=_IFMT_MAP.get(S_IFMT(stat.st_mode), "other"), |
||||
|
permissions=Permissions(S_IMODE(stat.st_mode)), |
||||
|
modification_time=datetime.fromtimestamp(stat.st_mtime, UTC), |
||||
|
byte_size=stat.st_size, |
||||
|
) |
||||
|
|
||||
|
@property |
||||
|
def unix_mode(self) -> str: |
||||
|
"""Return unix-like mode in the form '-rwxrwxrwx'.""" |
||||
|
return UNIX_MODE_FILE_TYPE[self.type] + str(self.permissions) |
||||
|
|
||||
|
@property |
||||
|
def is_hidden(self) -> bool: |
||||
|
"""Return true if the file starts with a '.'.""" |
||||
|
return self.path.name.startswith(".") |
||||
|
|
||||
|
@property |
||||
|
def is_file(self) -> bool: |
||||
|
"""Test if this is a file.""" |
||||
|
return self.type == "file" |
||||
|
|
||||
|
@property |
||||
|
def is_dir(self) -> bool: |
||||
|
"""Test if this is a directory.""" |
||||
|
return self.type == "dir" |
||||
|
|
||||
|
@property |
||||
|
def is_symlink(self) -> bool: |
||||
|
"""Test if this is a symbolic link.""" |
||||
|
return self.type == "symlink" |
||||
|
|
||||
|
@property |
||||
|
def is_other(self) -> bool: |
||||
|
"""Test if this is a symbolic link.""" |
||||
|
return self.type == "other" |
||||
|
|
||||
|
def _as_tuple( |
||||
|
self, |
||||
|
) -> tuple[PurePosixPath, FileType, Permissions, datetime, int]: |
||||
|
return ( |
||||
|
self.path, |
||||
|
self.type, |
||||
|
self.permissions, |
||||
|
self.modification_time, |
||||
|
self.byte_size, |
||||
|
) |
||||
|
|
||||
|
def __eq__(self, rhs: Any) -> bool: |
||||
|
"""Test if two `Metadata` are the same.""" |
||||
|
return ( |
||||
|
self._as_tuple() == rhs._as_tuple() |
||||
|
if isinstance(rhs, FileMetadata) |
||||
|
else NotImplemented |
||||
|
) |
||||
|
|
||||
|
def __lt__(self, rhs: Any) -> bool: |
||||
|
"""Compare `rhs.path` with `self.path`.""" |
||||
|
return self.path < rhs.path if isinstance(rhs, FileMetadata) else NotImplemented |
||||
|
|
||||
|
|
||||
|
UNIX_MODE_FILE_TYPE = { |
||||
|
"dir": "d", |
||||
|
"file": "-", |
||||
|
"other": "o", |
||||
|
"link": "l", |
||||
|
} |
||||
|
|
||||
|
|
||||
|
class VirtualFileSystem: |
||||
|
"""Represent a file system, with common file system operations.""" |
||||
|
|
||||
|
path: Path |
||||
|
|
||||
|
def __init__(self, path: Path): |
||||
|
"""Initialize the file system to point to `path`.""" |
||||
|
self.path = path |
||||
|
|
||||
|
def exists(self, path: AnyBsvPath) -> bool: |
||||
|
"""Test if the `path` point to an existing item.""" |
||||
|
path = self._make_path(path) |
||||
|
return self._real_path(path).exists() |
||||
|
|
||||
|
def is_file(self, path: AnyBsvPath) -> bool: |
||||
|
"""Test if `path` is a file.""" |
||||
|
return self.metadata(path).is_file |
||||
|
|
||||
|
def is_dir(self, path: AnyBsvPath) -> bool: |
||||
|
"""Test if `path` is a directory.""" |
||||
|
return self.metadata(path).is_dir |
||||
|
|
||||
|
def is_symlink(self, path: AnyBsvPath) -> bool: |
||||
|
"""Test if `path` is a symbolic link.""" |
||||
|
return self.metadata(path).is_symlink |
||||
|
|
||||
|
def is_other(self, path: AnyBsvPath) -> bool: |
||||
|
"""Test if `path` is not a file, directory or symbolic link.""" |
||||
|
return self.metadata(path).is_other |
||||
|
|
||||
|
def metadata(self, path: AnyBsvPath) -> FileMetadata: |
||||
|
"""Return the metadata of a given object.""" |
||||
|
metadata = self.metadata_or_none(path) |
||||
|
if metadata is None: |
||||
|
msg = f"file '{path}' not found" |
||||
|
raise NotFoundError(msg) |
||||
|
return metadata |
||||
|
|
||||
|
def metadata_or_none(self, path: AnyBsvPath) -> FileMetadata | None: |
||||
|
"""Return the metadata of a given object or `None` if it does not exists.""" |
||||
|
path = self._make_path(path) |
||||
|
try: |
||||
|
stat = self._real_path(path).stat(follow_symlinks=False) |
||||
|
except FileNotFoundError: |
||||
|
return None |
||||
|
except OSError as err: |
||||
|
msg = f"failed to read '{path}' metadata" |
||||
|
raise FsError(msg) from err |
||||
|
return FileMetadata.from_stat(path, stat) |
||||
|
|
||||
|
def iter_dir(self, path: AnyBsvPath) -> Iterator[FileMetadata]: |
||||
|
"""Return the metadata of all items in the directory `path`.""" |
||||
|
path = self._make_path(path) |
||||
|
real_path = self._real_path(path) |
||||
|
try: |
||||
|
for entry in os.scandir(real_path): |
||||
|
yield FileMetadata.from_stat( |
||||
|
path / entry.name, entry.stat(follow_symlinks=False) |
||||
|
) |
||||
|
except OSError as err: |
||||
|
msg = f"failed to read directory {path}" |
||||
|
raise FsError(msg) from err |
||||
|
|
||||
|
def read_bytes(self, path: AnyBsvPath) -> bytes: |
||||
|
"""Return the content of `path` as `bytes`.""" |
||||
|
with self.open_read(path) as stream: |
||||
|
return stream.read() |
||||
|
|
||||
|
def write_bytes(self, path: AnyBsvPath, data: Buffer | BinaryIO) -> int: |
||||
|
"""Create or replace a file at `path`, setting its content to `data`.""" |
||||
|
written = 0 |
||||
|
with self.open_write(path) as sout: |
||||
|
if isinstance(data, Buffer): |
||||
|
written += sout.write(data) |
||||
|
else: |
||||
|
while chunk := data.read(65536): |
||||
|
written += sout.write(chunk) |
||||
|
return written |
||||
|
|
||||
|
def open_read(self, path: AnyBsvPath) -> BinaryIO: |
||||
|
"""Return a read-only binary stream that read the content of `path`.""" |
||||
|
path = self._make_path(path) |
||||
|
try: |
||||
|
return self._real_path(path).open("rb") |
||||
|
except OSError as err: |
||||
|
msg = f"failed to read {path}" |
||||
|
raise FsError(msg) from err |
||||
|
|
||||
|
def open_write(self, path: AnyBsvPath) -> BinaryIO: |
||||
|
"""Return a write-only binary stream write to `path`.""" |
||||
|
path = self._make_path(path) |
||||
|
try: |
||||
|
return self._real_path(path).open("wb") |
||||
|
except OSError as err: |
||||
|
msg = f"failed to read {path}" |
||||
|
raise FsError(msg) from err |
||||
|
|
||||
|
def mkdir( |
||||
|
self, |
||||
|
path: AnyBsvPath, |
||||
|
mode: Permissions = DEFAULT_DIR_PERMS, |
||||
|
parents: bool = False, |
||||
|
exist_ok: bool = False, |
||||
|
): |
||||
|
"""Create a directory at `path`. |
||||
|
|
||||
|
Args: |
||||
|
path: The directory to create. |
||||
|
mode: The permissions of the new directory. |
||||
|
parents: If `True`, create parent directories if they don't exists. |
||||
|
exist_ok: If `False` and `path` already exist, raise an error. |
||||
|
|
||||
|
Raises: |
||||
|
FsError: If something goes wrong. |
||||
|
""" |
||||
|
path = self._make_path(path) |
||||
|
try: |
||||
|
self._real_path(path).mkdir( |
||||
|
mode=mode.unix_perms, parents=parents, exist_ok=exist_ok |
||||
|
) |
||||
|
except FileExistsError as err: |
||||
|
msg = f"{path} already exists" |
||||
|
raise AlreadyExistError(msg) from err |
||||
|
except FileNotFoundError as err: |
||||
|
msg = f"{path.parent} does not exist" |
||||
|
raise NotFoundError(msg) from err |
||||
|
|
||||
|
def make_link(self, path: AnyBsvPath, target: AnyBsvPath) -> None: |
||||
|
"""Creates a symbolic link from `path` to `target`.""" |
||||
|
path = self._make_path(path) |
||||
|
target = self._make_path(path) |
||||
|
self._real_path(path).symlink_to(self._real_path(target)) |
||||
|
|
||||
|
def set_permissions(self, path: AnyBsvPath, permissions: Permissions) -> None: |
||||
|
"""Set the permissions of `path` to `permissions`.""" |
||||
|
path = self._make_path(path) |
||||
|
self._real_path(path).chmod(permissions.unix_perms) |
||||
|
|
||||
|
def set_modification_time(self, path: AnyBsvPath, mod_time: datetime) -> None: |
||||
|
"""Set the modification time of `path` to `mod_time`.""" |
||||
|
path = self._make_path(path) |
||||
|
ts = mod_time.timestamp() |
||||
|
os.utime(self._real_path(path), (ts, ts)) |
||||
|
|
||||
|
def _make_path(self, path: AnyBsvPath) -> PurePosixPath: |
||||
|
if not isinstance(path, PurePosixPath): |
||||
|
path = PurePosixPath(path) |
||||
|
if not path.is_absolute(): |
||||
|
msg = f"{path} is not absolute" |
||||
|
raise FsError(msg) |
||||
|
return path |
||||
|
|
||||
|
def _real_path(self, path: PurePosixPath) -> Path: |
||||
|
return self.path / path.relative_to("/") |
||||
@ -0,0 +1,18 @@ |
|||||
|
# pybsv - Backup, Synchronization, Versioning. |
||||
|
# Copyright (C) 2025 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
"""pybsv test module.""" |
||||
|
|
||||
|
from __future__ import annotations |
||||
@ -0,0 +1,270 @@ |
|||||
|
# pybsv - Backup, Synchronization, Versioning. |
||||
|
# Copyright (C) 2025 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
from __future__ import annotations |
||||
|
|
||||
|
from contextlib import contextmanager |
||||
|
from datetime import UTC, datetime |
||||
|
import os |
||||
|
from pathlib import Path |
||||
|
import re |
||||
|
from tempfile import TemporaryDirectory |
||||
|
from typing import TYPE_CHECKING, Literal, NamedTuple |
||||
|
|
||||
|
from click.testing import CliRunner |
||||
|
from hypothesis import given |
||||
|
import hypothesis.strategies as st |
||||
|
import pytest |
||||
|
|
||||
|
from bsv import cli |
||||
|
from bsv.vfs import Permissions |
||||
|
|
||||
|
|
||||
|
if TYPE_CHECKING: |
||||
|
from collections.abc import Generator |
||||
|
|
||||
|
|
||||
|
@pytest.fixture |
||||
|
def runner(tmp_path: Path) -> CliRunner: |
||||
|
runner = CliRunner(env={"BSV_REPO": str(tmp_path)}) |
||||
|
return runner |
||||
|
|
||||
|
|
||||
|
@contextmanager |
||||
|
def make_runner() -> Generator[CliRunner, None, None]: |
||||
|
with TemporaryDirectory(prefix="test_vfs_") as tmp: |
||||
|
runner = CliRunner(env={"BSV_REPO": tmp}) |
||||
|
yield runner |
||||
|
|
||||
|
|
||||
|
######################################################################################## |
||||
|
# mkdir |
||||
|
|
||||
|
|
||||
|
def test_mkdir_fails_with_relative_path(tmp_path: Path, runner: CliRunner): |
||||
|
assert not (tmp_path / "test").exists() |
||||
|
result = runner.invoke(cli.cli, ["mkdir", "test"]) |
||||
|
assert result.exit_code == 2 |
||||
|
assert "test is not an absolute path" in result.stderr |
||||
|
|
||||
|
|
||||
|
def test_mkdir_default(tmp_path: Path, runner: CliRunner): |
||||
|
assert not (tmp_path / "test").exists() |
||||
|
result = runner.invoke(cli.cli, ["mkdir", "/test"]) |
||||
|
assert result.exit_code == 0 |
||||
|
assert result.stderr == "" |
||||
|
assert result.stdout == "" |
||||
|
assert (tmp_path / "test").is_dir() |
||||
|
|
||||
|
|
||||
|
def test_mkdir_multiple_dirs(tmp_path: Path, runner: CliRunner): |
||||
|
assert not (tmp_path / "foo").exists() |
||||
|
assert not (tmp_path / "bar").exists() |
||||
|
result = runner.invoke(cli.cli, ["mkdir", "/foo", "/bar"]) |
||||
|
assert result.exit_code == 0 |
||||
|
assert result.stderr == "" |
||||
|
assert result.stdout == "" |
||||
|
assert (tmp_path / "foo").is_dir() |
||||
|
assert (tmp_path / "bar").is_dir() |
||||
|
|
||||
|
|
||||
|
def test_mkdir_nested_fails_without_parents(tmp_path: Path, runner: CliRunner): |
||||
|
assert not (tmp_path / "foo").exists() |
||||
|
result = runner.invoke(cli.cli, ["mkdir", "/foo/bar"]) |
||||
|
assert result.exit_code == 1 |
||||
|
assert result.stderr == "/foo does not exist\n" |
||||
|
assert result.stdout == "" |
||||
|
|
||||
|
|
||||
|
def test_mkdir_nested(tmp_path: Path, runner: CliRunner): |
||||
|
assert not (tmp_path / "foo/bar").exists() |
||||
|
result = runner.invoke(cli.cli, ["mkdir", "--parents", "/foo/bar"]) |
||||
|
assert result.exit_code == 0 |
||||
|
assert result.stderr == "" |
||||
|
assert result.stdout == "" |
||||
|
assert (tmp_path / "foo/bar").is_dir() |
||||
|
|
||||
|
|
||||
|
def test_mkdir_message_if_exists(tmp_path: Path, runner: CliRunner): |
||||
|
result = runner.invoke(cli.cli, ["mkdir", "/test"]) |
||||
|
assert result.exit_code == 0 |
||||
|
assert result.stderr == "" |
||||
|
assert result.stdout == "" |
||||
|
assert (tmp_path / "test").is_dir() |
||||
|
|
||||
|
result = runner.invoke(cli.cli, ["mkdir", "/test"]) |
||||
|
assert result.exit_code == 0 |
||||
|
assert result.stderr == "/test already exists\n" |
||||
|
assert result.stdout == "" |
||||
|
assert (tmp_path / "test").is_dir() |
||||
|
|
||||
|
|
||||
|
def test_mkdir_mode(tmp_path: Path, runner: CliRunner): |
||||
|
assert not (tmp_path / "test").exists() |
||||
|
result = runner.invoke(cli.cli, ["mkdir", "/test", "--mode=741"]) |
||||
|
assert result.exit_code == 0 |
||||
|
assert result.stderr == "" |
||||
|
assert result.stdout == "" |
||||
|
assert (tmp_path / "test").is_dir() |
||||
|
assert (tmp_path / "test").stat().st_mode & 0o7777 == 0o741 |
||||
|
|
||||
|
|
||||
|
def test_mkdir_verbose(tmp_path: Path, runner: CliRunner): |
||||
|
result = runner.invoke(cli.cli, ["mkdir", "/foo"]) |
||||
|
assert result.exit_code == 0 |
||||
|
assert result.stderr == "" |
||||
|
assert result.stdout == "" |
||||
|
assert (tmp_path / "foo").is_dir() |
||||
|
assert not (tmp_path / "bar").exists() |
||||
|
result = runner.invoke(cli.cli, ["mkdir", "--verbose", "/foo", "/bar"]) |
||||
|
assert result.exit_code == 0 |
||||
|
assert result.stderr == "/foo already exists\n" |
||||
|
assert result.stdout == "Created /bar\n" |
||||
|
assert (tmp_path / "foo").is_dir() |
||||
|
assert (tmp_path / "bar").is_dir() |
||||
|
|
||||
|
|
||||
|
######################################################################################## |
||||
|
# ls |
||||
|
|
||||
|
|
||||
|
def permissions(target: Literal["file", "dir"] = "file"): |
||||
|
return st.builds( |
||||
|
Permissions, |
||||
|
st.sampled_from( |
||||
|
[ |
||||
|
0o0400, |
||||
|
0o0440, |
||||
|
0o0444, |
||||
|
0o0600, |
||||
|
0o0640, |
||||
|
0o0644, |
||||
|
0o0664, |
||||
|
0o0750, |
||||
|
0o0755, |
||||
|
0o0777, |
||||
|
] |
||||
|
if target == "file" |
||||
|
else [ |
||||
|
0o0400, |
||||
|
0o0600, |
||||
|
0o0640, |
||||
|
] |
||||
|
), |
||||
|
) |
||||
|
|
||||
|
|
||||
|
class Tree(NamedTuple): |
||||
|
type: Literal["file", "dir"] |
||||
|
name: str |
||||
|
perms: Permissions |
||||
|
time: datetime |
||||
|
content: bytes | list[Tree] |
||||
|
|
||||
|
@property |
||||
|
def type_prefix(self) -> str: |
||||
|
if self.type == "dir": |
||||
|
return "d" |
||||
|
return "-" |
||||
|
|
||||
|
def build(self, parent: Path) -> None: |
||||
|
path = parent / self.name |
||||
|
if isinstance(self.content, list): |
||||
|
path.mkdir(mode=self.perms.unix_perms) |
||||
|
for child in self.content: |
||||
|
child.build(path) |
||||
|
else: |
||||
|
path.write_bytes(self.content) |
||||
|
path.chmod(self.perms.unix_perms) |
||||
|
ts = self.time.timestamp() |
||||
|
os.utime(path, (ts, ts)) |
||||
|
|
||||
|
|
||||
|
def filenames() -> st.SearchStrategy: |
||||
|
return st.text( |
||||
|
st.characters(exclude_categories=["Cc", "Cs"], exclude_characters='<>:"/\\|!*'), |
||||
|
min_size=1, |
||||
|
max_size=255, |
||||
|
).filter(lambda t: len(t.encode()) < 256 and t not in (".", "..")) |
||||
|
|
||||
|
|
||||
|
@st.composite |
||||
|
def trees(draw: st.DrawFn, max_depth: int = 3) -> Tree: |
||||
|
file_type = draw(st.sampled_from(["file", "dir"])) |
||||
|
content = ( |
||||
|
st.binary() |
||||
|
if file_type == "file" |
||||
|
else st.lists( |
||||
|
trees(max_depth - 1), |
||||
|
unique_by=lambda t: t.name, |
||||
|
max_size=0 if max_depth == 0 else 10, |
||||
|
) |
||||
|
) |
||||
|
return Tree( |
||||
|
file_type, # type: ignore |
||||
|
draw(filenames()), |
||||
|
draw(permissions(file_type)), # type: ignore |
||||
|
draw( |
||||
|
st.datetimes( |
||||
|
min_value=datetime(1902, 1, 1), |
||||
|
max_value=datetime(2100, 1, 1), |
||||
|
timezones=st.just(UTC), |
||||
|
) |
||||
|
), |
||||
|
draw(content), |
||||
|
) |
||||
|
|
||||
|
|
||||
|
def trees_lists(max_depth: int = 3) -> st.SearchStrategy: |
||||
|
return st.lists(trees(max_depth=max_depth), unique_by=lambda t: t.name) |
||||
|
|
||||
|
|
||||
|
@given(trees=trees_lists(max_depth=0)) |
||||
|
def test_ls(trees: list[Tree]): |
||||
|
with make_runner() as runner: |
||||
|
path = Path(runner.env["BSV_REPO"] or "") |
||||
|
for tree in trees: |
||||
|
tree.build(path) |
||||
|
|
||||
|
result = runner.invoke(cli.cli, ["ls", "-lA"]) |
||||
|
|
||||
|
assert result.exit_code == 0 |
||||
|
assert result.stderr == "" |
||||
|
|
||||
|
trees.sort(key=lambda t: t.name) |
||||
|
lines = [line for line in result.stdout.splitlines() if line != "\n"] |
||||
|
|
||||
|
for line, tree in zip(lines, trees, strict=True): |
||||
|
match = re.fullmatch( |
||||
|
r""" |
||||
|
([dl-])([r-][w-][x-][r-][w-][x-][r-][w-][x-]) |
||||
|
\ +(\d+) |
||||
|
\ (\d{4}-\d{2}-\d{2}\ \d{2}:\d{2}:\d{2}) |
||||
|
\ ([^\n]+) |
||||
|
""", |
||||
|
line, |
||||
|
re.VERBOSE, |
||||
|
) |
||||
|
assert match |
||||
|
assert match[1] == tree.type_prefix |
||||
|
assert match[2] == str(tree.perms) |
||||
|
if tree.type_prefix != "d": |
||||
|
assert match[3] == str(len(tree.content)) |
||||
|
assert match[4] == tree.time.astimezone().replace(tzinfo=None).isoformat( |
||||
|
" ", "seconds" |
||||
|
) |
||||
|
assert match[5] == tree.name |
||||
|
|
||||
|
pass |
||||
@ -0,0 +1,56 @@ |
|||||
|
# pybsv - Backup, Synchronization, Versioning. |
||||
|
# Copyright (C) 2025 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
"""Tests for cli_utils.py.""" |
||||
|
|
||||
|
from __future__ import annotations |
||||
|
|
||||
|
from bsv.cli_utils import format_human_byte_size |
||||
|
|
||||
|
|
||||
|
def test_format_human_byte_size(): |
||||
|
assert format_human_byte_size(0) == "0B" |
||||
|
assert format_human_byte_size(1) == "1B" |
||||
|
assert format_human_byte_size(9) == "9B" |
||||
|
assert format_human_byte_size(10) == "10B" |
||||
|
assert format_human_byte_size(99) == "99B" |
||||
|
assert format_human_byte_size(100) == "100B" |
||||
|
assert format_human_byte_size(999) == "999B" |
||||
|
assert format_human_byte_size(1000) == "1000B" |
||||
|
assert format_human_byte_size(1023) == "1023B" |
||||
|
assert format_human_byte_size(2**10) == "1KiB" |
||||
|
assert format_human_byte_size(int(1.23456 * 2**10)) == "1.23KiB" |
||||
|
assert format_human_byte_size(9 * 2**10) == "9KiB" |
||||
|
assert format_human_byte_size(10 * 2**10 - 1) == "10KiB" |
||||
|
assert format_human_byte_size(int(98.76543 * 2**10)) == "98.8KiB" |
||||
|
assert format_human_byte_size(99 * 2**10 - 1) == "99KiB" |
||||
|
assert format_human_byte_size(100 * 2**10 - 1) == "100KiB" |
||||
|
assert format_human_byte_size(int(192.8374 * 2**10)) == "193KiB" |
||||
|
assert format_human_byte_size(999 * 2**10 - 1) == "999KiB" |
||||
|
assert format_human_byte_size(1000 * 2**10 - 1) == "1000KiB" |
||||
|
assert format_human_byte_size(2**20 - 1) == "1MiB" |
||||
|
assert format_human_byte_size(2**20) == "1MiB" |
||||
|
assert format_human_byte_size(2**30) == "1GiB" |
||||
|
assert format_human_byte_size(2**40) == "1TiB" |
||||
|
assert format_human_byte_size(2**50) == "1PiB" |
||||
|
assert format_human_byte_size(2**60) == "1EiB" |
||||
|
assert format_human_byte_size(2**70) == "1ZiB" |
||||
|
assert format_human_byte_size(2**80) == "1YiB" |
||||
|
assert format_human_byte_size(2**90) == "1RiB" |
||||
|
assert format_human_byte_size(2**100 - 2**80) == "1QiB" |
||||
|
assert format_human_byte_size(2**100) == "1QiB" |
||||
|
assert format_human_byte_size(2**110 - 2**90) == "1024QiB" |
||||
|
assert format_human_byte_size(2**110) == "1024QiB" |
||||
|
assert format_human_byte_size(2**120) == "1048576QiB" |
||||
@ -0,0 +1,394 @@ |
|||||
|
# pybsv - Backup, Synchronization, Versioning. |
||||
|
# Copyright (C) 2025 Simon Boyé |
||||
|
# |
||||
|
# This program is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU Affero General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# This program is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU Affero General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU Affero General Public License |
||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. |
||||
|
"""Tests for the `VirtualFileSystem` class and related stuff.""" |
||||
|
|
||||
|
from __future__ import annotations |
||||
|
|
||||
|
from datetime import UTC, datetime |
||||
|
from io import BytesIO |
||||
|
from pathlib import Path, PurePosixPath |
||||
|
|
||||
|
import pytest |
||||
|
|
||||
|
from bsv.vfs import FileMetadata, FsError, Permissions, VirtualFileSystem |
||||
|
|
||||
|
|
||||
|
@pytest.fixture |
||||
|
def fs(tmp_path: Path) -> VirtualFileSystem: |
||||
|
"""Fixture that returns a `VirtualFileSystem`.""" |
||||
|
return VirtualFileSystem(tmp_path) |
||||
|
|
||||
|
|
||||
|
######################################################################################## |
||||
|
# Permissions |
||||
|
|
||||
|
|
||||
|
def test_permissions(): |
||||
|
perm0 = Permissions(0o1234) |
||||
|
assert perm0.unix_perms == 0o1234 |
||||
|
|
||||
|
perm1 = Permissions("752") |
||||
|
assert perm1.unix_perms == 0o752 |
||||
|
|
||||
|
assert perm0 == perm0 |
||||
|
assert perm0 != perm1 |
||||
|
|
||||
|
assert repr(perm0) == "Permissions(0o1234)" |
||||
|
assert repr(perm1) == "Permissions(0o0752)" |
||||
|
|
||||
|
assert str(perm0) == "-w--wxr-T" |
||||
|
assert str(perm1) == "rwxr-x-w-" |
||||
|
|
||||
|
|
||||
|
######################################################################################## |
||||
|
# FileMetadata |
||||
|
|
||||
|
|
||||
|
def test_file_metadata(): |
||||
|
path = PurePosixPath("/some_dir/some_file") |
||||
|
permissions = Permissions(0o1234) |
||||
|
mod_time = datetime(2025, 7, 12, 12, 34, 56, tzinfo=UTC) |
||||
|
|
||||
|
file_md = FileMetadata( |
||||
|
path, |
||||
|
type="file", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
) |
||||
|
|
||||
|
assert file_md.path == path |
||||
|
assert file_md.type == "file" |
||||
|
assert file_md.permissions == permissions |
||||
|
assert file_md.modification_time == mod_time |
||||
|
assert file_md.byte_size == 123 |
||||
|
assert file_md.unix_mode == "--w--wxr-T" |
||||
|
assert not file_md.is_hidden |
||||
|
assert file_md.is_file |
||||
|
assert not file_md.is_dir |
||||
|
assert not file_md.is_symlink |
||||
|
assert not file_md.is_other |
||||
|
|
||||
|
dir_md = FileMetadata( |
||||
|
path, |
||||
|
type="dir", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
) |
||||
|
assert dir_md.type == "dir" |
||||
|
assert not dir_md.is_file |
||||
|
assert dir_md.is_dir |
||||
|
assert not dir_md.is_symlink |
||||
|
assert not dir_md.is_other |
||||
|
|
||||
|
symlink_md = FileMetadata( |
||||
|
path, |
||||
|
type="symlink", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
) |
||||
|
assert symlink_md.type == "symlink" |
||||
|
assert not symlink_md.is_file |
||||
|
assert not symlink_md.is_dir |
||||
|
assert symlink_md.is_symlink |
||||
|
assert not symlink_md.is_other |
||||
|
|
||||
|
other_md = FileMetadata( |
||||
|
path, |
||||
|
type="other", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
) |
||||
|
assert other_md.type == "other" |
||||
|
assert not other_md.is_file |
||||
|
assert not other_md.is_dir |
||||
|
assert not other_md.is_symlink |
||||
|
assert other_md.is_other |
||||
|
|
||||
|
assert FileMetadata( |
||||
|
PurePosixPath("/some_dir/.some_file"), |
||||
|
type="file", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
).is_hidden |
||||
|
|
||||
|
|
||||
|
def test_file_metadata_eq(): |
||||
|
path = PurePosixPath("/some_dir/some_file") |
||||
|
permissions = Permissions(0o1234) |
||||
|
mod_time = datetime(2025, 7, 12, 12, 34, 56, tzinfo=UTC) |
||||
|
|
||||
|
md = FileMetadata( |
||||
|
path, |
||||
|
type="file", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
) |
||||
|
|
||||
|
assert ( |
||||
|
FileMetadata( |
||||
|
path, |
||||
|
type="file", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
) |
||||
|
== md |
||||
|
) |
||||
|
assert ( |
||||
|
FileMetadata( |
||||
|
PurePosixPath("/some_dir/some_other_file"), |
||||
|
type="file", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
) |
||||
|
!= md |
||||
|
) |
||||
|
assert ( |
||||
|
FileMetadata( |
||||
|
path, |
||||
|
type="dir", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
) |
||||
|
!= md |
||||
|
) |
||||
|
assert ( |
||||
|
FileMetadata( |
||||
|
path, |
||||
|
type="file", |
||||
|
permissions=Permissions(0o0752), |
||||
|
modification_time=mod_time, |
||||
|
byte_size=123, |
||||
|
) |
||||
|
!= md |
||||
|
) |
||||
|
assert ( |
||||
|
FileMetadata( |
||||
|
path, |
||||
|
type="file", |
||||
|
permissions=permissions, |
||||
|
modification_time=datetime(2025, 1, 2, 3, 4, 5), |
||||
|
byte_size=123, |
||||
|
) |
||||
|
!= md |
||||
|
) |
||||
|
assert ( |
||||
|
FileMetadata( |
||||
|
path, |
||||
|
type="file", |
||||
|
permissions=permissions, |
||||
|
modification_time=mod_time, |
||||
|
byte_size=124, |
||||
|
) |
||||
|
!= md |
||||
|
) |
||||
|
|
||||
|
|
||||
|
######################################################################################## |
||||
|
# mkdir |
||||
|
|
||||
|
|
||||
|
def test_mkdir_fails_with_relative_path(fs: VirtualFileSystem): |
||||
|
with pytest.raises(FsError): |
||||
|
fs.mkdir("test") |
||||
|
|
||||
|
|
||||
|
def test_mkdir_default(fs: VirtualFileSystem): |
||||
|
assert not fs.exists("/test") |
||||
|
fs.mkdir("/test") |
||||
|
assert fs.is_dir("/test") |
||||
|
|
||||
|
|
||||
|
def test_mkdir_nested_fails_without_parents(fs: VirtualFileSystem): |
||||
|
assert not fs.exists("/foo") |
||||
|
with pytest.raises(FsError): |
||||
|
fs.mkdir("/foo/bar") |
||||
|
|
||||
|
|
||||
|
def test_mkdir_nested(fs: VirtualFileSystem): |
||||
|
assert not fs.exists("/test") |
||||
|
fs.mkdir("/test/foobar", parents=True) |
||||
|
assert fs.is_dir("/test/foobar") |
||||
|
|
||||
|
|
||||
|
def test_mkdir_fails_if_exists(fs: VirtualFileSystem): |
||||
|
assert not fs.exists("/foo") |
||||
|
fs.mkdir("/foo") |
||||
|
assert fs.is_dir("/foo") |
||||
|
with pytest.raises(FsError): |
||||
|
fs.mkdir("/foo") |
||||
|
|
||||
|
|
||||
|
def test_mkdir_exists_ok(fs: VirtualFileSystem): |
||||
|
assert not fs.exists("/test") |
||||
|
fs.mkdir("/test") |
||||
|
assert fs.is_dir("/test") |
||||
|
fs.mkdir("/test", exist_ok=True) |
||||
|
|
||||
|
|
||||
|
def test_mkdir_exists_ok_fail_if_file(fs: VirtualFileSystem): |
||||
|
fs.write_bytes("/test", b"test") |
||||
|
assert fs.is_file("/test") |
||||
|
with pytest.raises(FsError): |
||||
|
fs.mkdir("/test", exist_ok=True) |
||||
|
|
||||
|
|
||||
|
def test_mkdir_mode(fs: VirtualFileSystem): |
||||
|
assert not fs.exists("/test") |
||||
|
permissions = Permissions(0o741) |
||||
|
fs.mkdir("/test", mode=permissions) |
||||
|
assert fs.is_dir("/test") |
||||
|
assert fs.metadata("/test").permissions == permissions |
||||
|
|
||||
|
|
||||
|
######################################################################################## |
||||
|
# read_bytes / write_bytes |
||||
|
|
||||
|
|
||||
|
def test_read_write_bytes(fs: VirtualFileSystem): |
||||
|
assert not fs.exists("/test") |
||||
|
|
||||
|
fs.write_bytes("/test", b"This is a test.") |
||||
|
assert fs.read_bytes("/test") == b"This is a test." |
||||
|
|
||||
|
stream = BytesIO(b"Another test.") |
||||
|
fs.write_bytes("/test", stream) |
||||
|
assert fs.read_bytes("/test") == b"Another test." |
||||
|
|
||||
|
with pytest.raises(FsError): |
||||
|
fs.read_bytes("/does_not_exist") |
||||
|
|
||||
|
with pytest.raises(FsError): |
||||
|
fs.write_bytes("/does_not_exist/foobar", b"") |
||||
|
|
||||
|
|
||||
|
def test_open_read_write(fs: VirtualFileSystem): |
||||
|
assert not fs.exists("/test") |
||||
|
|
||||
|
with fs.open_write("/test") as stream: |
||||
|
stream.write(b"foo") |
||||
|
stream.write(b"bar") |
||||
|
|
||||
|
assert fs.exists("/test") |
||||
|
with fs.open_read("/test") as stream: |
||||
|
assert stream.read(3) == b"foo" |
||||
|
assert stream.read(3) == b"bar" |
||||
|
assert stream.read() == b"" |
||||
|
|
||||
|
# Test overwrite |
||||
|
with fs.open_write("/test") as stream: |
||||
|
stream.write(b"baz") |
||||
|
|
||||
|
with fs.open_read("/test") as stream: |
||||
|
assert stream.read() == b"baz" |
||||
|
|
||||
|
with pytest.raises(FsError): |
||||
|
fs.open_read("/does_not_exist") |
||||
|
|
||||
|
with pytest.raises(FsError): |
||||
|
fs.open_write("/does_not_exist/foobar") |
||||
|
|
||||
|
|
||||
|
######################################################################################## |
||||
|
# metadata |
||||
|
|
||||
|
|
||||
|
def test_metadata(fs: VirtualFileSystem): |
||||
|
file_permissions = Permissions(0o754) |
||||
|
file_time = datetime(2025, 5, 17, 13, 57, 32, tzinfo=UTC) |
||||
|
file_content = b"This is a test\n" |
||||
|
|
||||
|
fs.write_bytes("/test_file", file_content) |
||||
|
fs.set_permissions("/test_file", file_permissions) |
||||
|
fs.set_modification_time("/test_file", file_time) |
||||
|
|
||||
|
md = fs.metadata("/test_file") |
||||
|
assert md.path == PurePosixPath("/test_file") |
||||
|
assert md.permissions == file_permissions |
||||
|
assert md.type == "file" |
||||
|
assert md.modification_time == file_time |
||||
|
assert md.byte_size == len(file_content) |
||||
|
assert not md.is_hidden |
||||
|
assert fs.metadata("/test_file") == md |
||||
|
assert fs.is_file("/test_file") |
||||
|
assert not fs.is_dir("/test_file") |
||||
|
assert not fs.is_symlink("/test_file") |
||||
|
assert not fs.is_other("/test_file") |
||||
|
|
||||
|
fs.set_permissions("/test_file", Permissions(0o644)) |
||||
|
assert fs.metadata("/test_file") != md |
||||
|
|
||||
|
fs.mkdir("/.test_dir") |
||||
|
md = fs.metadata("/.test_dir") |
||||
|
assert md.type == "dir" |
||||
|
assert fs.metadata("/.test_dir").is_hidden |
||||
|
assert not fs.is_file("/.test_dir") |
||||
|
assert fs.is_dir("/.test_dir") |
||||
|
assert not fs.is_symlink("/.test_dir") |
||||
|
assert not fs.is_other("/.test_dir") |
||||
|
|
||||
|
fs.make_link("/test_link", "/link_target") |
||||
|
md = fs.metadata("/test_link") |
||||
|
assert md.type == "symlink" |
||||
|
assert not fs.is_file("/test_link") |
||||
|
assert not fs.is_dir("/test_link") |
||||
|
assert fs.is_symlink("/test_link") |
||||
|
assert not fs.is_other("/test_link") |
||||
|
|
||||
|
assert fs.metadata_or_none("/does_not_exist") is None |
||||
|
with pytest.raises(FsError): |
||||
|
fs.metadata("/does_not_exist") |
||||
|
|
||||
|
|
||||
|
######################################################################################## |
||||
|
# iter_dir |
||||
|
|
||||
|
|
||||
|
def test_iter_dir(fs: VirtualFileSystem): |
||||
|
expected = [ |
||||
|
(PurePosixPath("/dir"), "dir"), |
||||
|
(PurePosixPath("/file"), "file"), |
||||
|
(PurePosixPath("/link"), "symlink"), |
||||
|
] |
||||
|
for path, file_type in expected: |
||||
|
if file_type == "dir": |
||||
|
fs.mkdir(path) |
||||
|
elif file_type == "file": |
||||
|
fs.write_bytes(path, b"") |
||||
|
elif file_type == "symlink": |
||||
|
fs.make_link(path, "/foobar") |
||||
|
|
||||
|
items_metadata = sorted(fs.iter_dir("/")) |
||||
|
for md, [path, file_type] in zip(items_metadata, expected, strict=True): |
||||
|
assert md.path == path |
||||
|
assert md.type == file_type |
||||
|
|
||||
|
|
||||
|
def test_iter_dir_failure(fs: VirtualFileSystem): |
||||
|
with pytest.raises(FsError): |
||||
|
list(fs.iter_dir("/test")) |
||||
|
|
||||
|
fs.write_bytes("/test", b"") |
||||
|
with pytest.raises(FsError): |
||||
|
list(fs.iter_dir("/test")) |
||||
Loading…
Reference in new issue