Blob, Tree & Snapshot objects.

master · Draklaw, 2 years ago · commit 67d15f989a
  1. pyproject.toml (1 change)
  2. src/bsv/repository.py (353 changes)
  3. src/bsv/simple_cas/__init__.py (2 changes)
  4. src/bsv/simple_cas/cas.py (81 changes)
  5. src/bsv/simple_cas/util.py (16 changes)
  6. tests/test_repository.py (111 changes)
  7. tests/test_simple_cas.py (4 changes)

pyproject.toml (1 change)

@@ -12,6 +12,7 @@ classifiers = [
]
dynamic = ["version"]
dependencies = [
"fastcdc",
"tomlkit",
]
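The added fastcdc dependency supplies the content-defined chunking that Repository._write (below) uses to split blobs into deduplicable chunks. A minimal sketch of the call pattern this commit relies on, assuming, as the code below does, that fastcdc() accepts a binary stream and that fat=True attaches each chunk's bytes:

from io import BytesIO
from fastcdc import fastcdc

stream = BytesIO(bytes(1 << 20))  # 1 MiB of zeros, purely illustrative
for chunk in fastcdc(stream, min_size=1 << 12, avg_size=1 << 16, max_size=1 << 20, fat=True):
    print(chunk.length, len(chunk.data))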

src/bsv/repository.py (353 changes)

@@ -15,20 +15,43 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime as DateTime, timedelta as TimeDelta
import hashlib
from io import BytesIO
from pathlib import Path, PurePosixPath
import platform
import tomllib
from typing import Any
from typing import Any, BinaryIO, Callable, Type
from fastcdc import fastcdc
import tomlkit
from bsv import __version__
from bsv.simple_cas import SimpleCas
from bsv.simple_cas.cas import ConfigError, Digest, SimpleCas
from bsv.simple_cas.util import Hash, read_exact, read_exact_or_eof
DEFAULT_MIN_CHUNK_SIZE = 1 << 12
DEFAULT_AVG_CHUNK_SIZE = 1 << 16
DEFAULT_MAX_CHUNK_SIZE = 1 << 20


class Repository:
    _path: Path
    _name: str
    _cas: SimpleCas
    _min_chunk_size: int = DEFAULT_MIN_CHUNK_SIZE
    _avg_chunk_size: int = DEFAULT_AVG_CHUNK_SIZE
    _max_chunk_size: int = DEFAULT_MAX_CHUNK_SIZE
    _path_map: list[PathPair]
    # _remotes: list[object]
    _context_depth: int = 0

    def __init__(self, path: Path):
        self._path = path
@@ -39,6 +62,15 @@ class Repository:
        self._name = bsv.get("name") or platform.node()
        self._cas = make_cas(
            bsv.get("cas"),
            self._path,
            lambda: hashlib.new(bsv.get("hash")),  # type: ignore
        )
        # Fall back to the class defaults when a chunk-size key is missing.
        self._min_chunk_size = bsv.get("min_chunk_size", DEFAULT_MIN_CHUNK_SIZE)
        self._avg_chunk_size = bsv.get("avg_chunk_size", DEFAULT_AVG_CHUNK_SIZE)
        self._max_chunk_size = bsv.get("max_chunk_size", DEFAULT_MAX_CHUNK_SIZE)
        self._path_map = [
            PathPair.from_obj(pair)
            for pair in bsv.get("path_map", [])
@@ -60,6 +92,318 @@ class Repository:
    def path_map(self) -> list[PathPair]:
        return list(self._path_map)

    def get_blob(self, digest: Digest) -> Blob:
        with self:
            return self._read(digest, object_type=b"blob", cls=Blob)  # type: ignore

    def add_blob(self, stream: BinaryIO) -> Digest:
        with self:
            return self._write(b"blob", stream)

    def get_tree(self, digest: Digest) -> Tree:
        with self:
            return Tree.from_bytes(self, self._cas.read(digest, object_type=b"tree").data)

    def add_tree(self, tree: Tree) -> Digest:
        with self:
            return self._cas.write(b"tree", tree.to_bytes())

    def get_snapshot(self, digest: Digest) -> Snapshot:
        with self:
            return Snapshot.from_bytes(self, self._cas.read(digest, object_type=b"snap").data)

    def add_snapshot(self, snapshot: Snapshot) -> Digest:
        with self:
            return self._cas.write(b"snap", snapshot.to_bytes())

    def _read(self, digest: Digest, object_type: bytes, cls: Type[ChunkedObject]) -> ChunkedObject:
        obj = self._cas.read(digest, object_type=object_type)
        stream = BytesIO(obj.data)
        return cls.from_stream(self, stream, digest_size=self._cas._digest_size)

    def _write(self, object_type: bytes, stream: BinaryIO) -> Digest:
        out = BytesIO()
        size = 0
        for chunk in fastcdc(
            stream,
            min_size=self._min_chunk_size,
            avg_size=self._avg_chunk_size,
            max_size=self._max_chunk_size,
            fat=True,
        ):
            size += chunk.length
            digest = self._cas.write(b"chnk", chunk.data)
            out.write(digest.digest)
            out.write(chunk.length.to_bytes(4))
        return self._cas.write(object_type, size.to_bytes(8) + out.getvalue())

    def __enter__(self):
        if self._context_depth == 0:
            self._cas.__enter__()
        self._context_depth += 1
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._context_depth -= 1
        if self._context_depth == 0:
            return self._cas.__exit__(exc_type, exc_value, traceback)
def create_repository(
    destination: Path,
    name: str,
    cas: str = "simple",
    hash: str = "sha256",
    min_chunk_size: int = DEFAULT_MIN_CHUNK_SIZE,
    avg_chunk_size: int = DEFAULT_AVG_CHUNK_SIZE,
    max_chunk_size: int = DEFAULT_MAX_CHUNK_SIZE,
):
    from os import getlogin

    if not name:
        raise RuntimeError("repository name cannot be empty")
    if not destination.parent.exists():
        raise RuntimeError(f"destination directory {destination.parent} does not exist")
    if destination.exists() and not destination.is_dir():
        raise RuntimeError(f"destination {destination} exists but is not a directory")
    if destination.exists() and len(list(destination.iterdir())):
        raise RuntimeError(f"destination directory {destination} is not empty")
    try:
        destination.mkdir(exist_ok=True)
    except OSError as err:
        raise RuntimeError(f"failed to create destination directory {destination}") from err

    bsv_table = tomlkit.table()
    bsv_table.add(tomlkit.comment("Name of the repository."))
    bsv_table.add(tomlkit.comment("Ideally, this should be unique among all connected repositories."))
    bsv_table.add("name", name)
    bsv_table.add(tomlkit.nl())
    bsv_table.add(tomlkit.comment("Mapping between the bsv tree and the actual filesystem."))
    bsv_table.add("path_map", tomlkit.array())
    bsv_table.add("cas", cas)
    bsv_table.add("hash", hash)
    bsv_table.add("min_chunk_size", min_chunk_size)
    bsv_table.add("avg_chunk_size", avg_chunk_size)
    bsv_table.add("max_chunk_size", max_chunk_size)

    doc = tomlkit.document()
    doc.add(tomlkit.comment("bsv repository configuration"))
    doc.add(tomlkit.comment(f"Created by {getlogin()} on {DateTime.now().isoformat()}."))
    doc.add(tomlkit.nl())
    doc.add("bsv", bsv_table)

    config_path = destination / "bsv_config.toml"
    try:
        stream = config_path.open("w", encoding="utf-8")
    except OSError as err:
        raise RuntimeError(f"failed to open configuration file {config_path}") from err
    with stream:
        tomlkit.dump(doc, stream)
    return Repository(destination)


def make_cas(cas_name: str, path: Path, hash_factory: Callable[[], Hash]) -> SimpleCas:
    if cas_name == "simple":
        return SimpleCas(path, hash_factory)
    raise ConfigError(f"unknown cas name {cas_name}")
@dataclass
class ChunkedObject:
    repo: Repository
    size: int
    chunks: list[Chunk]

    @classmethod
    def from_stream(cls, repo: Repository, stream: BinaryIO, digest_size: int) -> ChunkedObject:
        self = cls(
            repo=repo,
            size=int.from_bytes(read_exact(stream, 8)),
            chunks=[],
        )
        while (chunk := Chunk.from_stream(stream, digest_size)) is not None:
            self.chunks.append(chunk)
        return self
@dataclass
class Blob(ChunkedObject):
    _chunk_index: int = 0
    _chunk_data: bytes = b""

    def read(self, num_bytes: int = -1) -> bytes:
        parts = [self._chunk_data]
        size = len(parts[-1])
        while (num_bytes < 0 or size < num_bytes) and self._chunk_index < len(self.chunks):
            parts.append(self.read1())
            size += len(parts[-1])
        data = b"".join(parts)
        if num_bytes < 0:
            self._chunk_data = b""
            return data
        # Keep any bytes beyond the requested count buffered for the next read.
        self._chunk_data = data[num_bytes:]
        return data[:num_bytes]

    def read1(self) -> bytes:
        if self._chunk_index == len(self.chunks):
            return b""
        obj = self.repo._cas.read(self.chunks[self._chunk_index].digest, object_type=b"chnk")
        self._chunk_index += 1
        return obj.data
@dataclass
class Tree:
    repo: Repository
    items: list[TreeItem]

    @classmethod
    def from_stream(cls, repo: Repository, stream: BinaryIO) -> Tree:
        tree = Tree(repo, [])
        while (item := TreeItem.from_stream(stream, repo._cas._digest_size)) is not None:
            tree.items.append(item)
        return tree

    @classmethod
    def from_bytes(cls, repo: Repository, data: bytes) -> Tree:
        stream = BytesIO(data)
        return cls.from_stream(repo, stream)

    def write(self, stream: BinaryIO):
        self.items.sort(key=lambda i: i.name)
        for item in self.items:
            item.write(stream)

    def to_bytes(self) -> bytes:
        stream = BytesIO()
        self.write(stream)
        return stream.getvalue()
EPOCH = DateTime(1970, 1, 1, tzinfo=UTC)


@dataclass
class TreeItem:
    name: str
    digest: Digest
    permissions: int
    creation_timestamp: int
    modification_timestamp: int

    def __init__(
        self,
        name: str,
        digest: Digest,
        permissions: int,
        creation_timestamp: int,
        modification_timestamp: int,
    ):
        if "/" in name or "\\" in name:
            raise ValueError(f"invalid tree item name {name}")
        self.name = name
        self.digest = digest
        self.permissions = permissions
        self.creation_timestamp = creation_timestamp
        self.modification_timestamp = modification_timestamp

    @property
    def creation_time(self) -> DateTime:
        return time_from_timestamp(self.creation_timestamp)

    @creation_time.setter
    def creation_time(self, time: DateTime):
        self.creation_timestamp = timestamp_from_time(time)

    @property
    def modification_time(self) -> DateTime:
        return time_from_timestamp(self.modification_timestamp)

    @modification_time.setter
    def modification_time(self, time: DateTime):
        self.modification_timestamp = timestamp_from_time(time)

    @classmethod
    def from_stream(cls, stream: BinaryIO, digest_size: int) -> TreeItem | None:
        digest_bytes = read_exact_or_eof(stream, digest_size)
        if digest_bytes is None:
            return None
        return TreeItem(
            digest=Digest(digest_bytes),
            permissions=int.from_bytes(read_exact(stream, 2)),
            creation_timestamp=int.from_bytes(read_exact(stream, 8), signed=True),
            modification_timestamp=int.from_bytes(read_exact(stream, 8), signed=True),
            name=read_exact(stream, int.from_bytes(read_exact(stream, 2))).decode("utf-8"),
        )

    def write(self, stream: BinaryIO):
        stream.write(self.digest.digest)
        stream.write(self.permissions.to_bytes(2))
        stream.write(self.creation_timestamp.to_bytes(8, signed=True))
        stream.write(self.modification_timestamp.to_bytes(8, signed=True))
        name_bytes = self.name.encode("utf-8")
        stream.write(len(name_bytes).to_bytes(2))
        stream.write(name_bytes)
@dataclass
class Snapshot:
    repo: Repository
    tree_digest: Digest
    repo_name: str
    timestamp: int

    @property
    def time(self) -> DateTime:
        return time_from_timestamp(self.timestamp)

    @time.setter
    def time(self, time: DateTime):
        self.timestamp = timestamp_from_time(time)

    @classmethod
    def from_stream(cls, repo: Repository, stream: BinaryIO) -> Snapshot:
        return Snapshot(
            repo=repo,
            tree_digest=Digest(read_exact(stream, repo._cas._digest_size)),
            repo_name=read_exact(stream, int.from_bytes(read_exact(stream, 2))).decode("utf-8"),
            timestamp=int.from_bytes(read_exact(stream, 8), signed=True),
        )

    @classmethod
    def from_bytes(cls, repo: Repository, data: bytes) -> Snapshot:
        stream = BytesIO(data)
        return cls.from_stream(repo, stream)

    def write(self, stream: BinaryIO):
        stream.write(self.tree_digest.digest)
        repo_name_bytes = self.repo_name.encode("utf-8")
        stream.write(len(repo_name_bytes).to_bytes(2))
        stream.write(repo_name_bytes)
        stream.write(self.timestamp.to_bytes(8, signed=True))

    def to_bytes(self) -> bytes:
        stream = BytesIO()
        self.write(stream)
        return stream.getvalue()
@dataclass
class Chunk:
    digest: Digest
    size: int

    @classmethod
    def from_stream(cls, stream: BinaryIO, digest_size: int) -> Chunk | None:
        digest_bytes = read_exact_or_eof(stream, digest_size)
        if digest_bytes is None:
            return None
        digest = Digest(digest_bytes)
        return cls(
            digest=digest,
            size=int.from_bytes(read_exact(stream, 4)),
        )


class PathPair:
    bsv: PurePosixPath
@@ -84,3 +428,10 @@ class PathPair:
    def __lt__(self, rhs: PathPair) -> bool:
        return self.bsv < rhs.bsv


def time_from_timestamp(timestamp: int) -> DateTime:
    return EPOCH + TimeDelta(microseconds=timestamp)


def timestamp_from_time(time: DateTime) -> int:
    return (time.astimezone(UTC) - EPOCH) // TimeDelta(microseconds=1)
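For reference, the byte layouts defined above can be decoded standalone. A minimal sketch (digest_size follows the configured hash, 32 bytes for the default sha256; byte order is written "big" explicitly to match the defaults the code above relies on):

from io import BytesIO

def decode_chunked_object(data: bytes, digest_size: int = 32):
    # Layout written by Repository._write: an 8-byte big-endian total size,
    # then one (digest, 4-byte big-endian length) entry per chunk.
    stream = BytesIO(data)
    total_size = int.from_bytes(stream.read(8), "big")
    chunks = []
    while digest := stream.read(digest_size):
        chunks.append((digest.hex(), int.from_bytes(stream.read(4), "big")))
    return total_size, chunks

def decode_tree_items(data: bytes, digest_size: int = 32):
    # Layout written by TreeItem.write: digest, 2-byte permissions, two signed
    # 8-byte microsecond timestamps, 2-byte name length, UTF-8 name.
    stream = BytesIO(data)
    items = []
    while digest := stream.read(digest_size):
        permissions = int.from_bytes(stream.read(2), "big")
        ctime = int.from_bytes(stream.read(8), "big", signed=True)
        mtime = int.from_bytes(stream.read(8), "big", signed=True)
        name = stream.read(int.from_bytes(stream.read(2), "big")).decode("utf-8")
        items.append((name, digest.hex(), permissions, ctime, mtime))
    return items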

src/bsv/simple_cas/__init__.py (2 changes)

@@ -16,4 +16,4 @@
from __future__ import annotations
from bsv.simple_cas.cas import SimpleCas as Cas
from bsv.simple_cas.cas import SimpleCas

src/bsv/simple_cas/cas.py (81 changes)

@@ -18,9 +18,22 @@ from __future__ import annotations
from dataclasses import dataclass
import hashlib
from pathlib import Path
from typing import Any, BinaryIO, Callable, Optional
from typing import Any, BinaryIO, Callable, Iterator
from bsv.simple_cas.util import Hash
from bsv.simple_cas.util import Hash, read_exact_or_eof
class BsvError(RuntimeError):
    pass


class NotFound(BsvError):
    pass


class UnexpectedObjectType(BsvError):
    pass


class ConfigError(BsvError):
    pass
class SimpleCas:
@@ -28,7 +41,7 @@ class SimpleCas:
    _hash_factory: Callable[[], Hash]
    _digest_size: int
    _index: dict[bytes, IndexItem]
    _index: dict[Digest, IndexItem]
    _is_inside_context: bool = False
@@ -41,9 +54,10 @@
        if (self._root_dir / "cas.idx").exists():
            with (self._root_dir / "cas.idx").open("rb") as stream:
                while True:
                    digest = stream.read(self._digest_size)
                    if not digest:
                    digest_bytes = read_exact_or_eof(stream, self._digest_size)
                    if not digest_bytes:
                        break
                    digest = Digest(digest_bytes)
                    object_type = stream.read(4)
                    offset = int.from_bytes(stream.read(4))
                    size = int.from_bytes(stream.read(4))
@@ -67,18 +81,24 @@
    def __len__(self) -> int:
        return len(self._index)

    def __contains__(self, digest: bytes) -> bool:
        assert len(digest) == self._digest_size
    def __contains__(self, digest: Digest) -> bool:
        assert len(digest.digest) == self._digest_size
        return digest in self._index

    def __iter__(self) -> Iterator[ObjectInfo]:
        for digest, item in self._index.items():
            yield ObjectInfo(digest, item.object_type, item.size)

    def read(self, digest: bytes) -> Optional[Object]:
    def read(self, digest: Digest, object_type: bytes | None = None) -> Object:
        item = self._index.get(digest)
        if item is None:
            return None
            raise NotFound(f"object {digest} not found")
        if object_type is not None and item.object_type != object_type:
            raise UnexpectedObjectType(f"expected object of type {object_type.decode()}, got {item.object_type.decode()}")
        with (self._root_dir / "cas.dat").open("rb") as stream:
            stream.seek(item.offset)
            assert stream.read(self._digest_size) == digest
            assert stream.read(self._digest_size) == digest.digest
            object_type = stream.read(4)
            assert object_type == item.object_type
            size = int.from_bytes(stream.read(4))
@@ -87,7 +107,7 @@ class SimpleCas:
        return Object(object_type, data)

    def write(self, object_type: bytes, data: bytes) -> bytes:
    def write(self, object_type: bytes, data: bytes) -> Digest:
        assert len(object_type) == 4
        assert len(data) < 2**32
@@ -97,38 +117,38 @@
        hash.update(len(data).to_bytes(4))
        hash.update(b"\0")
        hash.update(data)
        digest = hash.digest()
        digest = Digest(hash.digest())

        if digest not in self:
            with self._open_writer(digest, object_type, len(data)) as out:
                out.write(digest)
                out.write(digest.digest)
                out.write(object_type)
                out.write(len(data).to_bytes(4))
                out.write(data)
        return digest

    def get_ref(self, key: str) -> bytes | None:
    def get_ref(self, key: str) -> Digest | None:
        ref_path = self._ref_path(key)
        if not ref_path.is_file():
            return None
        hex = ref_path.read_text().strip()
        if len(hex) != 2 * self._digest_size:
            return None
        return bytes.fromhex(hex)
        return Digest(bytes.fromhex(hex))

    def set_ref(self, key: str, digest: bytes):
    def set_ref(self, key: str, digest: Digest):
        ref_path = self._ref_path(key)
        ref_path.parent.mkdir(parents=True, exist_ok=True)
        ref_path.write_text(digest.hex())
        ref_path.write_text(str(digest))

    def _open_writer(self, digest: bytes, object_type: bytes, size: int) -> BinaryIO:
    def _open_writer(self, digest: Digest, object_type: bytes, size: int) -> BinaryIO:
        dat_file = (self._root_dir / "cas.dat").open("ab")
        offset = dat_file.tell()
        self._index[digest] = IndexItem(object_type, offset, size)
        with (self._root_dir / "cas.idx").open("ab") as idx_file:
            idx_file.write(digest)
            idx_file.write(digest.digest)
            idx_file.write(object_type)
            idx_file.write(offset.to_bytes(4))
            idx_file.write(size.to_bytes(4))
@@ -144,13 +164,36 @@ class SimpleCas:
        return self._root_dir / "refs" / key_path
@dataclass(frozen=True, order=True, slots=True)
class Digest:
    digest: bytes

    def __repr__(self) -> str:
        return self.digest.hex()


@dataclass
class Object:
    object_type: bytes
    data: bytes

    def __repr__(self) -> str:
        return f"<Object {self.object_type.decode()}: {len(self.data)}B>"


@dataclass
class IndexItem:
    object_type: bytes
    offset: int
    size: int

    def __repr__(self) -> str:
        return f"<IndexItem {self.object_type.decode()}: {self.offset}B +{self.size}B>"


@dataclass
class ObjectInfo:
    digest: Digest
    object_type: bytes
    size: int

    def __repr__(self) -> str:
        return f"<ObjectInfo {self.digest} {self.object_type.decode()} {self.size}B>"

src/bsv/simple_cas/util.py (16 changes)

@@ -16,6 +16,22 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import BinaryIO
def read_exact(stream: BinaryIO, num_bytes: int) -> bytes:
    data = stream.read(num_bytes)
    if len(data) != num_bytes:
        raise IOError(f"expected {num_bytes} bytes, got {len(data)}")
    return data


def read_exact_or_eof(stream: BinaryIO, num_bytes: int) -> bytes | None:
    data = stream.read(num_bytes)
    if not data:
        return None
    if len(data) != num_bytes:
        raise IOError(f"expected {num_bytes} bytes, got {len(data)}")
    return data
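The distinction between the two helpers: read_exact raises on any short read, while read_exact_or_eof returns None only on a clean end-of-stream and still raises on a truncated record. A quick illustration:

from io import BytesIO

stream = BytesIO(b"abcdef")
assert read_exact(stream, 4) == b"abcd"
assert read_exact_or_eof(stream, 2) == b"ef"
assert read_exact_or_eof(stream, 2) is None  # clean EOF
# A stream ending mid-record (3 bytes left, 4 requested) makes both raise IOError.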
class Hash(ABC):

tests/test_repository.py (111 changes)

@@ -0,0 +1,111 @@
# bsv - Backup, Synchronization, Versioning
# Copyright (C) 2023 Simon Boyé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from datetime import UTC, datetime
from io import BytesIO
from pathlib import Path
from random import randbytes
from typing import Iterator
import pytest
from tempfile import TemporaryDirectory
from bsv.repository import Repository, Snapshot, Tree, TreeItem, create_repository, timestamp_from_time
from bsv.simple_cas.cas import Digest
@pytest.fixture
def tmp_dir():
    with TemporaryDirectory(prefix="simple_cas_") as tmp_dir:
        yield Path(tmp_dir)


@pytest.fixture
def repo(tmp_dir):
    return create_repository(
        tmp_dir / "bsv",
        "test_repo",
    )
def test_read_write_blob(tmp_dir: Path, repo: Repository):
    path = tmp_dir / "test.dat"
    make_random_file(path, 1 << 20)
    with path.open("rb") as stream:
        digest = repo.add_blob(stream)
    blob = repo.get_blob(digest)
    data = blob.read()
    with path.open("rb") as stream:
        assert data == stream.read()


def test_read_write_tree(repo: Repository):
    now = datetime.now(UTC)
    tree = Tree(
        repo,
        [
            TreeItem(
                "xyz",
                Digest(bytes([42]) * repo._cas._digest_size),
                0o744,
                creation_timestamp=timestamp_from_time(now),
                modification_timestamp=timestamp_from_time(now),
            ),
            TreeItem(
                "foobar",
                Digest(bytes([123]) * repo._cas._digest_size),
                0o777,
                creation_timestamp=timestamp_from_time(now),
                modification_timestamp=timestamp_from_time(now),
            ),
        ],
    )
    assert Tree.from_bytes(repo, tree.to_bytes()) == tree
    digest = repo.add_tree(tree)
    assert repo.get_tree(digest) == tree


def test_read_write_snapshot(repo: Repository):
    snapshot = Snapshot(
        repo=repo,
        tree_digest=Digest(bytes([42]) * repo._cas._digest_size),
        repo_name="test_repo",
        timestamp=timestamp_from_time(datetime.now()),
    )
    assert Snapshot.from_bytes(repo, snapshot.to_bytes()) == snapshot
    digest = repo.add_snapshot(snapshot)
    assert repo.get_snapshot(digest) == snapshot


def make_random_file(path: Path, size: int):
    with path.open("wb") as stream:
        for chunk_size in iter_chunks(size):
            stream.write(randbytes(chunk_size))


def iter_chunks(size: int, chunk_size: int = 1 << 16) -> Iterator[int]:
    num_full_chunks = (size - 1) // chunk_size
    for _ in range(num_full_chunks):
        yield chunk_size
    offset = num_full_chunks * chunk_size
    if offset != size:
        yield size - offset

tests/test_simple_cas.py (4 changes)

@@ -20,7 +20,7 @@ from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from bsv.simple_cas.cas import SimpleCas
from bsv.simple_cas.cas import Digest, SimpleCas
@pytest.fixture
@@ -77,7 +77,7 @@ def test_simple_cas(tmp_dir: Path):
def test_refs(cas: SimpleCas):
    digest = bytes([42] * cas._digest_size)
    digest = Digest(bytes([42] * cas._digest_size))
    assert cas.get_ref("foo/bar") is None
    cas.set_ref("foo/bar", digest)
    assert cas.get_ref("foo/bar") == digest
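Taken together, the new surface can be exercised end to end. A minimal usage sketch (paths and payload are illustrative):

from io import BytesIO
from pathlib import Path
from tempfile import TemporaryDirectory

from bsv.repository import create_repository

with TemporaryDirectory() as tmp:
    repo = create_repository(Path(tmp) / "bsv", "demo")
    data = b"hello, bsv " * 100_000  # ~1 MiB, enough to span several chunks
    digest = repo.add_blob(BytesIO(data))
    assert repo.get_blob(digest).read() == data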
