This repository has been archived by the owner on Nov 19, 2024. It is now read-only.

Pr 4 #5

Open · wants to merge 7 commits into base: main
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
@@ -6,17 +6,21 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1"
bytes = "1.5"
clap = { version = "4.3.21", features = ["derive"] }
bincode = "1.3.3"
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
env_logger = "0.11.2"
header_accumulator = { git = "https://[email protected]/semiotic-ai/header_accumulator.git", tag = "v0.1.1-alpha" }
-decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", tag = "v0.2.0"}
+decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", branch = "main"}
sf-protos = { git = "https://github.com/semiotic-ai/sf-protos.git", version = "0.1.0"}
trin-validation = { git = "https://github.com/ethereum/trin.git", version = "0.1.0" }
tree_hash = "0.5.2"
thiserror = "1"
object_store = { version = "0.9.0", features = ["gcp", "http", "aws"] }
log = "0.4.20"
dotenv = "0.15.0"
url = "2.5"
zstd = "0.13.0"
3 changes: 3 additions & 0 deletions rust-toolchain.toml
@@ -0,0 +1,3 @@
+[toolchain]
+channel = "1.75"
+components = [ "rustfmt" ]
2 changes: 1 addition & 1 deletion src/bin/fetch-webdav.rs
@@ -51,7 +51,7 @@ async fn main() {
     let mut headers: Vec<ExtHeaderRecord> = Vec::new();
 
     // Use `as_ref` to get a &[u8] from `bytes` and pass it to `handle_buf`
-    match handle_buf(bytes.as_ref()) {
+    match handle_buf(bytes.as_ref(), Some(false)) {
         Ok(blocks) => {
             let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
                 .iter()
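The new second argument to handle_buf lines up with the decoder bump above; judging from the call sites it toggles zstd decompression of the input buffer, and Some(false) says the fetched bytes are already uncompressed .dbin data. The same change appears in src/s3.rs below. (The decoder-side signature is not part of this diff, so this reading is an inference.)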
61 changes: 32 additions & 29 deletions src/era_verifier.rs
@@ -1,23 +1,25 @@
 use decoder::decode_flat_files;
+use futures::stream::{FuturesOrdered, StreamExt};
-use header_accumulator::era_validator::era_validate;
 use header_accumulator::types::ExtHeaderRecord;
+use header_accumulator::{era_validator::era_validate, errors::EraValidateError};
 use sf_protos::ethereum::r#type::v2::Block;
 
+use crate::store;
 pub const MAX_EPOCH_SIZE: usize = 8192;
 pub const FINAL_EPOCH: usize = 1896;
 pub const MERGE_BLOCK: usize = 15537394;
 
 /// Verifies flat files stored in a directory against a header accumulator
 ///
-pub fn verify_eras(
-    directory: &String,
+pub async fn verify_eras(
+    store_url: &String,
     master_acc_file: Option<&String>,
     start_epoch: usize,
     end_epoch: Option<usize>,
     decompress: Option<bool>,
-) -> Result<Vec<usize>, EraValidateError> {
+) -> Result<Vec<usize>, anyhow::Error> {
     let mut validated_epochs = Vec::new();
     for epoch in start_epoch..=end_epoch.unwrap_or(start_epoch + 1) {
-        let blocks = get_blocks_from_dir(epoch, directory, decompress)?;
+        let blocks = get_blocks_from_dir(epoch, store_url, decompress).await?;
         let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
             .iter()
             .cloned()
@@ -46,15 +48,16 @@ pub fn verify_eras(
     Ok(validated_epochs)
 }
 
-fn get_blocks_from_dir(
+async fn get_blocks_from_dir(
     epoch: usize,
-    directory: &String,
+    store_url: &String,
     decompress: Option<bool>,
-) -> Result<Vec<Block>, EraValidateError> {
+) -> Result<Vec<Block>, anyhow::Error> {
     let start_100_block = epoch * MAX_EPOCH_SIZE;
     let end_100_block = (epoch + 1) * MAX_EPOCH_SIZE;
 
-    let mut blocks = extract_100s_blocks(directory, start_100_block, end_100_block, decompress)?;
+    let mut blocks =
+        extract_100s_blocks(store_url, start_100_block, end_100_block, decompress).await?;
 
     if epoch < FINAL_EPOCH {
         blocks = blocks[0..MAX_EPOCH_SIZE].to_vec();
@@ -65,37 +68,37 @@ fn get_blocks_from_dir(
     Ok(blocks)
 }
 
-fn extract_100s_blocks(
-    directory: &String,
+async fn extract_100s_blocks(
+    store_url: &String,
     start_block: usize,
     end_block: usize,
     decompress: Option<bool>,
-) -> Result<Vec<Block>, EraValidateError> {
+) -> Result<Vec<Block>, anyhow::Error> {
     // Flat files are stored in 100 block files
     // So we need to find the 100 block file that contains the start block and the 100 block file that contains the end block
     let start_100_block = (start_block / 100) * 100;
     let end_100_block = (((end_block as f32) / 100.0).ceil() as usize) * 100;
 
-    let mut blocks: Vec<Block> = Vec::new();
+    let zst_extension = if decompress.unwrap() { ".zst" } else { "" };
+    let blocks_store = store::new(store_url).expect("failed to create blocks store");
 
+    let mut futs = FuturesOrdered::new();
 
-    let mut zst_extension = "";
-    if decompress.unwrap() {
-        zst_extension = ".zst";
-    }
     for block_number in (start_100_block..end_100_block).step_by(100) {
-        let block_file_name =
-            directory.to_owned() + &format!("/{:010}.dbin{}", block_number, zst_extension);
+        let block_file_name = format!("{:010}.dbin{}", block_number, zst_extension);
+        futs.push_back(blocks_store.read_blocks(block_file_name, store::ReadOptions { decompress }))
     }
 
+    let mut all_blocks = Vec::new();
 
-        let decoded_blocks = match decode_flat_files(block_file_name, None, None, decompress) {
-            Ok(blocks) => blocks,
-            Err(e) => {
-                log::error!("Error decoding flat files: {:?}", e);
-                return Err(EraValidateError::FlatFileDecodeError);
-            }
-        };
-        blocks.extend(decoded_blocks);
+    while let Some(res) = futs.next().await {
+        match res {
+            Ok(blocks) => all_blocks.extend(blocks),
+            Err(e) => println!("{:?}", e),
+        }
+    }
 
     // Return only the requested blocks
-    Ok(blocks[start_block - start_100_block..end_block - start_100_block].to_vec())
+
+    Ok(all_blocks[start_block - start_100_block..end_block - start_100_block].to_vec())
 }
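
Note: the rewritten extract_100s_blocks fans out one read per 100-block file and drains them with FuturesOrdered, which yields results in the order the futures were pushed, so blocks stay sorted by height even though fetches run concurrently. As a worked example, epoch 1 spans blocks 8192..16384, so it reads 0000008100.dbin through 0000016300.dbin (83 files, 8,300 blocks) and returns the slice [92..8284]. The store module these calls rely on is not rendered in this PR view; below is a minimal sketch of the interface the call sites imply (names inferred from usage, everything else an assumption):

    // Hypothetical src/store.rs skeleton, inferred from store::new(...),
    // store::ReadOptions { decompress }, and blocks_store.read_blocks(...).
    use sf_protos::ethereum::r#type::v2::Block;

    pub struct ReadOptions {
        pub decompress: Option<bool>,
    }

    pub struct BlockStore; // would wrap an object_store backend in the real module

    /// Inferred constructor: parse the URL scheme (e.g. gs://, s3://, file://)
    /// and build the matching backend; Cargo.toml pulls in object_store for this.
    pub fn new(_store_url: &str) -> Result<BlockStore, anyhow::Error> {
        Ok(BlockStore)
    }

    impl BlockStore {
        /// Async read of one 100-block flat file, decoded into blocks.
        /// Must return a Future, since the caller awaits it via FuturesOrdered.
        pub async fn read_blocks(
            &self,
            _file_name: String,
            _opts: ReadOptions,
        ) -> Result<Vec<Block>, anyhow::Error> {
            unimplemented!("fetch the file, decompress if requested, decode blocks")
        }
    }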
1 change: 1 addition & 0 deletions src/lib.rs
@@ -1,4 +1,5 @@
 pub mod era_verifier;
 pub mod s3;
+pub mod store;
 pub mod stream;
 pub mod utils;
31 changes: 23 additions & 8 deletions src/main.rs
@@ -19,9 +19,9 @@ struct Cli {
 enum Commands {
     /// Decodes and validates flat files from a directory.
     EraValidate {
-        #[clap(short, long)]
+        #[clap(short = 'b', long)]
         // directory where flat files are located
-        dir: String,
+        store_url: String,
 
         #[clap(short, long)]
         // master accumulator file. default Portal Network file will be used if none provided
@@ -41,7 +41,8 @@
     },
 }
 
-fn main() {
+#[tokio::main]
+async fn main() {
     let cli = Cli::parse();
 
     match cli.debug {
@@ -55,19 +56,33 @@ fn main() {
     match &cli.command {
         Some(Commands::EraValidate {
             decompress,
-            dir,
+            store_url,
             master_acc_file,
             start_epoch,
             end_epoch,
         }) => {
-            let result = verify_eras(
-                dir,
+            println!(
+                "Starting era validation {} - {}",
+                start_epoch,
+                end_epoch.map(|x| x.to_string()).unwrap_or("".to_string())
+            );
+
+            match verify_eras(
+                store_url,
                 master_acc_file.as_ref(),
                 *start_epoch,
                 *end_epoch,
                 *decompress,
-            );
-            log::info!("epochs validated: {:?}", result);
+            )
+            .await
+            {
+                Ok(result) => {
+                    println!("Epochs validated: {:?}", result);
+                }
+                Err(e) => {
+                    log::error!("error: {:#}", e);
+                }
+            }
         }
         None => {}
     }
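
Aside for reviewers unfamiliar with Tokio: the #[tokio::main] attribute added above is what allows main to be async. It is roughly equivalent to this simplified expansion (a sketch, not the macro's exact output):

    // What #[tokio::main] does, approximately: build a runtime and block on main's body.
    fn main() {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("failed to build Tokio runtime")
            .block_on(async {
                // ... the body of async fn main() above ...
            });
    }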
2 changes: 1 addition & 1 deletion src/s3.rs
@@ -58,7 +58,7 @@ pub async fn s3_fetch(
         let bytes = result.bytes().await.unwrap();
 
         // Use `as_ref` to get a &[u8] from `bytes` and pass it to `handle_buf`
-        match handle_buf(bytes.as_ref()) {
+        match handle_buf(bytes.as_ref(), Some(false)) {
             Ok(blocks) => {
                 let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
                     .iter()