# bsv - Backup, Synchronization, Versioning
# Copyright (C) 2023 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

from datetime import UTC, datetime
from os import stat_result
from pathlib import Path
from random import randbytes
from shutil import rmtree
from tempfile import TemporaryDirectory
from typing import Iterator

import pytest

from bsv.repository import Repository, Snapshot, Tree, TreeItem, create_repository, timestamp_us_from_time
from bsv.simple_cas.cas import Digest
from bsv.tree_walker import Action, IgnoreCause, TreeWalker


@pytest.fixture
def tmp_dir() -> Iterator[Path]:
    """Yield a temporary directory that is removed once the test finishes."""
    with TemporaryDirectory(prefix="simple_cas_") as tmp_dir:
        yield Path(tmp_dir)


@pytest.fixture
def repo(tmp_dir: Path) -> Repository:
    """Create a repository named ``test_repo`` under ``tmp_dir / "bsv"``."""
    return create_repository(
        tmp_dir / "bsv",
        "test_repo",
    )


def test_read_write_blob(tmp_dir: Path, repo: Repository):
    """A blob read back from the repository matches the file it was added from."""
    path = tmp_dir / "test.dat"
    make_random_file(path, 1 << 20)

    with path.open("rb") as stream:
        digest = repo.add_blob(stream)

    blob = repo.get_blob(digest)
    stored = blob.reader().read()
    assert stored == path.read_bytes()


def test_read_write_tree(repo: Repository):
    """A tree survives both a bytes round-trip and a repository round-trip."""
    now = datetime.now(UTC)
    digest_size = repo._cas._digest_size
    tree = Tree(
        repo,
        [
            TreeItem(
                digest=Digest(bytes([42]) * digest_size),
                object_type=b"blob",
                size=123,
                permissions=0o744,
                modification_timestamp_us=timestamp_us_from_time(now),
                name="xyz",
            ),
            TreeItem(
                digest=Digest(bytes([123]) * digest_size),
                object_type=b"slnk",
                size=42,
                permissions=0o777,
                modification_timestamp_us=timestamp_us_from_time(now),
                name="foobar",
            ),
        ],
    )

    assert Tree.from_bytes(repo, tree.to_bytes()) == tree

    digest = repo.add_tree(tree)
    assert repo.get_tree(digest) == tree


def test_read_write_snapshot(repo: Repository):
    """A snapshot survives both a bytes round-trip and a repository round-trip."""
    digest_size = repo._cas._digest_size
    snapshot = Snapshot(
        repo=repo,
        tree_digest=Digest(bytes([42]) * digest_size),
        parents=[
            Digest(bytes([123]) * digest_size),
            Digest(bytes([124]) * digest_size),
        ],
        repo_name="test_repo",
        # Timezone-aware, consistent with test_read_write_tree.
        timestamp_us=timestamp_us_from_time(datetime.now(UTC)),
    )

    assert Snapshot.from_bytes(repo, snapshot.to_bytes()) == snapshot

    digest = repo.add_snapshot(snapshot)
    assert repo.get_snapshot(digest) == snapshot


class TestTreeWalker(TreeWalker):
    """Tree walker that records every reported action for later assertions."""

    # The name starts with "Test", so pytest would try to collect this helper
    # as a test class and warn because it defines __init__. Opt out explicitly.
    __test__ = False

    # Recorded (action, path, info) tuples, in the order they were reported.
    reports: list

    def __init__(self, repo: Repository, dry_run: bool = False):
        super().__init__(repo, dry_run=dry_run)
        self.reports = []

    def report(
        self,
        action: Action,
        path: Path,
        pstat: stat_result | None,
        info: IgnoreCause | Exception | None = None,
    ):
        """Forward the report to the base class, then record it.

        The ``info`` of ``Action.REMOVE`` reports is recorded as ``None``.
        """
        super().report(action, path, pstat, info)
        self.reports.append((action, path, info if action != Action.REMOVE else None))


