From 98d31a7d73021c3c4344914066aebed32511aaab Mon Sep 17 00:00:00 2001
From: Matthieu Vachon
Date: Thu, 21 Mar 2024 17:05:01 -0400
Subject: [PATCH 1/7] Added Rust toolchain file to project so the version to use is conveyed

---
 rust-toolchain.toml | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 rust-toolchain.toml

diff --git a/rust-toolchain.toml b/rust-toolchain.toml
new file mode 100644
index 0000000..00d3c5c
--- /dev/null
+++ b/rust-toolchain.toml
@@ -0,0 +1,3 @@
+[toolchain]
+channel = "1.75"
+components = [ "rustfmt" ]

From 0406bbe81174893b19ff6b8743a7fa8c46387ff7 Mon Sep 17 00:00:00 2001
From: Matthieu Vachon
Date: Thu, 21 Mar 2024 22:59:20 -0400
Subject: [PATCH 2/7] Showcase a possible store abstraction around `url`

---
 Cargo.lock          |  4 +++
 Cargo.toml          |  4 +++
 src/era_verifier.rs | 41 ++++++++++++---------
 src/lib.rs          |  1 +
 src/main.rs         | 14 ++++----
 src/store.rs        | 88 +++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 129 insertions(+), 23 deletions(-)
 create mode 100644 src/store.rs

diff --git a/Cargo.lock b/Cargo.lock
index 548a82d..cf58598 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1506,7 +1506,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
 name = "flat_head"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "bincode",
+ "bytes",
  "clap",
  "decoder",
  "dotenv",
