-
Notifications
You must be signed in to change notification settings - Fork 245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: parse TFRecords as Arrow data #1166
Conversation
98b3503
to
e6113c9
Compare
} else if !expanded_path.is_dir() { | ||
return Err(Error::IO { | ||
message: format!("{} is not a lance directory", str_path), | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is necessary so we can re-use object store for other purposes. Otherwise it is tightly coupled to the lance directory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great work, especially so quickly. I think my main concern is that there are a lot of unwraps in the proto parsing. Wouldn't it be better for malformed files to lead to errors instead of panics?
|
||
/// Check if a feature has more than 1 value. | ||
fn feature_is_repeated(feature: &tfrecord::Feature) -> bool { | ||
match feature.kind.as_ref().unwrap() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is there an unwrap
here? Could it fail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All protobuf fields are "potentially" missing, but I think it's safe to say this one will always be filled in? What do you think?
rust/src/utils/tfrecord.rs
Outdated
Kind::BytesList(data) => match self.feature_type { | ||
FeatureType::String => FeatureType::String, | ||
FeatureType::Binary => FeatureType::Binary, | ||
FeatureType::Tensor { .. } => { | ||
let val = &data.value[0]; | ||
let tensor_proto = TensorProto::decode(val.as_slice()).unwrap(); | ||
FeatureType::Tensor { | ||
shape: tensor_proto | ||
.tensor_shape | ||
.as_ref() | ||
.unwrap() | ||
.dim | ||
.iter() | ||
.map(|d| d.size) | ||
.collect(), | ||
dtype: tensor_proto.dtype(), | ||
} | ||
} | ||
_ => { | ||
return Err(Error::IO { | ||
message: format!( | ||
"Data type mismatch: expected {:?}, got {:?}", | ||
self.feature_type, | ||
feature.kind.as_ref().unwrap() | ||
), | ||
}) | ||
} | ||
}, | ||
Kind::FloatList(_) => FeatureType::Float, | ||
Kind::Int64List(_) => FeatureType::Integer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block is very similar to the block in new
. Is there a helper method here? extract_type(feature: &Feature) -> FeatureType
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The full block is subtly different from new()
(doesn't have access to the same parameters), but I did extract the tensor parsing into a helper, which I think makes it simpler. LMK what you think.
c1735fa
to
882d8b8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks ready to me. Awesome feature!
/// Spawn a task in the background | ||
pub fn spawn_background<T>(&self, py: Option<Python<'_>>, task: T) | ||
where | ||
T: Future + Send + 'static, | ||
T::Output: Send + 'static, | ||
{ | ||
if let Some(py) = py { | ||
py.allow_threads(|| { | ||
self.runtime.spawn(task); | ||
}) | ||
} else { | ||
// Python::with_gil is a no-op if the GIL is already held by the thread. | ||
Python::with_gil(|py| { | ||
py.allow_threads(|| { | ||
self.runtime.spawn(task); | ||
}) | ||
}) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's used in read_tfrecord
. The pattern I've found for now for exporting streams as recordbatchreaders is to shove the stream on a background task and have it push onto the iterator via a channel. It's a little awkward but seems to work okay.
rust/src/utils/tfrecord.rs
Outdated
/// Given a potentially unaligned slice, append the slice to the builder. | ||
fn append_primitive_from_slice<T>( | ||
builder: &mut PrimitiveBuilder<T>, | ||
slice: &[u8], | ||
parse_val: impl Fn(&[u8]) -> T::Native, | ||
) where | ||
T: arrow::datatypes::ArrowPrimitiveType, | ||
{ | ||
let (prefix, middle, suffix) = unsafe { slice.align_to::<T::Native>() }; | ||
for val in prefix.chunks_exact(T::get_byte_width()) { | ||
builder.append_value(parse_val(val)); | ||
} | ||
|
||
builder.append_slice(middle); | ||
|
||
for val in suffix.chunks_exact(T::get_byte_width()) { | ||
builder.append_value(parse_val(val)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just out of curiosity, what's going on here? What's the general policy for unsafe blocks in lance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh this is to convert from a &[u8]
(slice of bytes) to a &[T::Native]
(f32
for a Float32Array, i64
for a Int64Array, etc.). The slice of bytes isn't guaranteed to be aligned, so this breaks it up into three pieces: the unaligned prefix, an aligned middle part, and the unaligned suffix.
What's the general policy for unsafe blocks in lance?
We haven't established a policy, but one we could adopt is: when ever you have an unsafe block, add a comment above explaining why the next line is sound.
Here, the soundness of align_to
depends on whether the bytes that form the middle piece are actually valid values for the T::Native
type. I'm not 100% certain, but I don't think this applies to f32 or i64. Are there any 32-bit values that aren't a valid f32? Or 64-bit values that aren't a valid i64? I could see this being an issue for boolean values represented with a byte; there's only two valid values but 256 possible values, so 244 possible invalid values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see now, I just read up on align_to. It's a nice convenience :). The only other thing I could think of would be endianness. For the half_val
stuff we are fortunate that protobuf will handle endianess for us. However, for the tensor_content
option it looks like tensorflow is just reinterpret casting the data bytes. So if the tfrecord file is created on a machine with one kind of endianess and then read on a machine with a different kind of endianess you will get back garbage.
However, maybe just a comment / to-do PR and we can worry about it later when we have users / use cases for big endian machines.
532f175
to
33d4209
Compare
33d4209
to
0a1ad2e
Compare
Adds utility functions for reading TFRecords files:
Closes: #1165