diff --git a/cas-core/src/cas.rs b/cas-core/src/cas.rs
index 089e7de..27750e4 100644
--- a/cas-core/src/cas.rs
+++ b/cas-core/src/cas.rs
@@ -14,10 +14,13 @@
// along with cdb. If not, see <https://www.gnu.org/licenses/>.
-use super::error::Result;
+use std::path::Path;
+
+use super::error::{Error, Result};
use super::object_id::ObjectId;
use super::object_type::ObjectType;
use super::object_metadata::ObjectMetadata;
+use super::pipeline::{Reader, Writer};
// pub struct ObjectIdIterator;
@@ -25,16 +28,41 @@ use super::object_metadata::ObjectMetadata;
pub trait Cas {
fn object_id_from_string(&self, hex: &str) -> Result;
- fn object_id_from_partial(&self, hex: &str) -> Result;
fn has_object_id(&self, oid: &ObjectId) -> Result;
- // fn iter_object_id(&self) -> Result;
+ fn iter_object_id<'s>(&'s self) -> Result>>>;
+
+ fn open_object(&self, oid: &ObjectId) -> Result<(ObjectMetadata, Box)>;
+ fn new_writer(&mut self, otype: &ObjectType, size: u64) -> Result>;
+
+ fn read_object(&self, oid: &ObjectId) -> Result<(ObjectMetadata, Vec)> {
+ let (metadata, mut reader) = self.open_object(oid)?;
+ let mut data = Vec::new();
+
+ reader.read_to_end(&mut data).or_else(|err|
+ Err(Error::error(format!("failed to read object {}: {}", oid, err))))?;
+
+ let result_oid = reader.finalize()?;
+ if &result_oid == oid {
+ Ok((metadata, data))
+ }
+ else {
+ Err(Error::error(format!("object id mismatch: requested {}, read {}", oid, result_oid)))
+ }
+ }
+
+ fn write_object(&mut self, otype: &ObjectType, data: &[u8]) -> Result {
+ let mut writer = self.new_writer(otype, data.len() as u64)?;
+ writer.write_all(data).or_else(|err|
+ Err(Error::error(format!("failed to write object: {}", err))))?;
+ writer.finalize()
+ }
- fn read_object(&self, oid: &ObjectId) -> Result<(ObjectMetadata, &dyn std::io::Read)>;
- fn write_object(&mut self, otype: ObjectType, data: &mut dyn std::io::Read) -> Result;
fn remove_object(&mut self, oid: &ObjectId) -> Result<()>;
+}
- fn read_ref(&self, key: &str) -> Result;
- fn write_ref(&mut self, key: &str, value: &ObjectId) -> Result<()>;
- fn remove_ref(&mut self, key: &str) -> Result<()>;
+pub trait RefStore {
+ fn get_ref>(&self, key: P) -> Result;
+ fn set_ref>(&mut self, key: P, value: &ObjectId) -> Result<()>;
+ fn remove_ref>(&mut self, key: P) -> Result<()>;
}
\ No newline at end of file
diff --git a/cas-core/src/error.rs b/cas-core/src/error.rs
index 6f36aa2..9903ecd 100644
--- a/cas-core/src/error.rs
+++ b/cas-core/src/error.rs
@@ -111,6 +111,10 @@ impl Error {
pub fn error>(message: M) -> Box {
Box::new(Error::Error(message.into()))
}
+
+ pub fn err, T>(message: M) -> Result {
+ Err(Self::error(message))
+ }
}
diff --git a/cas-core/src/lib.rs b/cas-core/src/lib.rs
index 1f2e681..9d8bacd 100644
--- a/cas-core/src/lib.rs
+++ b/cas-core/src/lib.rs
@@ -38,7 +38,7 @@ pub use crate::{
object_id::{ObjectId, hex, write_hex},
object_type::{ObjectType},
object_metadata::{ObjectMetadata},
- pipeline::{Pipeline, DefaultPipeline},
+ pipeline::{Pipeline, DefaultPipeline, Reader, Writer, ReadWrapper, WriteWrapper},
cas::{Cas},
};
diff --git a/cas-core/src/object_metadata.rs b/cas-core/src/object_metadata.rs
index aeae1ea..13c741e 100644
--- a/cas-core/src/object_metadata.rs
+++ b/cas-core/src/object_metadata.rs
@@ -20,11 +20,11 @@ use super::object_type::{ObjectType};
#[derive(Clone, Eq, PartialEq)]
pub struct ObjectMetadata {
otype: ObjectType,
- size: usize,
+ size: u64,
}
impl ObjectMetadata {
- pub fn new(otype: ObjectType, size: usize) -> Self {
+ pub fn new(otype: ObjectType, size: u64) -> Self {
Self {
otype,
size,
@@ -35,7 +35,7 @@ impl ObjectMetadata {
&self.otype
}
- pub fn size(&self) -> usize {
+ pub fn size(&self) -> u64 {
self.size
}
}
diff --git a/cas-core/src/pipeline.rs b/cas-core/src/pipeline.rs
index 487720c..744a14f 100644
--- a/cas-core/src/pipeline.rs
+++ b/cas-core/src/pipeline.rs
@@ -19,29 +19,13 @@ use super::error::*;
use super::object_id::ObjectId;
-pub trait Reader {
- fn read(&mut self, buf: &mut [u8]) -> Result;
+pub trait Reader: std::io::Read {
fn finalize(self: Box) -> Result;
-
- fn read_all(&mut self) -> Result> {
- let mut buffer = [0u8; 1 << 13];
- let mut data = Vec::new();
- let mut count = usize::MAX;
-
- while count != 0 {
- count = self.read(&mut buffer)?;
- if count != 0 {
- data.extend_from_slice(&buffer[..count]);
- }
- }
-
- Ok(data)
- }
}
-pub trait Writer {
- fn write(&mut self, buf: &[u8]) -> Result<()>;
+pub trait Writer: std::io::Write {
fn finalize(self: Box) -> Result;
+ fn _finalize(self: Box, oid: ObjectId) -> Result;
}
@@ -51,29 +35,57 @@ pub trait Pipeline {
}
-impl Reader for R {
- fn read(&mut self, buf: &mut [u8]) -> Result {
- ::read(self, buf).or_else(|err|
- Err(Error::error(format!("Read error: {}", err)))
- )
+pub struct ReadWrapper {
+ reader: R,
+}
+
+impl ReadWrapper {
+ pub fn new(reader: R) -> Self {
+ Self { reader }
+ }
+}
+
+impl std::io::Read for ReadWrapper {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result {
+ self.reader.read(buf)
}
+}
+impl Reader for ReadWrapper {
fn finalize(self: Box) -> Result {
Err(Error::error("reader pipline has no digest step"))
}
}
-impl Writer for W {
- fn write(&mut self, buf: &[u8]) -> Result<()> {
- self.write_all(buf).or_else(|err|
- Err(Error::error(format!("Write error: {}", err)))
- )
+pub struct WriteWrapper {
+ writer: W,
+}
+
+impl WriteWrapper {
+ pub fn new(writer: W) -> Self {
+ Self { writer }
+ }
+}
+
+impl std::io::Write for WriteWrapper {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result {
+ self.writer.write(buf)
}
+ fn flush(&mut self) -> std::io::Result<()> {
+ self.writer.flush()
+ }
+}
+
+impl Writer for WriteWrapper {
fn finalize(self: Box) -> Result {
Err(Error::error("reader pipline has no digest step"))
}
+
+ fn _finalize(self: Box, oid: ObjectId) -> Result {
+ Ok(oid)
+ }
}
@@ -93,8 +105,8 @@ impl<'a> DigestReader<'a> {
}
}
-impl<'a> Reader for DigestReader<'a> {
- fn read(&mut self, buf: &mut [u8]) -> Result {
+impl<'a> std::io::Read for DigestReader<'a> {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result {
let count = if let Some(dig) = &mut self.digest {
let count = self.reader.read(buf)?;
dig.update(&buf[..count]);
@@ -112,7 +124,9 @@ impl<'a> Reader for DigestReader<'a> {
Ok(count)
}
+}
+impl<'a> Reader for DigestReader<'a> {
fn finalize(self: Box) -> Result {
self.oid.ok_or_else(|| Error::error("Reader not finalized"))
}
@@ -133,15 +147,26 @@ impl<'a> DigestWriter<'a> {
}
}
-impl<'a> Writer for DigestWriter<'a> {
- fn write(&mut self, buf: &[u8]) -> Result<()> {
- self.digest.update(buf);
- self.writer.write(buf)?;
- Ok(())
+impl<'a> std::io::Write for DigestWriter<'a> {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result {
+ let count = self.writer.write(buf)?;
+ self.digest.update(&buf[..count]);
+ Ok(count)
}
+ fn flush(&mut self) -> std::io::Result<()> {
+ self.writer.flush()
+ }
+}
+
+impl<'a> Writer for DigestWriter<'a> {
fn finalize(self: Box) -> Result {
- Ok(ObjectId::new(&self.digest.finalize()))
+ let oid = ObjectId::new(&self.digest.finalize());
+ self.writer._finalize(oid)
+ }
+
+ fn _finalize(self: Box, _oid: ObjectId) -> Result {
+ Err(Error::error("writer pipeline has several digest steps"))
}
}
@@ -182,9 +207,10 @@ mod tests {
#[test]
fn std_read_as_reader() {
let data = b"Hello World!".to_vec();
- let mut reader: Box = Box::new(data.as_slice());
+ let mut reader: Box = Box::new(ReadWrapper::new(data.as_slice()));
- let data2 = reader.read_all().expect("read failed");
+ let mut data2 = Vec::new();
+ reader.read_to_end(&mut data2).expect("read failed");
let maybe_oid = reader.finalize();
assert_eq!(data2, data);
@@ -197,7 +223,7 @@ mod tests {
let mut buffer = [0u8; 32];
let maybe_oid = {
- let mut writer: Box = Box::new(&mut buffer[..]);
+ let mut writer: Box = Box::new(WriteWrapper::new(&mut buffer[..]));
writer.write(&data).expect("write failed");
writer.finalize()
};
@@ -210,17 +236,20 @@ mod tests {
#[test]
fn digest_reader() {
+ use std::io::Read;
+
let data = b"Hello World!".to_vec();
let oid = ObjectId::from_str(
"7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"
).expect("invalid object id");
let mut reader = Box::new(DigestReader::new(
- Box::new(data.as_slice()),
+ Box::new(ReadWrapper::new(data.as_slice())),
Box::new(Sha256::default()),
));
- let data2 = reader.read_all().expect("read failed");
+ let mut data2 = Vec::new();
+ reader.read_to_end(&mut data2).expect("read failed");
let oid2 = reader.finalize().expect("finalize failed");
assert_eq!(data2, data);
@@ -229,6 +258,8 @@ mod tests {
#[test]
fn digest_writer() {
+ use std::io::Write;
+
let data = b"Hello World!".to_vec();
let oid = ObjectId::from_str(
"7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"
@@ -237,7 +268,7 @@ mod tests {
let mut buffer = [0u8; 32];
let oid2 = {
let mut writer = Box::new(DigestWriter::new(
- Box::new(&mut buffer[..]),
+ Box::new(WriteWrapper::new(&mut buffer[..])),
Box::new(Sha256::default()),
));
writer.write(&data).expect("write failed");
diff --git a/cas-simple/src/cas.rs b/cas-simple/src/cas.rs
new file mode 100644
index 0000000..9d3714d
--- /dev/null
+++ b/cas-simple/src/cas.rs
@@ -0,0 +1,563 @@
+// This file is part of bsv.
+//
+// bsv is free software: you can redistribute it and/or modify it under the
+// terms of the GNU Affero General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// bsv is distributed in the hope that it will be useful, but WITHOUT ANY
+// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+// FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
+// more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with bsv. If not, see <https://www.gnu.org/licenses/>.
+
+
+use std::str::FromStr;
+use std::path::{Path, PathBuf};
+
+use digest::DynDigest;
+use toml::Value;
+use cas_core::{
+ Cas, DefaultPipeline, Error, ObjectId, ObjectMetadata, ObjectType,
+ Pipeline, Reader, ReadWrapper, Result, Writer,
+};
+
+use crate::utils::{
+ obj_dir, obj_path, ref_dir, tmp_dir,
+ read_config, write_config,
+ read_metadata, write_metadata,
+ new_digest,
+};
+use crate::wfile::WFile;
+
+
+pub struct SimpleCas {
+ db_path: PathBuf,
+ digest: Box,
+ pipeline: DefaultPipeline,
+ config: Value,
+}
+
+
+impl SimpleCas {
+ pub fn create(db_path: PathBuf, config: Value) -> Result {
+ let digest_id = config["cas"]["digest"].as_str()
+ .ok_or_else(|| Error::error(
+ "mandatory cas.digest value is invalid or missing from config"
+ ))?;
+ let digest = new_digest(digest_id)?;
+ let pipeline = DefaultPipeline::new(digest.box_clone());
+
+ if db_path.exists() {
+ return Err(Error::error(format!(
+ "failed to create SimpleCas: target directory already exists ({})",
+ db_path.to_string_lossy(),
+ )));
+ }
+
+ for path in [
+ &db_path,
+ &obj_dir(&db_path),
+ &ref_dir(&db_path),
+ &tmp_dir(&db_path),
+ ] {
+ std::fs::create_dir(path).or_else(|e|
+ Err(Error::error(format!(
+ "failed to create directory ({}): {}",
+ path.to_string_lossy(), e,
+ )))
+ )?;
+ }
+
+ write_config(&config, &db_path)?;
+
+ Ok(SimpleCas {
+ db_path,
+ digest,
+ pipeline,
+ config,
+ })
+ }
+
+ pub fn open(db_path: PathBuf) -> Result {
+ let config = read_config(&db_path)?;
+
+ let digest_id = config["cas"]["digest"].as_str()
+ .ok_or_else(|| Error::error(
+ "mandatory cas.digest value is invalid or missing from config"
+ ))?;
+ let digest = new_digest(digest_id)?;
+ let pipeline = DefaultPipeline::new(digest.box_clone());
+
+ Ok(SimpleCas {
+ db_path,
+ digest,
+ pipeline,
+ config,
+ })
+ }
+
+ pub fn save_config(&self) -> Result<()> {
+ write_config(&self.config, &self.db_path)
+ }
+}
+
+
+impl Cas for SimpleCas {
+ fn object_id_from_string(&self, hex: &str) -> Result {
+ if hex.len() == self.digest.output_size() * 2 {
+ ObjectId::from_str(hex)
+ }
+ else {
+ Err(Error::error(format!(
+ "invalid object id size: got {}, expected {}",
+ hex.len(), self.digest.output_size() * 2,
+ )))
+ }
+ }
+
+ fn has_object_id(&self, oid: &ObjectId) -> Result {
+ let opath = obj_path(&self.db_path, oid);
+ Ok(opath.is_file())
+ }
+
+ fn iter_object_id<'s>(&'s self) -> Result>>> {
+ Ok(Box::new(ObjectIdIterator::new(self)?))
+ }
+
+
+ fn open_object(&self, oid: &ObjectId) -> Result<(ObjectMetadata, Box)> {
+ let opath = obj_path(&self.db_path, oid);
+ if !opath.is_file() {
+ return Err(Error::error(format!("object not found: {}", oid)));
+ }
+
+ let file = std::fs::File::open(opath).or_else(|err|
+ Err(Error::error(format!("failed to open object {}: {}", oid, err))))?;
+ let wrapper = Box::new(ReadWrapper::new(file));
+ let mut reader = self.pipeline.new_reader(wrapper);
+ let metadata = read_metadata(&mut reader)?;
+
+ Ok((metadata, reader))
+ }
+
+ fn new_writer(&mut self, otype: &ObjectType, size: u64) -> Result> {
+ let file = Box::new(WFile::new(self.db_path.clone())?);
+ let mut writer = self.pipeline.new_writer(file);
+ write_metadata(&mut writer, otype, size)?;
+
+ Ok(writer)
+ }
+
+ fn remove_object(&mut self, oid: &ObjectId) -> Result<()> {
+ let opath = obj_path(&self.db_path, oid);
+ if !opath.is_file() {
+ return Err(Error::error(format!("object not found: {}", oid)));
+ }
+ std::fs::remove_file(opath).or_else(|err|
+ Err(Error::error(format!("failed to remove object {}: {}", oid, err))))?;
+ Ok(())
+ }
+}
+
+
+pub struct ObjectIdIterator {
+ root_dirs: Vec,
+ inner_dirs: Vec,
+ objects: Vec,
+ root_index: usize,
+ inner_index: usize,
+ object_index: usize,
+}
+
+impl ObjectIdIterator {
+ fn new(cas: &SimpleCas) -> Result {
+ let root_dirs = read_dir(&obj_dir(&cas.db_path))?;
+ let mut it = Self {
+ root_dirs,
+ inner_dirs: Vec::new(),
+ objects: Vec::new(),
+ root_index: 0,
+ inner_index: 0,
+ object_index: 0,
+ };
+
+ it.fetch_inner_dirs()?;
+ if !it.is_item_valid() {
+ it.next_valid_item()?;
+ }
+
+ Ok(it)
+ }
+
+ fn get_object_id(&mut self) -> Result {
+ let path = &self.objects[self.object_index];
+
+ if !path.is_file() {
+ return Error::err(format!("item in object database is not a file: {:?}", path));
+ }
+
+ let id = path.ancestors()
+ .take(3)
+ .collect::>()
+ .iter()
+ .rev()
+ .map(|p| p.file_name()?.to_str())
+ .collect::