diff --git a/Cargo.lock b/Cargo.lock
index 548a82d..0bc37d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -931,7 +931,7 @@ dependencies = [
 [[package]]
 name = "decoder"
 version = "0.1.1"
-source = "git+https://github.com/semiotic-ai/flat-files-decoder.git?tag=v0.2.0#7a30d1d52b80a62ef446bf53dc3daf490c228d61"
+source = "git+https://github.com/semiotic-ai/flat-files-decoder.git?branch=main#780707ed7dc2d0c1bd4d9f5fbfbb3cdaeb29ae98"
 dependencies = [
  "anyhow",
  "bincode",
@@ -1506,7 +1506,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
 name = "flat_head"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "bincode",
+ "bytes",
  "clap",
  "decoder",
  "dotenv",
@@ -1516,9 +1518,11 @@ dependencies = [
  "log",
  "object_store",
  "sf-protos",
+ "thiserror",
  "tokio",
  "tree_hash",
  "trin-validation",
+ "url",
  "zstd 0.13.0",
 ]

diff --git a/Cargo.toml b/Cargo.toml
index e7dbc58..a45bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,17 +6,21 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
+anyhow = "1"
+bytes = "1.5"
 clap = { version = "4.3.21", features = ["derive"] }
 bincode = "1.3.3"
 tokio = { version = "1.0", features = ["full"] }
 futures = "0.3"
 env_logger = "0.11.2"
 header_accumulator = { git = "https://git@github.com/semiotic-ai/header_accumulator.git", tag = "v0.1.1-alpha" }
-decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", tag = "v0.2.0"}
+decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", branch = "main"}
 sf-protos = { git = "https://github.com/semiotic-ai/sf-protos.git", version = "0.1.0"}
 trin-validation = { git = "https://github.com/ethereum/trin.git", version = "0.1.0" }
 tree_hash = "0.5.2"
+thiserror = "1"
 object_store = { version = "0.9.0", features = ["gcp", "http", "aws"] }
 log = "0.4.20"
 dotenv = "0.15.0"
+url = "2.5"
 zstd = "0.13.0"
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
new file mode 100644
index 0000000..00d3c5c
--- /dev/null
+++ b/rust-toolchain.toml
@@ -0,0 +1,3 @@
+[toolchain]
+channel = "1.75"
+components = [ "rustfmt" ]
diff --git a/src/bin/fetch-webdav.rs b/src/bin/fetch-webdav.rs
index 7cf2d69..9630a7c 100644
--- a/src/bin/fetch-webdav.rs
+++ b/src/bin/fetch-webdav.rs
@@ -51,7 +51,7 @@ async fn main() {
     let mut headers: Vec<ExtHeaderRecord> = Vec::new();

     // Use `as_ref` to get a &[u8] from `bytes` and pass it to `handle_buf`
-    match handle_buf(bytes.as_ref()) {
+    match handle_buf(bytes.as_ref(), Some(false)) {
         Ok(blocks) => {
             let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
                 .iter()
diff --git a/src/era_verifier.rs b/src/era_verifier.rs
index 5226547..c38aa83 100644
--- a/src/era_verifier.rs
+++ b/src/era_verifier.rs
@@ -1,23 +1,25 @@
-use decoder::decode_flat_files;
+use futures::stream::{FuturesOrdered, StreamExt};
+use header_accumulator::era_validator::era_validate;
 use header_accumulator::types::ExtHeaderRecord;
-use header_accumulator::{era_validator::era_validate, errors::EraValidateError};
 use sf_protos::ethereum::r#type::v2::Block;
+
+use crate::store;

 pub const MAX_EPOCH_SIZE: usize = 8192;
 pub const FINAL_EPOCH: usize = 1896;
 pub const MERGE_BLOCK: usize = 15537394;

 /// verifies flat flies stored in directory against a header accumulator
 ///
-pub fn verify_eras(
-    directory: &String,
+pub async fn verify_eras(
+    store_url: &String,
     master_acc_file: Option<&String>,
     start_epoch: usize,
     end_epoch: Option<usize>,
     decompress: Option<bool>,
-) -> Result<Vec<usize>, EraValidateError> {
+) -> Result<Vec<usize>, anyhow::Error> {
     let mut validated_epochs = Vec::new();
     for epoch in start_epoch..=end_epoch.unwrap_or(start_epoch + 1) {
-        let blocks = get_blocks_from_dir(epoch, directory, decompress)?;
+        let blocks = get_blocks_from_dir(epoch, store_url, decompress).await?;
         let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
             .iter()
             .cloned()
@@ -46,15 +48,16 @@ pub fn verify_eras(
     Ok(validated_epochs)
 }

-fn get_blocks_from_dir(
+async fn get_blocks_from_dir(
     epoch: usize,
-    directory: &String,
+    store_url: &String,
     decompress: Option<bool>,
-) -> Result<Vec<Block>, EraValidateError> {
+) -> Result<Vec<Block>, anyhow::Error> {
     let start_100_block = epoch * MAX_EPOCH_SIZE;
     let end_100_block = (epoch + 1) * MAX_EPOCH_SIZE;

-    let mut blocks = extract_100s_blocks(directory, start_100_block, end_100_block, decompress)?;
+    let mut blocks =
+        extract_100s_blocks(store_url, start_100_block, end_100_block, decompress).await?;

     if epoch < FINAL_EPOCH {
         blocks = blocks[0..MAX_EPOCH_SIZE].to_vec();
@@ -65,37 +68,37 @@
     Ok(blocks)
 }

-fn extract_100s_blocks(
-    directory: &String,
+async fn extract_100s_blocks(
+    store_url: &String,
     start_block: usize,
     end_block: usize,
     decompress: Option<bool>,
-) -> Result<Vec<Block>, EraValidateError> {
+) -> Result<Vec<Block>, anyhow::Error> {
     // Flat files are stored in 100 block files
     // So we need to find the 100 block file that contains the start block and the 100 block file that contains the end block
     let start_100_block = (start_block / 100) * 100;
     let end_100_block = (((end_block as f32) / 100.0).ceil() as usize) * 100;

-    let mut blocks: Vec<Block> = Vec::new();
+    let zst_extension = if decompress.unwrap() { ".zst" } else { "" };
+    let blocks_store = store::new(store_url).expect("failed to create blocks store");
+
+    let mut futs = FuturesOrdered::new();

-    let mut zst_extension = "";
-    if decompress.unwrap() {
-        zst_extension = ".zst";
-    }
     for block_number in (start_100_block..end_100_block).step_by(100) {
-        let block_file_name =
-            directory.to_owned() + &format!("/{:010}.dbin{}", block_number, zst_extension);
+        let block_file_name = format!("{:010}.dbin{}", block_number, zst_extension);
+        futs.push_back(blocks_store.read_blocks(block_file_name, store::ReadOptions { decompress }))
+    }
+
+    let mut all_blocks = Vec::new();

-        let decoded_blocks = match decode_flat_files(block_file_name, None, None, decompress) {
-            Ok(blocks) => blocks,
-            Err(e) => {
-                log::error!("Error decoding flat files: {:?}", e);
-                return Err(EraValidateError::FlatFileDecodeError);
-            }
-        };
-        blocks.extend(decoded_blocks);
+    while let Some(res) = futs.next().await {
+        match res {
+            Ok(blocks) => all_blocks.extend(blocks),
+            Err(e) => println!("{:?}", e),
+        }
     }

     // Return only the requested blocks
-    Ok(blocks[start_block - start_100_block..end_block - start_100_block].to_vec())
+
+    Ok(all_blocks[start_block - start_100_block..end_block - start_100_block].to_vec())
 }
diff --git a/src/lib.rs b/src/lib.rs
index 3395657..0159aa8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod era_verifier;
 pub mod s3;
+pub mod store;
 pub mod stream;
 pub mod utils;
diff --git a/src/main.rs b/src/main.rs
index 3b7c1ef..78220d1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -19,9 +19,9 @@ struct Cli {
 enum Commands {
     /// Decode and validates flat files from a directory.
     EraValidate {
-        #[clap(short, long)]
+        #[clap(short = 'b', long)]
         // directory where flat files are located
-        dir: String,
+        store_url: String,

         #[clap(short, long)]
         // master accumulator file. default Portal Network file will be used if none provided
@@ -41,7 +41,8 @@ enum Commands {
     },
 }

-fn main() {
+#[tokio::main]
+async fn main() {
     let cli = Cli::parse();

     match cli.debug {
@@ -55,19 +56,33 @@
     match &cli.command {
         Some(Commands::EraValidate {
             decompress,
-            dir,
+            store_url,
             master_acc_file,
             start_epoch,
             end_epoch,
         }) => {
-            let result = verify_eras(
-                dir,
+            println!(
+                "Starting era validation {} - {}",
+                start_epoch,
+                end_epoch.map(|x| x.to_string()).unwrap_or("".to_string())
+            );
+
+            match verify_eras(
+                store_url,
                 master_acc_file.as_ref(),
                 *start_epoch,
                 *end_epoch,
                 *decompress,
-            );
-            log::info!("epochs validated: {:?}", result);
+            )
+            .await
+            {
+                Ok(result) => {
+                    println!("Epochs validated: {:?}", result);
+                }
+                Err(e) => {
+                    log::error!("error: {:#}", e);
+                }
+            }
         }
         None => {}
     }
diff --git a/src/s3.rs b/src/s3.rs
index 03986e4..8cf9e92 100644
--- a/src/s3.rs
+++ b/src/s3.rs
@@ -58,7 +58,7 @@ pub async fn s3_fetch(
     let bytes = result.bytes().await.unwrap();

     // Use `as_ref` to get a &[u8] from `bytes` and pass it to `handle_buf`
-    match handle_buf(bytes.as_ref()) {
+    match handle_buf(bytes.as_ref(), Some(false)) {
         Ok(blocks) => {
             let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
                 .iter()
diff --git a/src/store.rs b/src/store.rs
new file mode 100644
index 0000000..a8e7c13
--- /dev/null
+++ b/src/store.rs
@@ -0,0 +1,138 @@
+use anyhow::Context;
+use bytes::Bytes;
+use decoder::handle_buf;
+use object_store::{
+    gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, path::Path, ObjectStore,
+};
+use std::sync::Arc;
+use thiserror::Error;
+use url::Url;
+
+use sf_protos::ethereum::r#type::v2::Block;
+
+pub fn new<S: AsRef<str>>(store_url: S) -> Result<Store, anyhow::Error> {
+    let store_url = store_url.as_ref();
+    let url = match Url::parse(store_url) {
+        Ok(url) => url,
+        Err(url::ParseError::RelativeUrlWithoutBase) => {
+            let absolute_path = std::fs::canonicalize(store_url)
+                .map_err(|e| anyhow::anyhow!("Invalid store URL: {}: {}", store_url, e))?;
+
+            Url::parse(&format!("file://{}", absolute_path.to_string_lossy()))
+                .with_context(|| format!("Invalid store URL: {}", store_url))?
+        }
+        Err(e) => Err(e).with_context(|| format!("Invalid store URL: {}", store_url))?,
+    };
+
+    match url.scheme() {
+        "s3" => {
+            unimplemented!("s3://... support not implemented yet")
support not implemented yet") + } + "gs" => { + let bucket = url.host_str().ok_or_else(|| anyhow::anyhow!("No bucket"))?; + let path = url.path(); + + let store = GoogleCloudStorageBuilder::new() + .with_bucket_name(bucket.to_string()) + .build()?; + + Ok(Store { + store: Arc::new(store), + base: match path.starts_with("/") { + false => path.to_string(), + true => path[1..].to_string(), + }, + }) + } + "file" => { + let store = LocalFileSystem::new_with_prefix(url.path()).context("new local store")?; + + Ok(Store { + store: Arc::new(store), + base: "".to_string(), + }) + } + _ => Err(anyhow::anyhow!("Unsupported scheme: {}", url.scheme()))?, + } +} + +#[derive(Clone)] +pub struct Store { + store: Arc, + base: String, +} + +impl Store { + pub async fn read_blocks( + &self, + path: String, + options: ReadOptions, + ) -> Result, ReadError> { + let content = self.store.get(&self.join_path(path)).await?; + let bytes: Bytes = content.bytes().await.unwrap(); + + handle_from_bytes(bytes, options.decompress()).await + } + + fn join_path(&self, path: String) -> Path { + Path::from(format!("{}/{}", self.base, path.trim_start_matches('/'))) + } +} + +#[derive(Error, Debug)] +pub enum ReadError { + #[error("Path '{0}' not found")] + NotFound(String), + #[error("Storage error: {0}")] + Storage(#[from] object_store::Error), + #[error("Decode error: {0}")] + DecodeError(String), // Or directly use DecodeError if it implements `std::error::Error` +} + +pub struct ReadOptions { + pub decompress: Option, +} + +impl ReadOptions { + pub fn decompress(&self) -> bool { + self.decompress.unwrap_or(true) + } +} + +async fn handle_from_bytes(bytes: Bytes, decompress: bool) -> Result, ReadError> { + handle_buf(bytes.as_ref(), Some(decompress)).map_err(|e| ReadError::DecodeError(e.to_string())) +} + +// async fn fake_handle_from_stream( +// mut stream: BoxStream<'static, Result>, +// decompress: bool, +// ) -> Result, ReadError> { +// use futures::stream::TryStreamExt; // for `try_next` + +// let mut file = tokio::fs::OpenOptions::new() +// .write(true) +// .create(true) +// .truncate(true) +// .open("/tmp/temp_block.dbin.zst") +// .await +// .expect("demo code, no file would be use when flat_file_decoders will be updated"); + +// while let Some(item) = stream.try_next().await? { +// file.write_all(&item) +// .await +// .expect("demo code, unable to write to temp file"); +// } + +// file.sync_all() +// .await +// .expect("demo code, unable to sync temp file"); +// drop(file); + +// Ok(decode_flat_files( +// "/tmp/temp_block.dbin.zst".to_string(), +// None, +// None, +// Some(decompress), +// ) +// .expect("demo code, deal with error nicely")) +// }