Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use u64 range instead of usize, for better wasm32 support #6961

Merged
merged 8 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ impl TryFrom<Blob> for ObjectMeta {
Ok(Self {
location: Path::parse(value.name)?,
last_modified: value.properties.last_modified,
size: value.properties.content_length as usize,
size: value.properties.content_length,
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
e_tag: value.properties.e_tag,
version: None, // For consistency with S3 and GCP which don't include this
})
Expand Down
12 changes: 6 additions & 6 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ use crate::{PutPayload, Result};
#[derive(Debug)]
pub struct ChunkedStore {
inner: Arc<dyn ObjectStore>,
chunk_size: usize,
chunk_size: u64,
}

impl ChunkedStore {
/// Creates a new [`ChunkedStore`] with the specified chunk_size
pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: u64) -> Self {
Self { inner, chunk_size }
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ impl ObjectStore for ChunkedStore {
if exhausted {
return None;
}
while buffer.len() < chunk_size {
while buffer.len() < chunk_size as usize {
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
match stream.next().await {
None => {
exhausted = true;
Expand All @@ -125,7 +125,7 @@ impl ObjectStore for ChunkedStore {
};
}
// Return the chunked values as the next value in the stream
let slice = buffer.split_to(chunk_size).freeze();
let slice = buffer.split_to(chunk_size as usize).freeze();
Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
},
)
Expand All @@ -138,7 +138,7 @@ impl ObjectStore for ChunkedStore {
})
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
self.inner.get_range(location, range).await
}

Expand Down Expand Up @@ -203,7 +203,7 @@ mod tests {

let mut remaining = 1001;
while let Some(next) = s.next().await {
let size = next.unwrap().len();
let size = next.unwrap().len() as u64;
let expected = remaining.min(chunk_size);
assert_eq!(size, expected);
remaining -= expected;
Expand Down
10 changes: 5 additions & 5 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ impl<T: GetClient> GetClientExt for T {

struct ContentRange {
/// The range of the object returned
range: Range<usize>,
range: Range<u64>,
/// The total size of the object being requested
size: usize,
size: u64,
}

impl ContentRange {
Expand All @@ -84,7 +84,7 @@ impl ContentRange {
let (start_s, end_s) = range.split_once('-')?;

let start = start_s.parse().ok()?;
let end: usize = end_s.parse().ok()?;
let end: u64 = end_s.parse().ok()?;

Some(Self {
size,
Expand Down Expand Up @@ -140,8 +140,8 @@ enum GetResultError {

#[error("Requested {expected:?}, got {actual:?}")]
UnexpectedRange {
expected: Range<usize>,
actual: Range<usize>,
expected: Range<u64>,
actual: Range<u64>,
},
}

Expand Down
2 changes: 1 addition & 1 deletion object_store/src/client/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct ListPrefix {
#[serde(rename_all = "PascalCase")]
pub struct ListContents {
pub key: String,
pub size: usize,
pub size: u64,
pub last_modified: DateTime<Utc>,
#[serde(rename = "ETag")]
pub e_tag: Option<String>,
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ impl MultiStatusResponse {
})?)
}

fn size(&self) -> Result<usize> {
fn size(&self) -> Result<u64> {
let size = self
.prop_stat
.prop
Expand Down Expand Up @@ -462,7 +462,7 @@ pub(crate) struct Prop {
last_modified: DateTime<Utc>,

#[serde(rename = "getcontentlength")]
content_length: Option<usize>,
content_length: Option<u64>,

#[serde(rename = "resourcetype")]
resource_type: ResourceType,
Expand Down
10 changes: 5 additions & 5 deletions object_store/src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) {
let range_result = storage.get_range(&location, range.clone()).await;

let bytes = range_result.unwrap();
assert_eq!(bytes, data.slice(range.clone()));
assert_eq!(bytes, data.slice(range.start as usize..range.end as usize));

let opts = GetOptions {
range: Some(GetRange::Bounded(2..5)),
Expand Down Expand Up @@ -190,11 +190,11 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) {
let ranges = vec![0..1, 2..3, 0..5];
let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
for (range, bytes) in ranges.iter().zip(bytes) {
assert_eq!(bytes, data.slice(range.clone()))
assert_eq!(bytes, data.slice(range.start as usize..range.end as usize));
}

let head = storage.head(&location).await.unwrap();
assert_eq!(head.size, data.len());
assert_eq!(head.size, data.len() as u64);

storage.delete(&location).await.unwrap();

Expand Down Expand Up @@ -934,7 +934,7 @@ pub async fn list_with_delimiter(storage: &DynObjectStore) {
let object = &result.objects[0];

assert_eq!(object.location, expected_location);
assert_eq!(object.size, data.len());
assert_eq!(object.size, data.len() as u64);

// ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
let prefix = Path::from("mydb/wb/000/000/001");
Expand Down Expand Up @@ -1085,7 +1085,7 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore
.unwrap();

let meta = storage.head(&path).await.unwrap();
assert_eq!(meta.size, chunk_size * 2);
assert_eq!(meta.size, chunk_size as u64 * 2);

// Empty case
let path = Path::from("test_empty_multipart");
Expand Down
18 changes: 9 additions & 9 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@
//!
//! // Buffer the entire object in memory
//! let object: Bytes = result.bytes().await.unwrap();
//! assert_eq!(object.len(), meta.size);
//! assert_eq!(object.len() as u64, meta.size);
//!
//! // Alternatively stream the bytes from object storage
//! let stream = object_store.get(&path).await.unwrap().into_stream();
Expand Down Expand Up @@ -630,7 +630,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// in the given byte range.
///
/// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let options = GetOptions {
range: Some(range.into()),
..Default::default()
Expand All @@ -640,7 +640,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {

/// Return the bytes that are stored at the specified location
/// in the given byte ranges
async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
coalesce_ranges(
ranges,
|range| self.get_range(location, range),
Expand Down Expand Up @@ -820,14 +820,14 @@ macro_rules! as_ref_impl {
self.as_ref().get_opts(location, options).await
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
self.as_ref().get_range(location, range).await
}

async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
ranges: &[Range<u64>],
) -> Result<Vec<Bytes>> {
self.as_ref().get_ranges(location, ranges).await
}
Expand Down Expand Up @@ -904,7 +904,7 @@ pub struct ObjectMeta {
/// The last modified time
pub last_modified: DateTime<Utc>,
/// The size in bytes of the object
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
pub size: usize,
pub size: u64,
/// The unique identifier for the object
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
Expand Down Expand Up @@ -1019,7 +1019,7 @@ pub struct GetResult {
/// The [`ObjectMeta`] for this object
pub meta: ObjectMeta,
/// The range of bytes returned by this request
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
pub range: Range<usize>,
pub range: Range<u64>,
/// Additional object attributes
pub attributes: Attributes,
}
Expand Down Expand Up @@ -1060,7 +1060,7 @@ impl GetResult {
path: path.clone(),
})?;

let mut buffer = Vec::with_capacity(len);
let mut buffer = Vec::with_capacity(len as usize);
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
file.take(len as _)
.read_to_end(&mut buffer)
.map_err(|source| local::Error::UnableToReadBytes { source, path })?;
Expand Down Expand Up @@ -1093,7 +1093,7 @@ impl GetResult {
match self.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(file, path) => {
const CHUNK_SIZE: usize = 8 * 1024;
const CHUNK_SIZE: u64 = 8 * 1024;
local::chunked_stream(file, path, self.range, CHUNK_SIZE)
}
GetResultPayload::Stream(s) => s,
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
Ok(permit_get_result(r, permit))
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_range(location, range).await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_ranges(location, ranges).await
}
Expand Down
Loading
Loading