# bsv - Backup, Synchronization, Versioning
# Copyright (C) 2023 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Round-trip tests for repository objects (blobs, trees, snapshots) and the tree walker."""
from __future__ import annotations

from datetime import UTC, datetime
from os import stat_result
from pathlib import Path
from random import randbytes
from tempfile import TemporaryDirectory
from typing import Iterator

import pytest

from bsv.repository import Repository, Snapshot, Tree, TreeItem, create_repository, timestamp_from_time
from bsv.simple_cas.cas import Digest
from bsv.tree_walker import Action, IgnoreCause, TreeWalker


@pytest.fixture
def tmp_dir():
    """Yield a fresh temporary directory that is removed after the test."""
    with TemporaryDirectory(prefix="simple_cas_") as tmp_name:
        yield Path(tmp_name)


@pytest.fixture
def repo(tmp_dir):
    """Create a repository named ``test_repo`` under ``tmp_dir / "bsv"``."""
    return create_repository(
        tmp_dir / "bsv",
        "test_repo",
    )


def test_read_write_blob(tmp_dir: Path, repo: Repository):
    """A blob read back from the repository matches the file it was added from."""
    path = tmp_dir / "test.dat"
    make_random_file(path, 1 << 20)

    with path.open("rb") as stream:
        digest = repo.add_blob(stream)

    blob = repo.get_blob(digest)
    data = blob.reader().read()
    with path.open("rb") as stream:
        assert data == stream.read()


def test_read_write_tree(repo: Repository):
    """A tree survives both a bytes round-trip and a repository round-trip."""
    now = datetime.now(UTC)
    tree = Tree(
        repo,
        [
            TreeItem(
                digest=Digest(bytes([42]) * repo._cas._digest_size),
                object_type=b"blob",
                size=123,
                permissions=0o744,
                modification_timestamp=timestamp_from_time(now),
                name="xyz",
            ),
            TreeItem(
                digest=Digest(bytes([123]) * repo._cas._digest_size),
                object_type=b"slnk",
                size=42,
                permissions=0o777,
                modification_timestamp=timestamp_from_time(now),
                name="foobar",
            ),
        ],
    )

    assert Tree.from_bytes(repo, tree.to_bytes()) == tree

    digest = repo.add_tree(tree)
    assert repo.get_tree(digest) == tree


def test_read_write_snapshot(repo: Repository):
    """A snapshot survives both a bytes round-trip and a repository round-trip."""
    snapshot = Snapshot(
        repo=repo,
        tree_digest=Digest(bytes([42]) * repo._cas._digest_size),
        repo_name="test_repo",
        # Timezone-aware, consistent with test_read_write_tree.
        timestamp=timestamp_from_time(datetime.now(UTC)),
    )

    assert Snapshot.from_bytes(repo, snapshot.to_bytes()) == snapshot

    digest = repo.add_snapshot(snapshot)
    assert repo.get_snapshot(digest) == snapshot


class TestTreeWalker(TreeWalker):
    """``TreeWalker`` that records every ``report()`` call in ``self.reports``."""

    # The name matches pytest's ``Test*`` collection pattern, but this is a
    # helper with an ``__init__``; opt out of collection to avoid a
    # PytestCollectionWarning.
    __test__ = False

    # One (action, path, pstat, info) tuple per report() call, in call order.
    reports: list[tuple[Action, Path, stat_result | None, IgnoreCause | Exception | None]]

    def __init__(self, repo: Repository, dry_run: bool = False):
        super().__init__(repo, dry_run)
        self.reports = []

    def report(
        self,
        action: Action,
        path: Path,
        pstat: stat_result | None,
        info: IgnoreCause | Exception | None = None,
    ):
        """Forward the report to the base class, then record its arguments."""
        super().report(action, path, pstat, info)
        self.reports.append((action, path, pstat, info))


def test_add_tree(tmp_dir: Path, repo: Repository):
    """Adding a directory stores a tree mirroring it, minus the ``bsv_repo`` folder."""
    root = tmp_dir / "test"
    structure = {
        "folder": {
            "sub_folder": {
                "empty_folder": {},
                "foo.txt": b"Hello World!\n",
            },
            "test.py": b"print(\"Hello World!\")\n",
            "bar.dat": bytes(range(256)),
        },
        "Another test with long name and spaces and a bang !": b"Should works.\n",
        "bsv_repo": {
            "bsv_config.toml": b"[bsv]\n",
        },
    }
    create_file_structure(root, structure)

    walker = TestTreeWalker(repo)
    dir_digest = walker.add_tree(root)

    def check(digest: Digest, value: dict | bytes):
        """Recursively compare the stored object at ``digest`` with ``value``."""
        if isinstance(value, dict):
            tree = repo.get_tree(digest)
            assert tree
            # Stored items are expected in sorted-name order.
            assert [item.name for item in tree.items] == sorted(value)
            for item in tree.items:
                check(item.digest, value[item.name])
        elif isinstance(value, bytes):
            blob = repo.get_blob(digest)
            data = blob.reader().read()
            assert data == value
        else:
            raise TypeError(f"invalid type {type(value).__name__} for parameter value")

    # The stored tree must not contain "bsv_repo".
    # NOTE(review): presumably the walker skips it because it holds a
    # bsv_config.toml (i.e. looks like a nested repository) - confirm.
    expected = dict(structure)
    del expected["bsv_repo"]

    check(dir_digest, expected)


def create_file_structure(dst: Path, value: dict | bytes):
    """Materialize ``value`` on disk at ``dst``.

    A ``dict`` becomes a directory whose entries are created recursively;
    ``bytes`` become a file with that content.

    Raises:
        TypeError: if ``value`` (or a nested entry) is neither ``dict`` nor ``bytes``.
    """
    assert not dst.exists()
    if isinstance(value, dict):
        dst.mkdir()
        for name, item in value.items():
            create_file_structure(dst / name, item)
    elif isinstance(value, bytes):
        dst.write_bytes(value)
    else:
        raise TypeError(f"invalid type {type(value).__name__} for parameter value")


def make_random_file(path: Path, size: int):
    """Write ``size`` random bytes to ``path``, one chunk at a time."""
    with path.open("wb") as stream:
        for chunk_size in iter_chunks(size):
            stream.write(randbytes(chunk_size))


def iter_chunks(size: int, chunk_size: int = 1 << 16) -> Iterator[int]:
    """Yield chunk lengths that sum to ``size``, each at most ``chunk_size``.

    Every chunk but the last is exactly ``chunk_size`` long; nothing is
    yielded when ``size`` is 0.

    Raises:
        ValueError: if ``size`` is negative or ``chunk_size`` is not positive
            (raised on first iteration, since this is a generator).
    """
    if size < 0:
        raise ValueError(f"size must be non-negative, got {size}")
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    # divmod handles size == 0: the former (size - 1) // chunk_size went
    # negative there and produced one bogus chunk of chunk_size bytes.
    num_full_chunks, remainder = divmod(size, chunk_size)
    for _ in range(num_full_chunks):
        yield chunk_size
    if remainder:
        yield remainder