@@ -1516,9 +1518,11 @@ dependencies = [
  "log",
  "object_store",
  "sf-protos",
+ "thiserror",
  "tokio",
  "tree_hash",
  "trin-validation",
+ "url",
  "zstd 0.13.0",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index e7dbc58..fb52634 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,8 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
+anyhow = "1"
+bytes = "1.5"
 clap = { version = "4.3.21", features = ["derive"] }
 bincode = "1.3.3"
 tokio = { version = "1.0", features = ["full"] }
@@ -16,7 +18,9 @@ decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", tag =
 sf-protos = { git = "https://github.com/semiotic-ai/sf-protos.git", version = "0.1.0"}
 trin-validation = { git = "https://github.com/ethereum/trin.git", version = "0.1.0" }
 tree_hash = "0.5.2"
+thiserror = "1"
 object_store = { version = "0.9.0", features = ["gcp", "http", "aws"] }
 log = "0.4.20"
 dotenv = "0.15.0"
+url = "2.5"
 zstd = "0.13.0"
diff --git a/src/era_verifier.rs b/src/era_verifier.rs
index 5226547..aadb662 100644
--- a/src/era_verifier.rs
+++ b/src/era_verifier.rs
@@ -2,14 +2,16 @@ use decoder::decode_flat_files;
 use header_accumulator::types::ExtHeaderRecord;
 use header_accumulator::{era_validator::era_validate, errors::EraValidateError};
 use sf_protos::ethereum::r#type::v2::Block;
+
+use crate::store;

 pub const MAX_EPOCH_SIZE: usize = 8192;
 pub const FINAL_EPOCH: usize = 1896;
 pub const MERGE_BLOCK: usize = 15537394;

 /// verifies flat files stored in directory against a header accumulator
 ///
-pub fn verify_eras(
-    directory: &String,
+pub async fn verify_eras(
+    store_url: &String,
     master_acc_file: Option<&String>,
     start_epoch: usize,
     end_epoch: Option<usize>,
 ) -> Result<Vec<usize>, EraValidateError> {
     let mut validated_epochs = Vec::new();
     for epoch in start_epoch..=end_epoch.unwrap_or(start_epoch + 1) {
-        let blocks = get_blocks_from_dir(epoch, directory, decompress)?;
+        let blocks = get_blocks_from_dir(epoch, store_url, decompress).await?;
         let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
             .iter()
             .cloned()
@@ -46,15 +48,16 @@ pub fn verify_eras(
     Ok(validated_epochs)
 }

-fn get_blocks_from_dir(
+async fn get_blocks_from_dir(
     epoch: usize,
-    directory: &String,
+    store_url: &String,
     decompress: Option<bool>,
 ) -> Result<Vec<Block>, EraValidateError> {
     let start_100_block = epoch * MAX_EPOCH_SIZE;
     let end_100_block = (epoch + 1) * MAX_EPOCH_SIZE;

-    let mut blocks = extract_100s_blocks(directory, start_100_block, end_100_block, decompress)?;
+    let mut blocks =
+        extract_100s_blocks(store_url, start_100_block, end_100_block, decompress).await?;

     if epoch < FINAL_EPOCH {
         blocks = blocks[0..MAX_EPOCH_SIZE].to_vec();
@@ -65,8 +68,8 @@ fn get_blocks_from_dir(
     Ok(blocks)
 }

-fn extract_100s_blocks(
-    directory: &String,
+async fn extract_100s_blocks(
+    store_url: &String,
     start_block: usize,
     end_block: usize,
     decompress: Option<bool>,
@@ -83,19 +86,23 @@ fn extract_100s_blocks(
         zst_extension = ".zst";
     }
     for block_number in (start_100_block..end_100_block).step_by(100) {
-        let block_file_name =
-            directory.to_owned() + &format!("/{:010}.dbin{}", block_number, zst_extension);
+        let block_file_name = format!("{:010}.dbin{}", block_number, zst_extension);
+        let blocks_store = store::new(store_url).map_err(anyhow_error_to_era)?;
+        let decoded_blocks = blocks_store
+            .read_blocks(&block_file_name, store::ReadOptions { decompress })
+            .await
+            .map_err(|err| {
+                log::error!("Error decoding flat files: {:?}", err);
+                EraValidateError::FlatFileDecodeError
+            })?;

-        let decoded_blocks = match decode_flat_files(block_file_name, None, None, decompress) {
-            Ok(blocks) => blocks,
-            Err(e) => {
-                log::error!("Error decoding flat files: {:?}", e);
-                return Err(EraValidateError::FlatFileDecodeError);
-            }
-        };
         blocks.extend(decoded_blocks);
     }

     // Return only the requested blocks
     Ok(blocks[start_block - start_100_block..end_block - start_100_block].to_vec())
 }
+
+fn anyhow_error_to_era(_: anyhow::Error) -> EraValidateError {
+    EraValidateError::IoError
+}
diff --git a/src/lib.rs b/src/lib.rs
index 3395657..0159aa8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod era_verifier;
 pub mod s3;
+pub mod store;
 pub mod stream;
 pub mod utils;
diff --git a/src/main.rs b/src/main.rs
index 3b7c1ef..1169231 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -19,9 +19,9 @@ struct Cli {
 enum Commands {
     /// Decode and validates flat files from a directory.
     EraValidate {
-        #[clap(short, long)]
+        #[clap(short = 'b', long)]
         // directory where flat files are located
-        dir: String,
+        store_url: String,
         #[clap(short, long)]
         // master accumulator file. default Portal Network file will be used if none provided
@@ -41,7 +41,8 @@
     },
 }

-fn main() {
+#[tokio::main]
+async fn main() {
     let cli = Cli::parse();

     match cli.debug {
@@ -55,18 +56,19 @@
     match &cli.command {
         Some(Commands::EraValidate {
             decompress,
-            dir,
+            store_url,
             master_acc_file,
             start_epoch,
             end_epoch,
         }) => {
             let result = verify_eras(
-                dir,
+                store_url,
                 master_acc_file.as_ref(),
                 *start_epoch,
                 *end_epoch,
                 *decompress,
-            );
+            )
+            .await;
             log::info!("epochs validated: {:?}", result);
         }
         None => {}
diff --git a/src/store.rs b/src/store.rs
new file mode 100644
index 0000000..f3bc4f9
--- /dev/null
+++ b/src/store.rs
@@ -0,0 +1,88 @@
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use object_store::{aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, path::Path, ObjectStore};
+use std::{io, sync::Arc};
+use thiserror::Error;
+use url::Url;
+
+use sf_protos::ethereum::r#type::v2::Block;
+
+pub fn new<S: AsRef<str>>(store_url: S) -> Result<Store, anyhow::Error> {
+    let url = Url::parse(store_url.as_ref())?;
+
+    match url.scheme() {
+        "gs" => {
+            let bucket = url.host_str().ok_or_else(|| anyhow::anyhow!("No bucket"))?;
+            let path = url.path();
+            if path.starts_with("/") {}
+
+            let store = GoogleCloudStorageBuilder::new()
+                .with_bucket_name(bucket.to_string())
+                .build()?;
+
+            Ok(Store {
+                store: Arc::new(store),
+                base: match path.starts_with("/") {
+                    false => path.to_string(),
+                    true => path[1..].to_string(),
+                },
+            })
+        }
+        _ => return Err(anyhow::anyhow!("Unsupported scheme: {}", url.scheme())),
+    }
+}
+
+pub struct Store {
+    store: Arc<dyn ObjectStore>,
+    base: String,
+}
+
+impl Store {
+    pub async fn read_blocks(
+        &self,
+        path: &String,
+        options: ReadOptions,
+    ) -> Result<Vec<Block>, ReadError> {
+        let content = self.store.get(&self.join_path(path)).await?;
+
+        // FIXME: Use a version appropriate, we could use `content.into_store` and support async reader API.
+        fake_handle_from_stream(content.into_stream(), options.decompress()).await
+    }
+
+    fn join_path(&self, path: &String) -> Path {
+        Path::from(format!("{}/{}", self.base, path.trim_start_matches('/')))
+    }
+}
+
+#[derive(Error, Debug)]
+pub enum ReadError {
+    #[error("Path '{0}' not found")]
+    NotFound(String),
+    #[error("Storage error: {0}")]
+    Storage(#[from] object_store::Error),
+}
+
+pub struct ReadOptions {
+    pub decompress: Option<bool>,
+}
+
+impl ReadOptions {
+    pub fn decompress(&self) -> bool {
+        self.decompress.unwrap_or(true)
+    }
+}
+
+async fn fake_handle_from_stream(
+    mut stream: BoxStream<'static, Result<Bytes, object_store::Error>>,
+    decompress: bool,
+) -> Result<Vec<Block>, ReadError> {
+    use futures::stream::TryStreamExt; // for `try_next`
+    let mut sum = 0;
+    while let Some(item) = stream.try_next().await? {
+        sum += item.len();
+    }
+
+    println!("Bytes read: {}", sum);
+
+    Ok(vec![])
+}

From 7699a406a0d0009bf18b87eabba4b7f789279448 Mon Sep 17 00:00:00 2001
From: Matthieu Vachon
Date: Thu, 21 Mar 2024 23:19:31 -0400
Subject: [PATCH 3/7] Tweak further so that GCP reading actually works

---
 src/era_verifier.rs |  1 +
 src/store.rs        | 32 +++++++++++++++++++++++++-------
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/src/era_verifier.rs b/src/era_verifier.rs
index aadb662..e8362db 100644
--- a/src/era_verifier.rs
+++ b/src/era_verifier.rs
@@ -85,6 +85,7 @@ async fn extract_100s_blocks(
     if decompress.unwrap() {
         zst_extension = ".zst";
     }
+
     for block_number in (start_100_block..end_100_block).step_by(100) {
         let block_file_name = format!("{:010}.dbin{}", block_number, zst_extension);
         let blocks_store = store::new(store_url).map_err(anyhow_error_to_era)?;
diff --git a/src/store.rs b/src/store.rs
index f3bc4f9..9462a55 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,8 +1,13 @@
-use bytes::Bytes;
+use bytes::{Buf, Bytes, BytesMut};
+use decoder::{decode_flat_files, handle_file};
 use futures::stream::BoxStream;
 use object_store::{aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, path::Path, ObjectStore};
-use std::{io, sync::Arc};
+use std::{
+    io::{self, Read},
+    sync::Arc,
+};
 use thiserror::Error;
+use tokio::io::AsyncWriteExt;
 use url::Url;

 use sf_protos::ethereum::r#type::v2::Block;
@@ -77,12 +82,25 @@ async fn fake_handle_from_stream(
     decompress: bool,
 ) -> Result<Vec<Block>, ReadError> {
     use futures::stream::TryStreamExt; // for `try_next`
-    let mut sum = 0;
+
+    let mut file = tokio::fs::OpenOptions::new()
+        .write(true)
+        .create(true)
+        .open("/tmp/temp_block.dbin.zst")
+        .await
+        .expect("demo code, no file would be use when flat_file_decoders will be updated");
+
     while let Some(item) = stream.try_next().await? {
-        sum += item.len();
+        file.write_all(&item)
+            .await
+            .expect("demo code, unable to write to temp file");
     }

-    println!("Bytes read: {}", sum);
-
-    Ok(vec![])
+    Ok(decode_flat_files(
+        "/tmp/temp_block.dbin.zst".to_string(),
+        None,
+        None,
+        Some(decompress),
+    )
+    .expect("demo code, deal with error nicely"))
 }

From ec1de7250e250c8ff4bed79c7c1ff600fd19c08d Mon Sep 17 00:00:00 2001
From: Matthieu Vachon
Date: Fri, 22 Mar 2024 00:00:37 -0400
Subject: [PATCH 4/7] Added local file support

---
 src/era_verifier.rs | 19 +++++--------------
 src/main.rs         | 19 ++++++++++++++++---
 src/store.rs        | 40 +++++++++++++++++++++++++++++---------
 3 files changed, 52 insertions(+), 26 deletions(-)

diff --git a/src/era_verifier.rs b/src/era_verifier.rs
index e8362db..a15691d 100644
--- a/src/era_verifier.rs
+++ b/src/era_verifier.rs
@@ -1,4 +1,3 @@
-use decoder::decode_flat_files;
 use header_accumulator::types::ExtHeaderRecord;
 use header_accumulator::{era_validator::era_validate, errors::EraValidateError};
 use sf_protos::ethereum::r#type::v2::Block;
@@ -16,7 +15,7 @@ pub async fn verify_eras(
     start_epoch: usize,
     end_epoch: Option<usize>,
     decompress: Option<bool>,
-) -> Result<Vec<usize>, EraValidateError> {
+) -> Result<Vec<usize>, anyhow::Error> {
     let mut validated_epochs = Vec::new();
     for epoch in start_epoch..=end_epoch.unwrap_or(start_epoch + 1) {
         let blocks = get_blocks_from_dir(epoch, store_url, decompress).await?;
@@ -52,7 +51,7 @@ async fn get_blocks_from_dir(
     epoch: usize,
     store_url: &String,
     decompress: Option<bool>,
-) -> Result<Vec<Block>, EraValidateError> {
+) -> Result<Vec<Block>, anyhow::Error> {
     let start_100_block = epoch * MAX_EPOCH_SIZE;
     let end_100_block = (epoch + 1) * MAX_EPOCH_SIZE;
@@ -73,7 +72,7 @@ async fn extract_100s_blocks(
     start_block: usize,
     end_block: usize,
     decompress: Option<bool>,
-) -> Result<Vec<Block>, EraValidateError> {
+) -> Result<Vec<Block>, anyhow::Error> {
     // Flat files are stored in 100 block files
     // So we need to find the 100 block file that contains the start block and the 100 block file that contains the end block
     let start_100_block = (start_block / 100) * 100;
@@ -88,14 +87,10 @@ async fn extract_100s_blocks(
     for block_number in (start_100_block..end_100_block).step_by(100) {
         let block_file_name = format!("{:010}.dbin{}", block_number, zst_extension);
-        let blocks_store = store::new(store_url).map_err(anyhow_error_to_era)?;
+        let blocks_store = store::new(store_url)?;
         let decoded_blocks = blocks_store
             .read_blocks(&block_file_name, store::ReadOptions { decompress })
-            .await
-            .map_err(|err| {
-                log::error!("Error decoding flat files: {:?}", err);
-                EraValidateError::FlatFileDecodeError
-            })?;
+            .await?;

         blocks.extend(decoded_blocks);
     }
@@ -103,7 +98,3 @@ async fn extract_100s_blocks(
     // Return only the requested blocks
     Ok(blocks[start_block - start_100_block..end_block - start_100_block].to_vec())
 }
-
-fn anyhow_error_to_era(_: anyhow::Error) -> EraValidateError {
-    EraValidateError::IoError
-}
diff --git a/src/main.rs b/src/main.rs
index 1169231..78220d1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -61,15 +61,28 @@ async fn main() {
             start_epoch,
             end_epoch,
         }) => {
-            let result = verify_eras(
+            println!(
+                "Starting era validation {} - {}",
+                start_epoch,
+                end_epoch.map(|x| x.to_string()).unwrap_or("".to_string())
+            );
+
+            match verify_eras(
                 store_url,
                 master_acc_file.as_ref(),
                 *start_epoch,
                 *end_epoch,
                 *decompress,
             )
-            .await;
-            log::info!("epochs validated: {:?}", result);
+            .await
+            {
+                Ok(result) => {
+                    println!("Epochs validated: {:?}", result);
+                }
+                Err(e) => {
+                    log::error!("error: {:#}", e);
+                }
+            }
         }
         None => {}
     }
 }
diff --git a/src/store.rs b/src/store.rs
index 9462a55..05632b0 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,11 +1,11 @@
-use bytes::{Buf, Bytes, BytesMut};
-use decoder::{decode_flat_files, handle_file};
+use anyhow::Context;
+use bytes::Bytes;
+use decoder::decode_flat_files;
 use futures::stream::BoxStream;
-use object_store::{aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, path::Path, ObjectStore};
-use std::{
-    io::{self, Read},
-    sync::Arc,
+use object_store::{
+    gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, path::Path, ObjectStore,
 };
+use std::sync::Arc;
 use thiserror::Error;
 use tokio::io::AsyncWriteExt;
 use url::Url;
@@ -13,13 +13,26 @@ use url::Url;
 use sf_protos::ethereum::r#type::v2::Block;

 pub fn new<S: AsRef<str>>(store_url: S) -> Result<Store, anyhow::Error> {
-    let url = Url::parse(store_url.as_ref())?;
+    let store_url = store_url.as_ref();
+    let url = match Url::parse(store_url) {
+        Ok(url) => url,
+        Err(url::ParseError::RelativeUrlWithoutBase) => {
+            let absolute_path = std::fs::canonicalize(store_url)
+                .map_err(|e| anyhow::anyhow!("Invalid store URL: {}: {}", store_url, e))?;
+
+            Url::parse(&format!("file://{}", absolute_path.to_string_lossy()))
+                .with_context(|| format!("Invalid store URL: {}", store_url))?
+        }
+        Err(e) => Err(e).with_context(|| format!("Invalid store URL: {}", store_url))?,
+    };

     match url.scheme() {
+        "s3" => {
+            unimplemented!("s3://... support not implemented yet")
+        }
         "gs" => {
             let bucket = url.host_str().ok_or_else(|| anyhow::anyhow!("No bucket"))?;
             let path = url.path();
-            if path.starts_with("/") {}

             let store = GoogleCloudStorageBuilder::new()
                 .with_bucket_name(bucket.to_string())
                 .build()?;
@@ -33,7 +46,15 @@ pub fn new<S: AsRef<str>>(store_url: S) -> Result<Store, anyhow::Error> {
                 },
             })
         }
-        _ => return Err(anyhow::anyhow!("Unsupported scheme: {}", url.scheme())),
+        "file" => {
+            let store = LocalFileSystem::new_with_prefix(url.path()).context("new local store")?;
+
+            Ok(Store {
+                store: Arc::new(store),
+                base: "".to_string(),
+            })
+        }
+        _ => Err(anyhow::anyhow!("Unsupported scheme: {}", url.scheme()))?,
     }
 }
@@ -86,6 +107,7 @@ async fn fake_handle_from_stream(
     let mut file = tokio::fs::OpenOptions::new()
         .write(true)
         .create(true)
+        .truncate(true)
         .open("/tmp/temp_block.dbin.zst")
         .await
         .expect("demo code, no file would be use when flat_file_decoders will be updated");

From 99a982f1e86f50fb768d6153417ece42044de1a3 Mon Sep 17 00:00:00 2001
From: Matthieu Vachon
Date: Fri, 22 Mar 2024 00:23:00 -0400
Subject: [PATCH 5/7] Fixed temp writing that was not flushing to disk fully before being read in all cases

---
 src/era_verifier.rs | 2 +-
 src/store.rs        | 5 +++++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/era_verifier.rs b/src/era_verifier.rs
index a15691d..2706230 100644
--- a/src/era_verifier.rs
+++ b/src/era_verifier.rs
@@ -1,5 +1,5 @@
+use header_accumulator::era_validator::era_validate;
 use header_accumulator::types::ExtHeaderRecord;
-use header_accumulator::{era_validator::era_validate, errors::EraValidateError};
 use sf_protos::ethereum::r#type::v2::Block;

 use crate::store;
diff --git a/src/store.rs b/src/store.rs
index 05632b0..829a867 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -118,6 +118,11 @@ async fn fake_handle_from_stream(
             .expect("demo code, unable to write to temp file");
     }

+    file.sync_all()
+        .await
+        .expect("demo code, unable to sync temp file");
+    drop(file);
+
     Ok(decode_flat_files(
         "/tmp/temp_block.dbin.zst".to_string(),
         None,
         None,

From c42cd573d4859fe4909469e9e10c7a2dddf6d390 Mon Sep 17 00:00:00 2001
From: pedro bufulin
Date: Wed, 27 Mar 2024 19:00:15 -0300
Subject: [PATCH 6/7] feat(store): add handle_from_bytes function, using decompression from the decoder dependency if needed

---
 Cargo.lock   |  4 +--
 Cargo.toml   |  2 +-
 src/store.rs | 83 ++++++++++++++++++++++++++++------------------------
 3 files changed, 47 insertions(+), 42 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cf58598..0bc37d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -931,7 +931,7 @@ dependencies = [
 [[package]]
 name = "decoder"
 version = "0.1.1"
-source = "git+https://github.com/semiotic-ai/flat-files-decoder.git?tag=v0.2.0#7a30d1d52b80a62ef446bf53dc3daf490c228d61"
+source = "git+https://github.com/semiotic-ai/flat-files-decoder.git?branch=main#780707ed7dc2d0c1bd4d9f5fbfbb3cdaeb29ae98"
 dependencies = [
  "anyhow",
  "bincode",
@@ -1968,7 +1968,7 @@ dependencies = [
  "httpdate",
  "itoa",
  "pin-project-lite",
- "socket2 0.5.5",
+ "socket2 0.4.10",
  "tokio",
  "tower-service",
  "tracing",
diff --git a/Cargo.toml b/Cargo.toml
index fb52634..a45bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,7 +14,7 @@ tokio = { version = "1.0", features = ["full"] }
 futures = "0.3"
 env_logger = "0.11.2"
 header_accumulator = { git = "https://git@github.com/semiotic-ai/header_accumulator.git", tag = "v0.1.1-alpha" }
-decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", tag = "v0.2.0"}
+decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", branch = "main"}
 sf-protos = { git = "https://github.com/semiotic-ai/sf-protos.git", version = "0.1.0"}
 trin-validation = { git = "https://github.com/ethereum/trin.git", version = "0.1.0" }
 tree_hash = "0.5.2"
diff --git a/src/store.rs b/src/store.rs
index 829a867..a8e7c13 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,13 +1,11 @@
 use anyhow::Context;
 use bytes::Bytes;
-use decoder::decode_flat_files;
-use futures::stream::BoxStream;
+use decoder::handle_buf;
 use object_store::{
     gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, path::Path, ObjectStore,
 };
 use std::sync::Arc;
 use thiserror::Error;
-use tokio::io::AsyncWriteExt;
 use url::Url;

 use sf_protos::ethereum::r#type::v2::Block;
@@ -58,6 +56,7 @@ pub fn new<S: AsRef<str>>(store_url: S) -> Result<Store, anyhow::Error> {
     }
 }

+#[derive(Clone)]
 pub struct Store {
     store: Arc<dyn ObjectStore>,
     base: String,
 }
@@ -66,16 +65,16 @@ pub struct Store {
 impl Store {
     pub async fn read_blocks(
         &self,
-        path: &String,
+        path: String,
         options: ReadOptions,
     ) -> Result<Vec<Block>, ReadError> {
         let content = self.store.get(&self.join_path(path)).await?;
+        let bytes: Bytes = content.bytes().await.unwrap();

-        // FIXME: Use a version appropriate, we could use `content.into_store` and support async reader API.
-        fake_handle_from_stream(content.into_stream(), options.decompress()).await
+        handle_from_bytes(bytes, options.decompress()).await
     }

-    fn join_path(&self, path: &String) -> Path {
+    fn join_path(&self, path: String) -> Path {
         Path::from(format!("{}/{}", self.base, path.trim_start_matches('/')))
     }
 }
@@ -86,6 +85,8 @@ pub enum ReadError {
     NotFound(String),
     #[error("Storage error: {0}")]
     Storage(#[from] object_store::Error),
+    #[error("Decode error: {0}")]
+    DecodeError(String), // Or directly use DecodeError if it implements `std::error::Error`
 }

 pub struct ReadOptions {
@@ -98,36 +99,40 @@ impl ReadOptions {
     }
 }

-async fn fake_handle_from_stream(
-    mut stream: BoxStream<'static, Result<Bytes, object_store::Error>>,
-    decompress: bool,
-) -> Result<Vec<Block>, ReadError> {
-    use futures::stream::TryStreamExt; // for `try_next`
-
-    let mut file = tokio::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .truncate(true)
-        .open("/tmp/temp_block.dbin.zst")
-        .await
-        .expect("demo code, no file would be use when flat_file_decoders will be updated");
-
-    while let Some(item) = stream.try_next().await? {
-        file.write_all(&item)
-            .await
-            .expect("demo code, unable to write to temp file");
-    }
-
-    file.sync_all()
-        .await
-        .expect("demo code, unable to sync temp file");
-    drop(file);
-
-    Ok(decode_flat_files(
-        "/tmp/temp_block.dbin.zst".to_string(),
-        None,
-        None,
-        Some(decompress),
-    )
-    .expect("demo code, deal with error nicely"))
+async fn handle_from_bytes(bytes: Bytes, decompress: bool) -> Result<Vec<Block>, ReadError> {
+    handle_buf(bytes.as_ref(), Some(decompress)).map_err(|e| ReadError::DecodeError(e.to_string()))
 }
+
+// async fn fake_handle_from_stream(
+//     mut stream: BoxStream<'static, Result<Bytes, object_store::Error>>,
+//     decompress: bool,
+// ) -> Result<Vec<Block>, ReadError> {
+//     use futures::stream::TryStreamExt; // for `try_next`
+
+//     let mut file = tokio::fs::OpenOptions::new()
+//         .write(true)
+//         .create(true)
+//         .truncate(true)
+//         .open("/tmp/temp_block.dbin.zst")
+//         .await
+//         .expect("demo code, no file would be use when flat_file_decoders will be updated");
+
+//     while let Some(item) = stream.try_next().await? {
+//         file.write_all(&item)
+//             .await
+//             .expect("demo code, unable to write to temp file");
+//     }
+
+//     file.sync_all()
+//         .await
+//         .expect("demo code, unable to sync temp file");
+//     drop(file);
+
+//     Ok(decode_flat_files(
+//         "/tmp/temp_block.dbin.zst".to_string(),
+//         None,
+//         None,
+//         Some(decompress),
+//     )
+//     .expect("demo code, deal with error nicely"))
+// }

From 26c73569f5ac811db728246515f12308c92d2a56 Mon Sep 17 00:00:00 2001
From: pedro bufulin
Date: Wed, 27 Mar 2024 19:04:02 -0300
Subject: [PATCH 7/7] feat(era_verifier): use futures to fetch 100-block files from storage concurrently, hopefully decreasing fetch time

---
 src/bin/fetch-webdav.rs |  2 +-
 src/era_verifier.rs     | 26 +++++++++++++++-----------
 src/s3.rs               |  2 +-
 3 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/src/bin/fetch-webdav.rs b/src/bin/fetch-webdav.rs
index 7cf2d69..9630a7c 100644
--- a/src/bin/fetch-webdav.rs
+++ b/src/bin/fetch-webdav.rs
@@ -51,7 +51,7 @@ async fn main() {
     let mut headers: Vec<ExtHeaderRecord> = Vec::new();

     // Use `as_ref` to get a &[u8] from `bytes` and pass it to `handle_buf`
-    match handle_buf(bytes.as_ref()) {
+    match handle_buf(bytes.as_ref(), Some(false)) {
         Ok(blocks) => {
             let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
                 .iter()
diff --git a/src/era_verifier.rs b/src/era_verifier.rs
index 2706230..c38aa83 100644
--- a/src/era_verifier.rs
+++ b/src/era_verifier.rs
@@ -1,3 +1,4 @@
+use futures::stream::{FuturesOrdered, StreamExt};
 use header_accumulator::era_validator::era_validate;
 use header_accumulator::types::ExtHeaderRecord;
 use sf_protos::ethereum::r#type::v2::Block;
@@ -78,23 +79,26 @@ async fn extract_100s_blocks(
     let start_100_block = (start_block / 100) * 100;
     let end_100_block = (((end_block as f32) / 100.0).ceil() as usize) * 100;

-    let mut blocks: Vec<Block> = Vec::new();
+    let zst_extension = if decompress.unwrap() { ".zst" } else { "" };
+    let blocks_store = store::new(store_url).expect("failed to create blocks store");

-    let mut zst_extension = "";
-    if decompress.unwrap() {
-        zst_extension = ".zst";
-    }
+    let mut futs = FuturesOrdered::new();

     for block_number in (start_100_block..end_100_block).step_by(100) {
         let block_file_name = format!("{:010}.dbin{}", block_number, zst_extension);
-        let blocks_store = store::new(store_url)?;
-        let decoded_blocks = blocks_store
-            .read_blocks(&block_file_name, store::ReadOptions { decompress })
-            .await?;
+        futs.push_back(blocks_store.read_blocks(block_file_name, store::ReadOptions { decompress }))
     }

-        blocks.extend(decoded_blocks);
+    let mut all_blocks = Vec::new();
+
+    while let Some(res) = futs.next().await {
+        match res {
+            Ok(blocks) => all_blocks.extend(blocks),
+            Err(e) => println!("{:?}", e),
+        }
     }

     // Return only the requested blocks
-    Ok(blocks[start_block - start_100_block..end_block - start_100_block].to_vec())
+
+    Ok(all_blocks[start_block - start_100_block..end_block - start_100_block].to_vec())
 }
diff --git a/src/s3.rs b/src/s3.rs
index 03986e4..8cf9e92 100644
--- a/src/s3.rs
+++ b/src/s3.rs
@@ -58,7 +58,7 @@ pub async fn s3_fetch(
     let bytes = result.bytes().await.unwrap();

     // Use `as_ref` to get a &[u8] from `bytes` and pass it to `handle_buf`
-    match handle_buf(bytes.as_ref()) {
+    match handle_buf(bytes.as_ref(), Some(false)) {
         Ok(blocks) => {
             let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
                 .iter()