Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Dec 22, 2024
2 parents 12a4c7d + 350ea26 commit 4c66204
Show file tree
Hide file tree
Showing 35 changed files with 1,229 additions and 155 deletions.
2 changes: 1 addition & 1 deletion arrow-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ chrono = { workspace = true }
chrono-tz = { version = "0.10", optional = true }
num = { version = "0.4.1", default-features = false, features = ["std"] }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14.2", default-features = false }
hashbrown = { version = "0.15.1", default-features = false }

[features]
ffi = ["arrow-schema/ffi", "arrow-data/ffi"]
Expand Down
79 changes: 70 additions & 9 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ use std::sync::Arc;

use super::ByteArrayType;

/// [Variable-size Binary View Layout]: An array of variable length bytes view arrays.
/// [Variable-size Binary View Layout]: An array of variable length bytes views.
///
/// This array type is used to store variable length byte data (e.g. Strings, Binary)
/// and has efficient operations such as `take`, `filter`, and comparison.
///
/// [Variable-size Binary View Layout]: https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-view-layout
///
/// This is different from [`GenericByteArray`] as it stores both an offset and
/// length meaning that take / filter operations can be implemented without
/// copying the underlying data. In addition, it stores an inlined prefix which
/// can be used to speed up comparisons.
/// This is different from [`GenericByteArray`], which also stores variable
/// length byte data, as it represents strings with an offset and length. `take`
/// and `filter` like operations are implemented by manipulating the "views"
/// (`u128`) without modifying the bytes. Each view also stores an inlined
/// prefix which speed up comparisons.
///
/// # See Also
///
Expand All @@ -50,11 +54,18 @@ use super::ByteArrayType;
///
/// [`ByteView`]: arrow_data::ByteView
///
/// # Notes
/// # Use the [`eq`] kernel to compare the logical content.
///
/// Comparing two `GenericByteViewArray` using PartialEq compares by structure
/// (the `u128`s) and contents of the buffers, not by logical content. As there
/// are many different buffer layouts to represent the same data (e.g. different
/// offsets, different buffer sizes, etc) two arrays with the same data may not
/// compare equal.
///
/// To compare the logical content of two `GenericByteViewArray`s, use the [`eq`]
/// kernel.
///
/// Comparing two `GenericByteViewArray` using PartialEq compares by structure,
/// not by value. as there are many different buffer layouts to represent the
/// same data (e.g. different offsets, different buffer sizes, etc).
/// [`eq`]: https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html
///
/// # Layout: "views" and buffers
///
Expand Down Expand Up @@ -86,6 +97,52 @@ use super::ByteArrayType;
/// view and the entire string is stored in one of the buffers. See [`ByteView`]
/// to access the fields of the these views.
///
/// As with other arrays, the optimized kernels in [`arrow_compute`] are likely
/// the easiest and fastest way to work with this data. However, it is possible
/// to access the views and buffers directly for more control.
///
/// For example
///
/// ```rust
/// # use arrow_array::StringViewArray;
/// # use arrow_array::Array;
/// use arrow_data::ByteView;
/// let array = StringViewArray::from(vec![
/// "hello",
/// "this string is longer than 12 bytes",
/// "this string is also longer than 12 bytes"
/// ]);
///
/// // ** Examine the first view (short string) **
/// assert!(array.is_valid(0)); // Check for nulls
/// let short_view: u128 = array.views()[0]; // "hello"
/// // get length of the string
/// let len = short_view as u32;
/// assert_eq!(len, 5); // strings less than 12 bytes are stored in the view
/// // SAFETY: `view` is a valid view
/// let value = unsafe {
/// StringViewArray::inline_value(&short_view, len as usize)
/// };
/// assert_eq!(value, b"hello");
///
/// // ** Examine the third view (long string) **
/// assert!(array.is_valid(12)); // Check for nulls
/// let long_view: u128 = array.views()[2]; // "this string is also longer than 12 bytes"
/// let len = long_view as u32;
/// assert_eq!(len, 40); // strings longer than 12 bytes are stored in the buffer
/// let view = ByteView::from(long_view); // use ByteView to access the fields
/// assert_eq!(view.length, 40);
/// assert_eq!(view.buffer_index, 0);
/// assert_eq!(view.offset, 35); // data starts after the first long string
/// // Views for long strings store a 4 byte prefix
/// let prefix = view.prefix.to_le_bytes();
/// assert_eq!(&prefix, b"this");
/// let value = array.value(2); // get the string value (see `value` implementation for how to access the bytes directly)
/// assert_eq!(value, "this string is also longer than 12 bytes");
/// ```
///
/// [`arrow_compute`]: https://docs.rs/arrow/latest/arrow/compute/index.html
///
/// Unlike [`GenericByteArray`], there are no constraints on the offsets other
/// than they must point into a valid buffer. However, they can be out of order,
/// non continuous and overlapping.
Expand Down Expand Up @@ -694,6 +751,8 @@ where

