Backup, Synchronization, Versioning.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

270 lines
8.1 KiB

# pybsv - Backup, Synchronization, Versioning.
# Copyright (C) 2025 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from contextlib import contextmanager
from datetime import UTC, datetime
import os
from pathlib import Path
import re
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Literal, NamedTuple
from click.testing import CliRunner
from hypothesis import given
import hypothesis.strategies as st
import pytest
from bsv import cli
from bsv.vfs import Permissions
if TYPE_CHECKING:
from collections.abc import Generator
@pytest.fixture
def runner(tmp_path: Path) -> CliRunner:
runner = CliRunner(env={"BSV_REPO": str(tmp_path)})
return runner
@contextmanager
def make_runner() -> Generator[CliRunner, None, None]:
with TemporaryDirectory(prefix="test_vfs_") as tmp:
runner = CliRunner(env={"BSV_REPO": tmp})
yield runner
########################################################################################
# mkdir
def test_mkdir_fails_with_relative_path(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "test").exists()
result = runner.invoke(cli.cli, ["mkdir", "test"])
assert result.exit_code == 2
assert "test is not an absolute path" in result.stderr
def test_mkdir_default(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "test").exists()
result = runner.invoke(cli.cli, ["mkdir", "/test"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "test").is_dir()
def test_mkdir_multiple_dirs(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "foo").exists()
assert not (tmp_path / "bar").exists()
result = runner.invoke(cli.cli, ["mkdir", "/foo", "/bar"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "foo").is_dir()
assert (tmp_path / "bar").is_dir()
def test_mkdir_nested_fails_without_parents(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "foo").exists()
result = runner.invoke(cli.cli, ["mkdir", "/foo/bar"])
assert result.exit_code == 1
assert result.stderr == "/foo does not exist\n"
assert result.stdout == ""
def test_mkdir_nested(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "foo/bar").exists()
result = runner.invoke(cli.cli, ["mkdir", "--parents", "/foo/bar"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "foo/bar").is_dir()
def test_mkdir_message_if_exists(tmp_path: Path, runner: CliRunner):
result = runner.invoke(cli.cli, ["mkdir", "/test"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "test").is_dir()
result = runner.invoke(cli.cli, ["mkdir", "/test"])
assert result.exit_code == 0
assert result.stderr == "/test already exists\n"
assert result.stdout == ""
assert (tmp_path / "test").is_dir()
def test_mkdir_mode(tmp_path: Path, runner: CliRunner):
assert not (tmp_path / "test").exists()
result = runner.invoke(cli.cli, ["mkdir", "/test", "--mode=741"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "test").is_dir()
assert (tmp_path / "test").stat().st_mode & 0o7777 == 0o741
def test_mkdir_verbose(tmp_path: Path, runner: CliRunner):
result = runner.invoke(cli.cli, ["mkdir", "/foo"])
assert result.exit_code == 0
assert result.stderr == ""
assert result.stdout == ""
assert (tmp_path / "foo").is_dir()
assert not (tmp_path / "bar").exists()
result = runner.invoke(cli.cli, ["mkdir", "--verbose", "/foo", "/bar"])
assert result.exit_code == 0
assert result.stderr == "/foo already exists\n"
assert result.stdout == "Created /bar\n"
assert (tmp_path / "foo").is_dir()
assert (tmp_path / "bar").is_dir()
########################################################################################
# ls
def permissions(target: Literal["file", "dir"] = "file"):
return st.builds(
Permissions,
st.sampled_from(
[
0o0400,
0o0440,
0o0444,
0o0600,
0o0640,
0o0644,
0o0664,
0o0750,
0o0755,
0o0777,
]
if target == "file"
else [
0o0400,
0o0600,
0o0640,
]
),
)
class Tree(NamedTuple):
type: Literal["file", "dir"]
name: str
perms: Permissions
time: datetime
content: bytes | list[Tree]
@property
def type_prefix(self) -> str:
if self.type == "dir":
return "d"
return "-"
def build(self, parent: Path) -> None:
path = parent / self.name
if isinstance(self.content, list):
path.mkdir(mode=self.perms.unix_perms)
for child in self.content:
child.build(path)
else:
path.write_bytes(self.content)
path.chmod(self.perms.unix_perms)
ts = self.time.timestamp()
os.utime(path, (ts, ts))
def filenames() -> st.SearchStrategy:
return st.text(
st.characters(exclude_categories=["Cc", "Cs"], exclude_characters='<>:"/\\|!*'),
min_size=1,
max_size=255,
).filter(lambda t: len(t.encode()) < 256 and t not in (".", ".."))
@st.composite
def trees(draw: st.DrawFn, max_depth: int = 3) -> Tree:
file_type = draw(st.sampled_from(["file", "dir"]))
content = (
st.binary()
if file_type == "file"
else st.lists(
trees(max_depth - 1),
unique_by=lambda t: t.name,
max_size=0 if max_depth == 0 else 10,
)
)
return Tree(
file_type, # type: ignore
draw(filenames()),
draw(permissions(file_type)), # type: ignore
draw(
st.datetimes(
min_value=datetime(1902, 1, 1),
max_value=datetime(2100, 1, 1),
timezones=st.just(UTC),
)
),
draw(content),
)
def trees_lists(max_depth: int = 3) -> st.SearchStrategy:
return st.lists(trees(max_depth=max_depth), unique_by=lambda t: t.name)
@given(trees=trees_lists(max_depth=0))
def test_ls(trees: list[Tree]):
with make_runner() as runner:
path = Path(runner.env["BSV_REPO"] or "")
for tree in trees:
tree.build(path)
result = runner.invoke(cli.cli, ["ls", "-lA"])
assert result.exit_code == 0
assert result.stderr == ""
trees.sort(key=lambda t: t.name)
lines = [line for line in result.stdout.splitlines() if line != "\n"]
for line, tree in zip(lines, trees, strict=True):
match = re.fullmatch(
r"""
([dl-])([r-][w-][x-][r-][w-][x-][r-][w-][x-])
\ +(\d+)
\ (\d{4}-\d{2}-\d{2}\ \d{2}:\d{2}:\d{2})
\ ([^\n]+)
""",
line,
re.VERBOSE,
)
assert match
assert match[1] == tree.type_prefix
assert match[2] == str(tree.perms)
if tree.type_prefix != "d":
assert match[3] == str(len(tree.content))
assert match[4] == tree.time.astimezone().replace(tzinfo=None).isoformat(
" ", "seconds"
)
assert match[5] == tree.name
pass