# bsv - Backup, Synchronization, Versioning
# Copyright (C) 2023 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime as DateTime
import hashlib
from io import BytesIO
from pathlib import Path, PurePosixPath
import platform
import tomllib
from typing import TYPE_CHECKING, BinaryIO, Callable, Self, Type
from fastcdc import fastcdc
import tomlkit
from bsv import __version__
from bsv.exception import ConfigError
from bsv.object import ObjectInfo
from bsv.path_map import PathMap
from bsv.simple_cas import SimpleCas
from bsv.simple_cas.cas import Digest
from bsv.util import Hash, read_exact, read_exact_or_eof, time_from_timestamp_us, timestamp_us_from_time
if TYPE_CHECKING:
    from bsv.tree_walker import TreeWalker

# Default FastCDC chunking bounds: 4 KiB minimum, 64 KiB average, 1 MiB maximum.
DEFAULT_MIN_CHUNK_SIZE = 1 << 12
DEFAULT_AVG_CHUNK_SIZE = 1 << 16
DEFAULT_MAX_CHUNK_SIZE = 1 << 20


class Repository:
    _path: Path
    _name: str
    _cas: SimpleCas
    _min_chunk_size: int = DEFAULT_MIN_CHUNK_SIZE
    _avg_chunk_size: int = DEFAULT_AVG_CHUNK_SIZE
    _max_chunk_size: int = DEFAULT_MAX_CHUNK_SIZE
    _path_map: PathMap
    # _remotes: list[object]
    _context_depth: int = 0

    def __init__(self, path: Path):
        self._path = path
        with self.config_file.open("rb") as stream:
            config = tomllib.load(stream)
        bsv = config.get("bsv", {})
        self._name = bsv.get("name") or platform.node()
        self._cas = make_cas(
            bsv.get("cas"),
            self._path,
            lambda: hashlib.new(bsv.get("hash")),  # type: ignore
        )
        # Fall back to the module defaults when the config omits a chunk size,
        # instead of silently storing None.
        self._min_chunk_size = bsv.get("min_chunk_size", DEFAULT_MIN_CHUNK_SIZE)
        self._avg_chunk_size = bsv.get("avg_chunk_size", DEFAULT_AVG_CHUNK_SIZE)
        self._max_chunk_size = bsv.get("max_chunk_size", DEFAULT_MAX_CHUNK_SIZE)
        self._path_map = PathMap.from_obj(bsv.get("path_map", []))
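
    # For reference, a minimal sketch of the config this constructor parses,
    # matching what create_repository() below writes (values illustrative):
    #
    #   [bsv]
    #   name = "my-laptop"
    #   path_map = []
    #   cas = "simple"
    #   hash = "sha256"
    #   min_chunk_size = 4096
    #   avg_chunk_size = 65536
    #   max_chunk_size = 1048576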

    @property
    def path(self) -> Path:
        return self._path

    @property
    def config_file(self) -> Path:
        return self.path / "bsv_config.toml"

    @property
    def name(self) -> str:
        return self._name

    @property
    def path_map(self) -> PathMap:
        return self._path_map.clone()

    def get_blob(self, digest: Digest) -> BlobObject:
        with self:
            obj, blob = self._read(digest, object_type=b"blob")
            return BlobObject(
                digest = obj.digest,
                object_type = obj.object_type,
                size = obj.size,
                blob = blob,
            )

    def add_blob(self, stream: BinaryIO, *, dry_run: bool=False) -> BlobObject:
        with self:
            return self._write(b"blob", stream, dry_run=dry_run)

    def get_symlink(self, digest: Digest) -> SymlinkObject:
        with self:
            obj = self._cas.read(digest, object_type=b"slnk")
            return SymlinkObject(
                digest = obj.digest,
                object_type = obj.object_type,
                size = obj.size,
                symlink = Symlink.from_bytes(self, obj.data),
            )

    def add_symlink(self, symlink: Symlink, *, dry_run: bool=False) -> SymlinkObject:
        with self:
            data = symlink.to_bytes()
            return SymlinkObject(
                digest = self._cas.write(b"slnk", data, dry_run=dry_run),
                object_type = b"slnk",
                size = len(data),
                symlink = symlink,
            )

    def add_symlink_from_fs_target(self, fs_symlink: Path, fs_target: Path, *, dry_run: bool=False) -> SymlinkObject:
        assert fs_symlink.is_absolute()
        return self.add_symlink(
            Symlink(
                repo = self,
                is_absolute = fs_target.is_absolute(),
                target = self._path_map.relative_bsv_path(fs_target, relative_to=fs_symlink),
            ),
            dry_run = dry_run,
        )

    def get_tree(self, digest: Digest) -> TreeObject:
        with self:
            obj = self._cas.read(digest, object_type=b"tree")
            return TreeObject(
                digest = obj.digest,
                object_type = obj.object_type,
                size = obj.size,
                tree = Tree.from_bytes(self, obj.data),
            )

    def add_tree(self, tree: Tree, *, dry_run: bool=False) -> TreeObject:
        with self:
            data = tree.to_bytes()
            return TreeObject(
                digest = self._cas.write(b"tree", data, dry_run=dry_run),
                object_type = b"tree",
                size = len(data),
                tree = tree,
            )

    def add_tree_from_path(self, path: Path, *, dry_run: bool=False) -> TreeObject:
        from bsv.tree_walker import TreeWalker

        walker = TreeWalker(self, dry_run=dry_run)
        return walker.add_tree(path)
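
    # A minimal usage sketch, assuming an existing Repository `repo`; the
    # path is illustrative:
    #
    #   with repo:
    #       tree_object = repo.add_tree_from_path(Path("/home/user/documents"))
    #       print(tree_object.digest, tree_object.total_size)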

    def get_snapshot(self, digest: Digest) -> SnapshotObject:
        with self:
            obj = self._cas.read(digest, object_type=b"snap")
            return SnapshotObject(
                digest = obj.digest,
                object_type = obj.object_type,
                size = obj.size,
                snapshot = Snapshot.from_bytes(self, obj.data),
            )

    def add_snapshot(self, snapshot: Snapshot, *, dry_run: bool=False) -> SnapshotObject:
        with self:
            data = snapshot.to_bytes()
            return SnapshotObject(
                digest = self._cas.write(b"snap", data, dry_run=dry_run),
                object_type = b"snap",
                size = len(data),
                snapshot = snapshot,
            )

    # def take_snapshot(
    #     self,
    #     parent_digests: list[Digest] = [],
    #     *,
    #     walker_type: Type[TreeWalker] | None = None,
    #     dry_run: bool = False,
    # ):
    #     from bsv.tree_walker import TreeWalker
    #     walker = (walker_type or TreeWalker)(self, dry_run=dry_run)
    #     # parents = [
    #     #     self.get_snapshot(digest)
    #     #     for digest in parent_digests
    #     # ]
    #     parent = self.get_snapshot(parent_digests[0]) if parent_digests else None
    #     snapshot = Snapshot(
    #         repo = self,
    #         tree_digest = walker.add_virtual_tree(self._path_map, parent=),
    #         parents = parent_digests,
    #         repo_name = self._name,
    #         timestamp = timestamp_us_from_time(DateTime.now()),
    #     )
    #     return self.add_snapshot(snapshot, dry_run=dry_run)

    def _read(self, digest: Digest, object_type: bytes) -> tuple[ObjectInfo, Blob]:
        obj = self._cas.read(digest, object_type=object_type)
        stream = BytesIO(obj.data)
        return obj, Blob.from_stream(self, stream, digest_size=self._cas._digest_size)

    def _write(self, object_type: bytes, stream: BinaryIO, *, dry_run: bool=False) -> BlobObject:
        out = BytesIO()
        size = 0
        chunks = []
        # Split the stream into content-defined chunks; each chunk is stored
        # as its own CAS object, so identical data is deduplicated across blobs.
        for chunk in fastcdc(
            stream,
            min_size = self._min_chunk_size,
            avg_size = self._avg_chunk_size,
            max_size = self._max_chunk_size,
            fat = True,
        ):
            size += chunk.length
            digest = self._cas.write(b"chnk", chunk.data, dry_run=dry_run)
            chunks.append(Chunk(digest, chunk.length))
            out.write(digest.digest)
            out.write(chunk.length.to_bytes(4))
        # Pass dry_run through to the final write as well, so a dry run never
        # touches the store.
        return BlobObject(
            digest = self._cas.write(object_type, size.to_bytes(8) + out.getvalue(), dry_run=dry_run),
            object_type = object_type,
            size = 8 + len(out.getvalue()),
            blob = Blob(
                repo = self,
                size = size,
                chunks = chunks,
            )
        )
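
    # Layout of the blob object serialized above (big-endian integers):
    #
    #   [total size: 8 bytes]
    #   then, per chunk: [digest: digest_size bytes][length: 4 bytes]
    #
    # ChunkedObject.from_stream() below parses this same layout back.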

    def __enter__(self):
        # The repository context is re-entrant: the underlying CAS is only
        # opened at the outermost level and closed when the last exit runs.
        if self._context_depth == 0:
            self._cas.__enter__()
        self._context_depth += 1
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._context_depth -= 1
        if self._context_depth == 0:
            return self._cas.__exit__(exc_type, exc_value, traceback)


def create_repository(
    destination: Path,
    name: str,
    cas: str = "simple",
    hash: str = "sha256",
    min_chunk_size: int = DEFAULT_MIN_CHUNK_SIZE,
    avg_chunk_size: int = DEFAULT_AVG_CHUNK_SIZE,
    max_chunk_size: int = DEFAULT_MAX_CHUNK_SIZE,
):
    from datetime import datetime as DateTime
    from os import getlogin

    if not name:
        raise RuntimeError("repository name cannot be empty")
    if not destination.parent.exists():
        raise RuntimeError(f"destination directory {destination.parent} does not exist")
    if destination.exists() and not destination.is_dir():
        raise RuntimeError(f"destination {destination} exists but is not a directory")
    if destination.exists() and any(destination.iterdir()):
        raise RuntimeError(f"destination directory {destination} is not empty")
    try:
        destination.mkdir(exist_ok=True)
    except OSError as error:
        raise RuntimeError(f"failed to create destination directory {destination}") from error

    bsv_table = tomlkit.table()
    bsv_table.add(tomlkit.comment("Name of the repository."))
    bsv_table.add(tomlkit.comment("Ideally, this should be unique among all connected repositories."))
    bsv_table.add("name", name)
    bsv_table.add(tomlkit.nl())
    bsv_table.add(tomlkit.comment("Mapping between the bsv tree and the actual filesystem."))
    bsv_table.add("path_map", tomlkit.array())
    bsv_table.add("cas", cas)
    bsv_table.add("hash", hash)
    bsv_table.add("min_chunk_size", min_chunk_size)
    bsv_table.add("avg_chunk_size", avg_chunk_size)
    bsv_table.add("max_chunk_size", max_chunk_size)

    doc = tomlkit.document()
    doc.add(tomlkit.comment("bsv repository configuration"))
    doc.add(tomlkit.comment(f"Created by {getlogin()} on {DateTime.now().isoformat()}."))
    doc.add(tomlkit.nl())
    doc.add("bsv", bsv_table)

    config_path = destination / "bsv_config.toml"
    try:
        stream = config_path.open("w", encoding="utf-8")
    except OSError as error:
        raise RuntimeError(f"failed to open configuration file {config_path}") from error
    with stream:
        tomlkit.dump(doc, stream)
    return Repository(destination)
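
# A minimal end-to-end sketch (paths illustrative): create a repository, then
# store a file's contents as a chunked, deduplicated blob.
#
#   repo = create_repository(Path("/tmp/bsv-repo"), name="example")
#   with repo, Path("/etc/hostname").open("rb") as stream:
#       blob_object = repo.add_blob(stream)
#   print(blob_object.digest)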


def make_cas(cas_name: str, path: Path, hash_factory: Callable[[], Hash]) -> SimpleCas:
    if cas_name == "simple":
        return SimpleCas(path, hash_factory)
    raise ConfigError(f"unknown cas name {cas_name}")


@dataclass(slots=True)
class ChunkedObject:
    repo: Repository
    size: int
    chunks: list[Chunk]

    @classmethod
    def from_stream(cls, repo: Repository, stream: BinaryIO, digest_size: int) -> Self:
        self = cls(
            repo = repo,
            size = int.from_bytes(read_exact(stream, 8)),
            chunks = [],
        )
        while (chunk := Chunk.from_stream(stream, digest_size)) is not None:
            self.chunks.append(chunk)
        return self

    def reader(self) -> ChunkedObjectReader:
        return ChunkedObjectReader(self)


@dataclass(frozen=True, slots=True)
class Chunk:
    digest: Digest
    size: int

    @classmethod
    def from_stream(cls, stream: BinaryIO, digest_size: int) -> Self | None:
        digest_bytes = read_exact_or_eof(stream, digest_size)
        if digest_bytes is None:
            return None
        return cls(
            digest = Digest(digest_bytes),
            size = int.from_bytes(read_exact(stream, 4)),
        )


class ChunkedObjectReader:
    _chunked_object: ChunkedObject
    _chunk_index: int = 0
    _chunk_data: bytes = b""

    def __init__(self, chunked_object: ChunkedObject):
        self._chunked_object = chunked_object

    def read(self, num_bytes: int = -1) -> bytes:
        chunks = self._chunked_object.chunks
        parts = [self._chunk_data]
        size = len(parts[-1])
        while (num_bytes < 0 or size < num_bytes) and self._chunk_index < len(chunks):
            parts.append(self.read1())
            size += len(parts[-1])
        data = b"".join(parts)
        if num_bytes < 0:
            self._chunk_data = b""
            return data
        # Return exactly num_bytes (or less at end of stream) and keep any
        # surplus buffered for the next read, so no bytes are returned twice.
        self._chunk_data = data[num_bytes:]
        return data[:num_bytes]

    def read1(self) -> bytes:
        cas = self._chunked_object.repo._cas
        chunks = self._chunked_object.chunks
        if self._chunk_index == len(chunks):
            return b""
        obj = cas.read(chunks[self._chunk_index].digest, object_type=b"chnk")
        self._chunk_index += 1
        return obj.data
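
# Reading a blob back, sketched; `digest` comes from an earlier add_blob()
# call and `handle_data` is a hypothetical callback:
#
#   with repo:
#       reader = repo.get_blob(digest).blob.reader()
#       while data := reader.read(1 << 16):
#           handle_data(data)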


@dataclass(slots=True)
class Blob(ChunkedObject):
    pass


@dataclass(frozen=True, order=True, slots=True)
class BlobObject(ObjectInfo):
    blob: Blob


@dataclass(slots=True)
class Symlink:
    repo: Repository
    is_absolute: bool
    target: PurePosixPath

    @classmethod
    def from_stream(cls, repo: Repository, stream: BinaryIO) -> Self:
        return cls(
            repo = repo,
            # Test the value of the flag byte, not the bytes object itself:
            # a non-empty bytes object is always truthy.
            is_absolute = bool(read_exact(stream, 1)[0]),
            target = PurePosixPath(stream.read().decode("utf-8")),
        )

    @classmethod
    def from_bytes(cls, repo: Repository, data: bytes) -> Self:
        stream = BytesIO(data)
        return cls.from_stream(repo, stream)

    def write(self, stream: BinaryIO):
        stream.write(self.is_absolute.to_bytes(1))
        stream.write(self.target.as_posix().encode("utf-8"))

    def to_bytes(self) -> bytes:
        stream = BytesIO()
        self.write(stream)
        return stream.getvalue()
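
# Serialized symlink layout, as written above: [is_absolute flag: 1 byte]
# followed by the target as a UTF-8, POSIX-style path running to the end of
# the object.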


@dataclass(frozen=True, order=True, slots=True)
class SymlinkObject(ObjectInfo):
    symlink: Symlink


@dataclass
class Tree:
    repo: Repository
    items: list[TreeItem]

    @property
    def total_size(self) -> int:
        return sum(
            item.size
            for item in self.items
        )

    @classmethod
    def from_stream(cls, repo: Repository, stream: BinaryIO) -> Self:
        tree = cls(repo, [])
        while (item := TreeItem.from_stream(stream, repo._cas._digest_size)) is not None:
            tree.items.append(item)
        return tree

    @classmethod
    def from_bytes(cls, repo: Repository, data: bytes) -> Self:
        stream = BytesIO(data)
        return cls.from_stream(repo, stream)

    def write(self, stream: BinaryIO):
        # Sort by name so the serialized form, and therefore the digest, is
        # deterministic regardless of insertion order.
        self.items.sort(key=lambda i: i.name)
        for item in self.items:
            item.write(stream)

    def to_bytes(self) -> bytes:
        stream = BytesIO()
        self.write(stream)
        return stream.getvalue()


@dataclass(frozen=True, order=True, slots=True)
class TreeObject(ObjectInfo):
    tree: Tree

    @property
    def total_size(self) -> int:
        return self.size + self.tree.total_size


@dataclass
class TreeItem:
    digest: Digest
    object_type: bytes
    size: int
    permissions: int
    modification_timestamp_us: int
    name: str

    def __init__(
        self,
        digest: Digest,
        object_type: bytes,
        size: int,
        permissions: int,
        modification_timestamp_us: int,
        name: str,
    ):
        # A tree item name is a single path component, so it must not contain
        # either kind of path separator.
        if "/" in name or "\\" in name:
            raise ValueError(f"invalid tree item name {name}")
        self.digest = digest
        self.object_type = object_type
        self.size = size
        self.permissions = permissions
        self.modification_timestamp_us = modification_timestamp_us
        self.name = name

    @property
    def modification_time(self) -> DateTime:
        return time_from_timestamp_us(self.modification_timestamp_us)

    @modification_time.setter
    def modification_time(self, time: DateTime):
        self.modification_timestamp_us = timestamp_us_from_time(time)

    @classmethod
    def from_stream(cls, stream: BinaryIO, digest_size: int) -> Self | None:
        digest_bytes = read_exact_or_eof(stream, digest_size)
        if digest_bytes is None:
            return None
        return cls(
            digest = Digest(digest_bytes),
            object_type = read_exact(stream, 4),
            size = int.from_bytes(read_exact(stream, 8)),
            permissions = int.from_bytes(read_exact(stream, 2)),
            modification_timestamp_us = int.from_bytes(read_exact(stream, 8), signed=True),
            name = read_exact(stream, int.from_bytes(read_exact(stream, 2))).decode("utf-8"),
        )

    def write(self, stream: BinaryIO):
        stream.write(self.digest.digest)
        stream.write(self.object_type)
        stream.write(self.size.to_bytes(8))
        stream.write(self.permissions.to_bytes(2))
        stream.write(self.modification_timestamp_us.to_bytes(8, signed=True))
        name_bytes = self.name.encode("utf-8")
        stream.write(len(name_bytes).to_bytes(2))
        stream.write(name_bytes)
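
# Serialized tree layout, one record per item, as written above (big-endian
# integers, the int.to_bytes default):
#
#   [digest: digest_size bytes][object type: 4 bytes][size: 8 bytes]
#   [permissions: 2 bytes][mtime: 8 bytes, signed, microseconds]
#   [name length: 2 bytes][name: UTF-8]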


@dataclass
class Snapshot:
    repo: Repository
    tree_digest: Digest
    parents: list[Digest]
    repo_name: str
    timestamp_us: int

    def __post_init__(self):
        # The parent count is serialized as a single byte (see write()).
        assert len(self.parents) < 256

    @property
    def time(self) -> DateTime:
        return time_from_timestamp_us(self.timestamp_us)

    @time.setter
    def time(self, time: DateTime):
        self.timestamp_us = timestamp_us_from_time(time)

    @classmethod
    def from_stream(cls, repo: Repository, stream: BinaryIO) -> Self:
        return cls(
            repo = repo,
            tree_digest = Digest(read_exact(stream, repo._cas._digest_size)),
            parents = [
                Digest(read_exact(stream, repo._cas._digest_size))
                for _ in range(int.from_bytes(read_exact(stream, 1)))
            ],
            repo_name = read_exact(stream, int.from_bytes(read_exact(stream, 2))).decode("utf-8"),
            timestamp_us = int.from_bytes(read_exact(stream, 8), signed=True),
        )

    @classmethod
    def from_bytes(cls, repo: Repository, data: bytes) -> Self:
        stream = BytesIO(data)
        return cls.from_stream(repo, stream)

    def write(self, stream: BinaryIO):
        assert len(self.parents) < 256
        stream.write(self.tree_digest.digest)
        stream.write(len(self.parents).to_bytes(1))
        for parent in self.parents:
            stream.write(parent.digest)
        repo_name_bytes = self.repo_name.encode("utf-8")
        stream.write(len(repo_name_bytes).to_bytes(2))
        stream.write(repo_name_bytes)
        stream.write(self.timestamp_us.to_bytes(8, signed=True))

    def to_bytes(self) -> bytes:
        stream = BytesIO()
        self.write(stream)
        return stream.getvalue()
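
# Serialized snapshot layout, as written above (big-endian integers):
#
#   [tree digest][parent count: 1 byte][parent digests, in order]
#   [repo name length: 2 bytes][repo name: UTF-8]
#   [timestamp: 8 bytes, signed, microseconds]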


@dataclass(frozen=True, order=True, slots=True)
class SnapshotObject(ObjectInfo):
    snapshot: Snapshot