def test_add_tree(tmp_dir: Path, repo: Repository):
    """Adding a directory twice stores its content and reports the right actions."""
    root = tmp_dir / "test0"

    structure0 = {
        "folder": {
            "sub_folder": {
                "empty_folder": {},
                "foo.txt": b"Hello World!\n",
            },
            "test.py": b"print(\"Hello World!\")\n",
            "bar.dat": bytes(range(256)),
        },
        "Another test with long name and spaces and a bang !": b"Should works.\n",
        "bsv_repo": {
            "bsv_config.toml": b"[bsv]\n",
        },
    }
    structure1 = {
        "folder": {
            "sub_folder": {
                "empty_folder": {},
                "foo.txt": b"Hello World!\n",
            },
            "bar.dat": bytes(range(256)) * 2,
        },
        "new_file": b"whatever",
        "Another test with long name and spaces and a bang !": b"Should works.\n",
        "bsv_repo": {
            "bsv_config.toml": b"[bsv]\n",
        },
    }

    # "bsv_repo" is reported as ignored below, so it is not expected in the
    # stored trees.
    expected0 = dict(structure0)
    del expected0["bsv_repo"]

    expected1 = dict(structure1)
    del expected1["bsv_repo"]

    create_file_structure(root, structure0)

    def check(digest: Digest, value: dict | bytes):
        """Recursively compare the stored object at ``digest`` with ``value``."""
        if isinstance(value, dict):
            tree = repo.get_tree(digest)
            assert tree
            assert [item.name for item in tree.items] == sorted(value)
            for item in tree.items:
                check(item.digest, value[item.name])
        elif isinstance(value, bytes):
            blob = repo.get_blob(digest)
            data = blob.reader().read()
            assert data == value

    walker = TestTreeWalker(repo)
    dir_digest0 = walker.add_tree(root)

    assert walker.reports == [
        (Action.ADD, root / "Another test with long name and spaces and a bang !", None),
        (Action.IGNORE, root / "bsv_repo", IgnoreCause.IGNORE_RULE),
        (Action.ADD, root / "folder/bar.dat", None),
        (Action.ADD, root / "folder/sub_folder/empty_folder", None),
        (Action.ADD, root / "folder/sub_folder/foo.txt", None),
        (Action.ADD, root / "folder/sub_folder", None),
        (Action.ADD, root / "folder/test.py", None),
        (Action.ADD, root / "folder", None),
        (Action.ADD, root, None),
    ]
    check(dir_digest0, expected0)

    create_file_structure(root, structure1)

    walker.reports.clear()
    dir_digest1 = walker.add_tree(root, source_digest=dir_digest0)

    assert walker.reports == [
        (Action.IGNORE, root / "Another test with long name and spaces and a bang !", IgnoreCause.UNCHANGED),
        (Action.IGNORE, root / "bsv_repo", IgnoreCause.IGNORE_RULE),
        (Action.UPDATE, root / "folder/bar.dat", None),
        (Action.IGNORE, root / "folder/sub_folder/empty_folder", IgnoreCause.UNCHANGED),
        (Action.IGNORE, root / "folder/sub_folder/foo.txt", IgnoreCause.UNCHANGED),
        (Action.IGNORE, root / "folder/sub_folder", IgnoreCause.UNCHANGED),
        (Action.REMOVE, root / "folder/test.py", None),
        (Action.UPDATE, root / "folder", None),
        (Action.ADD, root / "new_file", None),
        (Action.UPDATE, root, None),
    ]
    check(dir_digest1, expected1)


def create_file_structure(dst: Path, value: dict | bytes):
    """Make ``dst`` on disk match ``value``.

    ``bytes`` describes a file with that content; ``dict`` describes a
    directory mapping entry names to nested ``bytes``/``dict`` values.
    Existing entries that are not described are deleted, and a file whose
    content already matches is not rewritten.

    Raises:
        TypeError: if ``value`` is neither ``bytes`` nor ``dict``.
    """
    if isinstance(value, bytes):
        if dst.is_dir():
            rmtree(dst)
        # Skip the write when the content is already right, so unchanged
        # files are left untouched on disk.
        if not dst.is_file() or dst.read_bytes() != value:
            dst.write_bytes(value)
    elif isinstance(value, dict):
        if dst.is_file():
            dst.unlink()
        if not dst.is_dir():
            dst.mkdir()

        # Drop everything on disk that the description does not mention...
        for fs_path in sorted(dst.iterdir()):
            if fs_path.name in value:
                continue
            if fs_path.is_dir():
                rmtree(fs_path)
            else:
                fs_path.unlink()

        # ...then create or update every described entry.
        for name, subitem in sorted(value.items()):
            create_file_structure(dst / name, subitem)
    else:
        raise TypeError(f"invalid type {type(value).__name__} for parameter value")


def make_random_file(path: Path, size: int):
    """Write ``size`` random bytes to ``path``, one chunk at a time."""
    with path.open("wb") as stream:
        for chunk_size in iter_chunks(size):
            stream.write(randbytes(chunk_size))


def iter_chunks(size: int, chunk_size: int = 1 << 16) -> Iterator[int]:
    """Yield chunk lengths that add up to ``size``.

    Every chunk is ``chunk_size`` long except possibly the last, which holds
    the remainder. Nothing is yielded when ``size`` is 0.

    Raises:
        ValueError: if ``size`` is negative (raised on first iteration).
    """
    if size < 0:
        raise ValueError(f"size must be non-negative, got {size}")

    num_full_chunks, remainder = divmod(size, chunk_size)
    for _ in range(num_full_chunks):
        yield chunk_size
    if remainder:
        yield remainder