/// A [`GenericByteViewArray`] of `[u8]`
///
/// See [`GenericByteViewArray`] for format and layout details.
///
/// # Example
/// ```
/// use arrow_array::BinaryViewArray;
Expand Down Expand Up @@ -733,6 +792,8 @@ impl From<Vec<Option<&[u8]>>> for BinaryViewArray {

/// A [`GenericByteViewArray`] that stores utf8 data
///
/// See [`GenericByteViewArray`] for format and layout details.
///
/// # Example
/// ```
/// use arrow_array::StringViewArray;
Expand Down
21 changes: 21 additions & 0 deletions arrow-array/src/builder/boolean_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ impl BooleanBuilder {
self.null_buffer_builder.append_n_non_nulls(v.len());
}

/// Appends n `additional` bits of value `v` into the buffer
#[inline]
pub fn append_n(&mut self, additional: usize, v: bool) {
self.values_builder.append_n(additional, v);
self.null_buffer_builder.append_n_non_nulls(additional);
}

/// Appends values from a slice of type `T` and a validity boolean slice.
///
/// Returns an error if the slices are of different lengths
Expand Down Expand Up @@ -325,4 +332,18 @@ mod tests {
&[false, false, true, false, false, true, true, false]
)
}

#[test]
fn test_boolean_array_builder_append_n() {
let mut builder = BooleanBuilder::new();
builder.append_n(3, true);
builder.append_n(2, false);
let array = builder.finish();
assert_eq!(3, array.true_count());
assert_eq!(2, array.false_count());
assert_eq!(0, array.null_count());

let values = array.iter().map(|x| x.unwrap()).collect::<Vec<_>>();
assert_eq!(&values, &[true, true, true, false, false])
}
}
61 changes: 57 additions & 4 deletions arrow-array/src/builder/generic_bytes_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use arrow_buffer::NullBufferBuilder;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::ArrayDataBuilder;
use std::any::Any;
use std::fmt::Write;
use std::sync::Arc;

/// Builder for [`GenericByteArray`]
Expand Down Expand Up @@ -287,10 +286,9 @@ impl<T: ByteArrayType, V: AsRef<T::Native>> Extend<Option<V>> for GenericByteBui
/// assert_eq!(array.value(0), "foobarbaz");
/// assert_eq!(array.value(1), "v2");
/// ```
///
pub type GenericStringBuilder<O> = GenericByteBuilder<GenericStringType<O>>;

