Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: out of range error in module ObjectStore based on OBKV #1089

Merged
merged 6 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 61 additions & 17 deletions components/object_store/src/obkv/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,34 @@ impl ObkvObjectMeta {
}

/// Compute the convered parts based on given range parameter
pub fn compute_covered_parts(&self, range: Range<usize>) -> Result<ConveredParts> {
pub fn compute_covered_parts(&self, range: Range<usize>) -> Result<Option<ConveredParts>> {
ensure!(
range.end <= self.size,
OutOfRange {
end: range.end,
object_size: self.size,
}
);

// if the range is empty, return empty parts
if range.is_empty() {
return Ok(None);
}

let batch_size = self.part_size;
let start_index = range.start / batch_size;
let start_offset = range.start % batch_size;
let end_index = range.end / batch_size;
let end_offset = range.end % batch_size;

Ok(ConveredParts {
let inclusive_end = range.end - 1;

let end_index = inclusive_end / batch_size;
let end_offset = inclusive_end % batch_size;

Ok(Some(ConveredParts {
part_keys: &self.parts[start_index..=end_index],
start_offset,
end_offset,
})
}))
}
}

Expand Down Expand Up @@ -213,7 +222,7 @@ impl<T: TableKv> MetaManager<T> {
pub async fn save(&self, meta: ObkvObjectMeta) -> Result<()> {
let mut batch = T::WriteBatch::default();
let encode_bytes = meta.encode()?;
batch.insert(meta.location.as_bytes(), &encode_bytes);
batch.insert_or_update(meta.location.as_bytes(), &encode_bytes);
self.client
.as_ref()
.write(WriteContext::default(), OBJECT_STORE_META, batch)
Expand Down Expand Up @@ -301,7 +310,7 @@ mod test {

#[test]
fn test_estimate_size() {
let meta = build_test_meta();
let meta = build_test_meta0();

let expect = meta.estimate_size_of_json();
let json = &serde_json::to_string(&meta).unwrap();
Expand All @@ -312,50 +321,73 @@ mod test {

#[test]
fn test_compute_convered_parts() {
    let meta = build_test_meta0();

    // Single byte at the very beginning: one part, inclusive end offset 0.
    let range1 = Range { start: 0, end: 1 };
    let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
    assert!(expect.part_keys.len() == 1);
    assert!(expect.start_offset == 0);
    assert!(expect.end_offset == 0);

    // Exactly the first part: must NOT spill into the second part.
    let range1 = Range {
        start: 0,
        end: 1024,
    };
    let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
    assert!(expect.part_keys.len() == 1);
    assert!(expect.start_offset == 0);
    assert!(expect.end_offset == 1023);

    // The whole object (size 8190): last part is a partial one.
    let range1 = Range {
        start: 0,
        end: 8190,
    };
    let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
    assert!(expect.part_keys.len() == 8);
    assert!(expect.start_offset == 0);
    assert!(expect.end_offset == 1021);

    // Range straddling a part boundary: two parts covered.
    let range1 = Range {
        start: 1023,
        end: 1025,
    };
    let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
    assert!(expect.part_keys.len() == 2);
    assert!(expect.start_offset == 1023);
    assert!(expect.end_offset == 0);

    // Last byte of the object: single part, start and end offsets coincide.
    let range1 = Range {
        start: 8189,
        end: 8190,
    };
    let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
    assert!(expect.part_keys.len() == 1);
    assert!(expect.start_offset == 1021);
    assert!(expect.end_offset == 1021);

    // Range ending past the object size must be rejected.
    let range1 = Range {
        start: 8189,
        end: 8199,
    };
    let expect = meta.compute_covered_parts(range1);
    assert!(expect.is_err());

    // Object whose size is exactly one part: reading the whole object must
    // cover exactly that one part (regression case for the boundary bug).
    let meta = build_test_meta1();
    let range1 = Range {
        start: 0,
        end: 1024,
    };
    let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
    assert!(expect.part_keys.len() == 1);
    assert!(expect.start_offset == 0);
    assert!(expect.end_offset == 1023);

    // Empty range covers nothing.
    let range1 = Range { start: 0, end: 0 };
    let expect = meta.compute_covered_parts(range1).unwrap();
    assert!(expect.is_none());
}

fn build_test_meta() -> ObkvObjectMeta {
fn build_test_meta0() -> ObkvObjectMeta {
ObkvObjectMeta {
location: String::from("/test/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxfdsfjlajflk"),
last_modified: 123456789,
Expand All @@ -375,4 +407,16 @@ mod test {
version: String::from("123456fsdalfkassa;l;kjfaklasadffsd"),
}
}

fn build_test_meta1() -> ObkvObjectMeta {
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
ObkvObjectMeta {
location: String::from("/test/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxfdsfjlajflk"),
last_modified: 123456789,
size: 1024,
unique_id: Some(String::from("1245689u438uferjalfjkda")),
part_size: 1024,
parts: vec![String::from("/test/xx/0")],
version: String::from("123456fsdalfkassa;l;kjfaklasadffsd"),
}
}
}
97 changes: 49 additions & 48 deletions components/object_store/src/obkv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,68 +459,69 @@ impl<T: TableKv> ObjectStore for ObkvObjectStore<T> {
source,
})?;

let mut range_buffer = Vec::with_capacity(range.end - range.start);
let covered_parts = meta
.compute_covered_parts(range)
.compute_covered_parts(range.clone())
.box_err()
.map_err(|source| StoreError::NotFound {
path: location.to_string(),
source,
})?;

let keys: Vec<&[u8]> = covered_parts
.part_keys
.iter()
.map(|key| key.as_bytes())
.collect();
let values =
self.client
.get_batch(table_name, keys)
.map_err(|source| StoreError::NotFound {
path: location.to_string(),
source: Box::new(source),
})?;

if covered_parts.part_keys.len() != values.len() {
DataPartsLength {
part_len: covered_parts.part_keys.len(),
value_len: values.len(),
}
.fail()
.map_err(|source| StoreError::Generic {
store: OBKV,
source: Box::new(source),
})?
}

for (index, (key, value)) in covered_parts.part_keys.iter().zip(values).enumerate() {
if let Some(bytes) = value {
let mut begin = 0;
let mut end = bytes.len();
if index == 0 {
begin = covered_parts.start_offset;
}
// the last one
if index == covered_parts.part_keys.len() - 1 {
end = covered_parts.end_offset;
}
range_buffer.extend_from_slice(&bytes[begin..end]);
} else {
DataPartNotFound { part_key: key }
.fail()
if let Some(covered_parts) = covered_parts {
let mut range_buffer = Vec::with_capacity(range.end - range.start);
let keys: Vec<&[u8]> = covered_parts
.part_keys
.iter()
.map(|key| key.as_bytes())
.collect();
let values =
self.client
.get_batch(table_name, keys)
.map_err(|source| StoreError::NotFound {
path: location.to_string(),
source: Box::new(source),
})?;

if covered_parts.part_keys.len() != values.len() {
DataPartsLength {
part_len: covered_parts.part_keys.len(),
value_len: values.len(),
}
.fail()
.map_err(|source| StoreError::Generic {
store: OBKV,
source: Box::new(source),
})?
}
}

debug!(
"ObkvObjectStore get_range operation, location:{location}, table:{table_name}, cost:{:?}",
instant.elapsed()
);
for (index, (key, value)) in covered_parts.part_keys.iter().zip(values).enumerate() {
if let Some(bytes) = value {
let mut begin = 0;
let mut end = bytes.len() - 1;
if index == 0 {
begin = covered_parts.start_offset;
}
// the last one
if index == covered_parts.part_keys.len() - 1 {
end = covered_parts.end_offset;
}
range_buffer.extend_from_slice(&bytes[begin..=end]);
} else {
DataPartNotFound { part_key: key }
.fail()
.map_err(|source| StoreError::NotFound {
path: location.to_string(),
source: Box::new(source),
})?;
}
}

Ok(range_buffer.into())
debug!("ObkvObjectStore get_range operation, location:{location}, table:{table_name}, cost:{:?}", instant.elapsed());

return Ok(range_buffer.into());
} else {
return Ok(Bytes::new());
}
}

/// Return the metadata for the specified location
Expand Down