
feat: parse TFRecords as Arrow data #1166

Merged 14 commits into main on Aug 30, 2023

Conversation

@wjones127 (Contributor) commented on Aug 23, 2023:

Adds utility functions for reading TFRecords files:

from typing import List, Optional

import pyarrow as pa

def infer_tfrecord_schema(
    uri: str, 
    tensor_features: Optional[List[str]],
    string_features: Optional[List[str]]
) -> pa.Schema:
    """Infer the schema for a TFRecord dataset"""
    ...

def read_tfrecord(uri: str, schema: pa.Schema) -> pa.RecordBatchReader:
    """
    Read a TFRecord file as a stream of record batches. This can be fed directly
    into lance.write_dataset().
    """
    ...

Closes: #1165

@wjones127 changed the title from "wip: parse TFRecords as Arrow data" to "feat: parse TFRecords as Arrow data" on Aug 24, 2023
@wjones127 force-pushed the wjones127/tfrecord-to-arrow branch from 98b3503 to e6113c9 on August 25, 2023 17:20
Comment on lines -286 to -304
} else if !expanded_path.is_dir() {
    return Err(Error::IO {
        message: format!("{} is not a lance directory", str_path),
    });
@wjones127 (Contributor, Author):

This is necessary so we can reuse the object store for other purposes. Otherwise it is tightly coupled to the Lance directory.

@wjones127 wjones127 marked this pull request as ready for review August 25, 2023 17:50
@wjones127 wjones127 requested review from eddyxu and westonpace August 25, 2023 17:50
@westonpace (Contributor) left a comment:
This is great work, especially so quickly. I think my main concern is that there are a lot of unwraps in the proto parsing. Wouldn't it be better for malformed files to lead to errors instead of panics?

python/Cargo.toml (resolved)
python/src/lib.rs (outdated, resolved)
rust/src/utils/tfrecord.rs (resolved)
python/python/tests/test_tf.py (resolved)
rust/src/utils/tfrecord.rs (outdated, resolved)

/// Check if a feature has more than 1 value.
fn feature_is_repeated(feature: &tfrecord::Feature) -> bool {
    match feature.kind.as_ref().unwrap() {
@westonpace (Contributor):
Why is there an unwrap here? Could it fail?

@wjones127 (Contributor, Author):
All protobuf fields are "potentially" missing, but I think it's safe to say this one will always be filled in? What do you think?
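
For fields that genuinely may be absent, a small fallible accessor avoids the panic. A minimal sketch, reusing the Error::IO variant and the tfrecord Feature/Kind types from the diffs above (the helper name is illustrative, not part of the PR):

/// Hypothetical helper: report a missing `kind` field as an error
/// instead of panicking via unwrap.
fn feature_kind(feature: &tfrecord::Feature) -> Result<&Kind, Error> {
    feature.kind.as_ref().ok_or_else(|| Error::IO {
        message: "TFRecord feature is missing its `kind` field".to_string(),
    })
}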

rust/src/utils/tfrecord.rs (outdated, resolved)
Comment on lines 202 to 222
Kind::BytesList(data) => match self.feature_type {
    FeatureType::String => FeatureType::String,
    FeatureType::Binary => FeatureType::Binary,
    FeatureType::Tensor { .. } => {
        let val = &data.value[0];
        let tensor_proto = TensorProto::decode(val.as_slice()).unwrap();
        FeatureType::Tensor {
            shape: tensor_proto
                .tensor_shape
                .as_ref()
                .unwrap()
                .dim
                .iter()
                .map(|d| d.size)
                .collect(),
            dtype: tensor_proto.dtype(),
        }
    }
    _ => {
        return Err(Error::IO {
            message: format!(
                "Data type mismatch: expected {:?}, got {:?}",
                self.feature_type,
                feature.kind.as_ref().unwrap()
            ),
        })
    }
},
Kind::FloatList(_) => FeatureType::Float,
Kind::Int64List(_) => FeatureType::Integer,
@westonpace (Contributor):
This block is very similar to the block in new(). Is there a helper method here, e.g. extract_type(feature: &Feature) -> FeatureType?

@wjones127 (Contributor, Author):
The full block is subtly different from new() (it doesn't have access to the same parameters), but I did extract the tensor parsing into a helper, which I think makes it simpler. LMK what you think.
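
For reference, a minimal sketch of what such a helper can look like, reusing the TensorProto, FeatureType, and Error::IO types from the diff above (the name parse_tensor and the error handling are illustrative, not the PR's exact code):

/// Hypothetical helper: decode one serialized TensorProto value from a
/// bytes-list feature, reporting malformed data as errors rather than
/// panicking.
fn parse_tensor(val: &[u8]) -> Result<FeatureType, Error> {
    let tensor_proto = TensorProto::decode(val).map_err(|e| Error::IO {
        message: format!("Failed to decode TensorProto: {}", e),
    })?;
    let shape = tensor_proto
        .tensor_shape
        .as_ref()
        .ok_or_else(|| Error::IO {
            message: "TensorProto is missing its shape".to_string(),
        })?
        .dim
        .iter()
        .map(|d| d.size)
        .collect();
    Ok(FeatureType::Tensor {
        shape,
        dtype: tensor_proto.dtype(),
    })
}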

rust/src/io/object_store.rs (resolved)
@westonpace (Contributor) left a comment:
Looks ready to me. Awesome feature!

Comment on lines +78 to +97
/// Spawn a task in the background
pub fn spawn_background<T>(&self, py: Option<Python<'_>>, task: T)
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    if let Some(py) = py {
        py.allow_threads(|| {
            self.runtime.spawn(task);
        })
    } else {
        // Python::with_gil is a no-op if the GIL is already held by the thread.
        Python::with_gil(|py| {
            py.allow_threads(|| {
                self.runtime.spawn(task);
            })
        })
    }
}

@westonpace (Contributor):
Is this still needed?

@wjones127 (Contributor, Author):
Yeah, that's used in read_tfrecord. The pattern I've found for now for exporting streams as RecordBatchReaders is to shove the stream onto a background task and have it push batches onto the iterator via a channel. It's a little awkward but seems to work okay.
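
A minimal sketch of that pattern, assuming a tokio runtime and an async stream of Arrow batches (the function name, channel capacity, and arrow crate paths are illustrative; the PR routes the spawn through the spawn_background helper shown above):

use std::sync::mpsc;

use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use futures::{Stream, StreamExt};

/// Hypothetical helper: drive an async stream on the runtime and expose
/// it as a synchronous RecordBatchReader.
fn stream_to_reader<S>(
    runtime: &tokio::runtime::Runtime,
    schema: SchemaRef,
    mut stream: S,
) -> impl RecordBatchReader
where
    S: Stream<Item = Result<RecordBatch, ArrowError>> + Send + Unpin + 'static,
{
    // A bounded channel gives backpressure between the async producer
    // and the synchronous consumer.
    let (tx, rx) = mpsc::sync_channel(4);
    runtime.spawn(async move {
        while let Some(batch) = stream.next().await {
            // If the reader was dropped, stop producing.
            if tx.send(batch).is_err() {
                break;
            }
        }
    });
    RecordBatchIterator::new(rx, schema)
}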

Comment on lines 759 to 781
/// Given a potentially unaligned slice, append the slice to the builder.
fn append_primitive_from_slice<T>(
    builder: &mut PrimitiveBuilder<T>,
    slice: &[u8],
    parse_val: impl Fn(&[u8]) -> T::Native,
) where
    T: arrow::datatypes::ArrowPrimitiveType,
{
    let (prefix, middle, suffix) = unsafe { slice.align_to::<T::Native>() };
    for val in prefix.chunks_exact(T::get_byte_width()) {
        builder.append_value(parse_val(val));
    }

    builder.append_slice(middle);

    for val in suffix.chunks_exact(T::get_byte_width()) {
        builder.append_value(parse_val(val));
    }
}
@westonpace (Contributor):
Just out of curiosity, what's going on here? What's the general policy for unsafe blocks in lance?

@wjones127 (Contributor, Author):
Oh, this is to convert from a &[u8] (slice of bytes) to a &[T::Native] (f32 for a Float32Array, i64 for an Int64Array, etc.). The slice of bytes isn't guaranteed to be aligned, so this breaks it up into three pieces: the unaligned prefix, an aligned middle part, and the unaligned suffix.

> What's the general policy for unsafe blocks in lance?

We haven't established a policy, but one we could adopt is: whenever you have an unsafe block, add a comment above explaining why the next line is sound.

Here, the soundness of align_to depends on whether the bytes that form the middle piece are actually valid values for the T::Native type. I'm not 100% certain, but I don't think this is an issue for f32 or i64. Are there any 32-bit values that aren't a valid f32? Or 64-bit values that aren't a valid i64? I could see this being an issue for boolean values represented with a byte; there are only two valid values but 256 possible values, so 254 possible invalid values.
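
Applied to the f32 case, that policy might look like this minimal sketch (illustrative only, not the PR's code; it also falls back to explicit decoding when the slice start is unaligned, since then the aligned middle wouldn't coincide with element boundaries):

/// Illustrative only: sum f32 values from a possibly unaligned byte
/// slice (assumes bytes.len() is a multiple of 4 and native-endian
/// data; see the endianness discussion below).
fn sum_unaligned_f32(bytes: &[u8]) -> f32 {
    // SAFETY: every 32-bit pattern is a valid f32 (including NaNs), so
    // reinterpreting correctly aligned bytes as f32 can never produce
    // an invalid value; align_to guarantees `middle` is aligned.
    let (prefix, middle, suffix) = unsafe { bytes.align_to::<f32>() };
    if prefix.is_empty() && suffix.is_empty() {
        // Fast path: the whole slice was already aligned.
        return middle.iter().sum();
    }
    // Slow path: decode each 4-byte chunk from the start of the slice.
    bytes
        .chunks_exact(4)
        .map(|c| f32::from_ne_bytes(c.try_into().unwrap()))
        .sum()
}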

@westonpace (Contributor):
I see now; I just read up on align_to. It's a nice convenience :). The only other thing I could think of would be endianness. For the half_val stuff we are fortunate that protobuf will handle endianness for us. However, for the tensor_content option it looks like tensorflow is just reinterpret-casting the data bytes. So if the tfrecord file is created on a machine with one endianness and then read on a machine with a different endianness, you will get back garbage.

However, maybe just a comment / to-do PR, and we can worry about it later when we have users / use cases for big-endian machines.
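
Such a to-do could eventually become an explicit decode; a minimal sketch for tensor_content, assuming the file was written by a little-endian producer (the function name is illustrative, not part of the PR):

/// Illustrative only: decode raw tensor_content bytes as f32 with the
/// byte order fixed explicitly, so big-endian readers get correct
/// values too (assumes a little-endian producer).
fn decode_f32_tensor_content(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|c| f32::from_le_bytes(c.try_into().unwrap()))
        .collect()
}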

@wjones127 force-pushed the wjones127/tfrecord-to-arrow branch from 532f175 to 33d4209 on August 30, 2023 19:01
@wjones127 force-pushed the wjones127/tfrecord-to-arrow branch from 33d4209 to 0a1ad2e on August 30, 2023 21:00
@wjones127 merged commit 3a11144 into main on Aug 30, 2023
@wjones127 deleted the wjones127/tfrecord-to-arrow branch on August 30, 2023 21:45
Linked issue: Add utility to read TFRecord file as Arrow data (#1165)