impl<O: OffsetSizeTrait> Write for GenericStringBuilder<O> {
impl<O: OffsetSizeTrait> std::fmt::Write for GenericStringBuilder<O> {
fn write_str(&mut self, s: &str) -> std::fmt::Result {
self.value_builder.append_slice(s.as_bytes());
Ok(())
Expand Down Expand Up @@ -318,13 +316,50 @@ impl<O: OffsetSizeTrait> Write for GenericStringBuilder<O> {
/// assert_eq!(array.value(0), b"foo");
/// assert_eq!(array.value(1), b"\x00\x01\x02");
/// ```
///
/// # Example incrementally writing bytes with `write_bytes`
///
/// ```
/// # use std::io::Write;
/// # use arrow_array::builder::GenericBinaryBuilder;
/// let mut builder = GenericBinaryBuilder::<i32>::new();
///
/// // Write data in multiple `write_bytes` calls
/// write!(builder, "foo").unwrap();
/// write!(builder, "bar").unwrap();
/// // The next call to append_value finishes the current string
/// // including all previously written strings.
/// builder.append_value("baz");
///
/// // Write second value with a single write call
/// write!(builder, "v2").unwrap();
/// // finish the value by calling append_value with an empty string
/// builder.append_value("");
///
/// let array = builder.finish();
/// assert_eq!(array.value(0), "foobarbaz".as_bytes());
/// assert_eq!(array.value(1), "v2".as_bytes());
/// ```
pub type GenericBinaryBuilder<O> = GenericByteBuilder<GenericBinaryType<O>>;

impl<O: OffsetSizeTrait> std::io::Write for GenericBinaryBuilder<O> {
fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
self.value_builder.append_slice(bs);
Ok(bs.len())
}

fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::array::Array;
use crate::GenericStringArray;
use std::fmt::Write as _;
use std::io::Write as _;

fn _test_generic_binary_builder<O: OffsetSizeTrait>() {
let mut builder = GenericBinaryBuilder::<O>::new();
Expand Down Expand Up @@ -527,7 +562,7 @@ mod tests {
}

#[test]
fn test_write() {
fn test_write_str() {
let mut builder = GenericStringBuilder::<i32>::new();
write!(builder, "foo").unwrap();
builder.append_value("");
Expand All @@ -540,4 +575,22 @@ mod tests {
let r: Vec<_> = a.iter().flatten().collect();
assert_eq!(r, &["foo", "bar\n", "fizbuz"])
}

#[test]
fn test_write_bytes() {
let mut builder = GenericBinaryBuilder::<i32>::new();
write!(builder, "foo").unwrap();
builder.append_value("");
writeln!(builder, "bar").unwrap();
builder.append_value("");
write!(builder, "fiz").unwrap();
write!(builder, "buz").unwrap();
builder.append_value("");
let a = builder.finish();
let r: Vec<_> = a.iter().flatten().collect();
assert_eq!(
r,
&["foo".as_bytes(), "bar\n".as_bytes(), "fizbuz".as_bytes()]
)
}
}
21 changes: 16 additions & 5 deletions arrow-csv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,25 @@ use arrow_schema::ArrowError;
fn map_csv_error(error: csv::Error) -> ArrowError {
match error.kind() {
csv::ErrorKind::Io(error) => ArrowError::CsvError(error.to_string()),
csv::ErrorKind::Utf8 { pos: _, err } => ArrowError::CsvError(format!(
"Encountered UTF-8 error while reading CSV file: {err}"
csv::ErrorKind::Utf8 { pos, err } => ArrowError::CsvError(format!(
"Encountered UTF-8 error while reading CSV file: {}{}",
err,
pos.as_ref()
.map(|pos| format!(" at line {}", pos.line()))
.unwrap_or_default(),
)),
csv::ErrorKind::UnequalLengths {
expected_len, len, ..
pos,
expected_len,
len,
} => ArrowError::CsvError(format!(
"Encountered unequal lengths between records on CSV file. Expected {len} \
records, found {expected_len} records"
"Encountered unequal lengths between records on CSV file. Expected {} \
records, found {} records{}",
len,
expected_len,
pos.as_ref()
.map(|pos| format!(" at line {}", pos.line()))
.unwrap_or_default(),
)),
_ => ArrowError::CsvError("Error reading CSV file".to_string()),
}
Expand Down
16 changes: 16 additions & 0 deletions arrow-csv/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2591,6 +2591,22 @@ mod tests {
}
}

#[test]
fn test_record_length_mismatch() {
let csv = "\
a,b,c\n\
1,2,3\n\
4,5\n\
6,7,8";
let mut read = Cursor::new(csv.as_bytes());
let result = Format::default()
.with_header(true)
.infer_schema(&mut read, None);
assert!(result.is_err());
// Include line number in the error message to help locate and fix the issue
assert_eq!(result.err().unwrap().to_string(), "Csv error: Encountered unequal lengths between records on CSV file. Expected 2 records, found 3 records at line 3");
}

#[test]
fn test_comment() {
let schema = Schema::new(vec![
Expand Down
2 changes: 1 addition & 1 deletion arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ flight-sql-experimental = ["arrow-arith", "arrow-data", "arrow-ord", "arrow-row"
tls = ["tonic/tls"]

# Enable CLI tools
cli = ["anyhow", "arrow-cast/prettyprint", "clap", "tracing-log", "tracing-subscriber", "tonic/tls-webpki-roots"]
cli = ["anyhow", "arrow-array/chrono-tz", "arrow-cast/prettyprint", "clap", "tracing-log", "tracing-subscriber", "tonic/tls-webpki-roots"]

[dev-dependencies]
arrow-cast = { workspace = true, features = ["prettyprint"] }
Expand Down
53 changes: 35 additions & 18 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,30 +1006,36 @@ mod tests {
async fn test_auth() {
test_all_clients(|mut client| async move {
// no handshake
assert!(client
.prepare("select 1;".to_string(), None)
.await
.unwrap_err()
.to_string()
.contains("No authorization header"));
assert_contains(
client
.prepare("select 1;".to_string(), None)
.await
.unwrap_err()
.to_string(),
"No authorization header",
);

// Invalid credentials
assert!(client
.handshake("admin", "password2")
.await
.unwrap_err()
.to_string()
.contains("Invalid credentials"));
assert_contains(
client
.handshake("admin", "password2")
.await
.unwrap_err()
.to_string(),
"Invalid credentials",
);

// Invalid Tokens
client.handshake("admin", "password").await.unwrap();
client.set_token("wrong token".to_string());
assert!(client
.prepare("select 1;".to_string(), None)
.await
.unwrap_err()
.to_string()
.contains("invalid token"));
assert_contains(
client
.prepare("select 1;".to_string(), None)
.await
.unwrap_err()
.to_string(),
"invalid token",
);

client.clear_token();

Expand All @@ -1039,4 +1045,15 @@ mod tests {
})
.await
}

fn assert_contains(actual: impl AsRef<str>, searched_for: impl AsRef<str>) {
let actual = actual.as_ref();
let searched_for = searched_for.as_ref();
assert!(
actual.contains(searched_for),
"Expected '{}' to contain '{}'",
actual,
searched_for
);
}
}
Loading

0 comments on commit 4c66204

Please sign in to comment.