Propagate errors to Python and drop redundant list iterations #8

Merged
merged 1 commit into from Sep 19, 2024
64 changes: 32 additions & 32 deletions src/lib.rs
@@ -48,16 +48,16 @@ fn generate_groupby(
     nas: i64,
     seed: i64,
     batch_size: i64,
-) -> PyArrowType<RecordBatch> {
-    let distr_k = Uniform::<i64>::try_from(1..=k).unwrap();
-    let distr_nk = Uniform::<i64>::try_from(1..=(n / k)).unwrap();
-    let distr_5 = Uniform::<i64>::try_from(1..=5).unwrap();
-    let distr_15 = Uniform::<i64>::try_from(1..=15).unwrap();
-    let distr_float = Uniform::<f64>::try_from(0.0..=100.0).unwrap();
-    let distr_nas = Uniform::<i64>::try_from(0..=100).unwrap();
+) -> PyResult<PyArrowType<RecordBatch>> {
+    let distr_k = Uniform::<i64>::try_from(1..=k)?;
+    let distr_nk = Uniform::<i64>::try_from(1..=(n / k))?;
+    let distr_5 = Uniform::<i64>::try_from(1..=5)?;
+    let distr_15 = Uniform::<i64>::try_from(1..=15)?;
+    let distr_float = Uniform::<f64>::try_from(0.0..=100.0)?;
+    let distr_nas = Uniform::<i64>::try_from(0..=100)?;
     let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);

-    let item_capacity = batch_size as usize;
+    let item_capacity = batch_size as usize; // validation is on the Python side

     let mut id1_builder = StringBuilder::with_capacity(item_capacity, item_capacity * 8 * 5); // id{:03}, utf8
     let mut id2_builder = StringBuilder::with_capacity(item_capacity, item_capacity * 8 * 5); // id{:03}, utf8
@@ -139,10 +139,9 @@ fn generate_groupby(
         Arc::new(v2_builder.finish()),
         Arc::new(v3_builder.finish()),
     ],
-    )
-    .unwrap();
+    ).unwrap();

-    PyArrowType(batch)
+    Ok(PyArrowType(batch))
 }

 /**
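The core of this change: in rand 0.9, `Uniform::try_from` returns a `Result`, so an invalid range (e.g. `1..=k` when `k < 1`) can reach Python as an exception instead of a Rust panic. Below is a minimal sketch of the pattern, assuming rand 0.9, rand_chacha 0.9, and pyo3. `sample_one` is hypothetical, not code from this PR, and the explicit `map_err` stands in for whatever `uniform::Error` to `PyErr` conversion lets the diff use a bare `?`:

```rust
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use rand::distr::{Distribution, Uniform};
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;

/// Hypothetical helper, not part of this PR.
#[pyfunction]
fn sample_one(k: i64, seed: u64) -> PyResult<i64> {
    // `1..=k` is an empty range when k < 1; `try_from` reports that as an
    // Err, which map_err converts into a Python ValueError.
    let distr = Uniform::<i64>::try_from(1..=k)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    let mut rng = ChaCha8Rng::seed_from_u64(seed);
    Ok(distr.sample(&mut rng))
}
```

From Python, a bad argument such as `sample_one(0, 42)` would then raise `ValueError` rather than aborting the interpreter.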
@@ -170,13 +169,13 @@ fn generate_join_dataset_small(
     seed: i64,
     keys_seed: i64,
     batch_size: i64,
-) -> PyArrowType<RecordBatch> {
+) -> PyResult<PyArrowType<RecordBatch>> {
     let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
     let mut keys_rng = ChaCha8Rng::seed_from_u64(keys_seed as u64);
     let mut k1: Vec<i64> = (1..=(n * 11 / 10 / 1_000_000)).collect(); // original R line: key1 = split_xlr(N/1e6)
     k1.shuffle(&mut keys_rng);

-    let distr_float = Uniform::<f64>::try_from(1.0..=100.0).unwrap();
+    let distr_float = Uniform::<f64>::try_from(1.0..=100.0)?;

     // original R line (43:44)
     // x = key[seq.int(1, n*0.9)],
@@ -194,16 +193,16 @@ fn generate_join_dataset_small(

     kx.append(&mut kr);

-    let item_capacity = batch_size as usize;
-    let len_of_max_key = kx.iter().max().unwrap().to_string().len() + 2; // id{}, where {} is a number from a vec
+    let item_capacity = batch_size as usize; // validation is on the Python side
+    let len_of_max_key = (n * 11 / 10 / 1_000_000).to_string().len() + 2; // id{}, where {} is a number from a vec

     let mut id1_builder = Int64Builder::with_capacity(item_capacity);
     let mut id4_builder =
         StringBuilder::with_capacity(item_capacity, item_capacity * 8 * len_of_max_key); // utf8
     let mut v2_builder = Float64Builder::with_capacity(item_capacity);

     for _i in 0..batch_size {
-        let k1 = kx.choose(&mut rng).unwrap();
+        let k1 = kx.choose(&mut rng).unwrap(); // we know kx is non-empty
         id1_builder.append_value(*k1);
         id4_builder.append_value(format!("id{}", k1));
         v2_builder.append_value(distr_float.sample(&mut rng));
@@ -225,7 +224,7 @@ fn generate_join_dataset_small(
     )
     .unwrap();

-    PyArrowType(batch)
+    Ok(PyArrowType(batch))
 }

 /**
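The "drop redundant list iterations" half of the title: each key vector is built as `(1..=upper).collect()` and then only shuffled, so its maximum is `upper` by construction, and the old `kx.iter().max()` was an O(n) scan for a value already known. A self-contained sketch of that invariant (illustrative only; `upper` stands in for `n * 11 / 10 / 1_000_000`):

```rust
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;

fn main() {
    let n: i64 = 10_000_000;
    let upper = n * 11 / 10 / 1_000_000; // = 11
    let mut keys: Vec<i64> = (1..=upper).collect();
    // Shuffling permutes the elements but never changes them,
    // so the maximum is still `upper` and no scan is needed.
    keys.shuffle(&mut ChaCha8Rng::seed_from_u64(42));
    assert_eq!(keys.iter().max().copied(), Some(upper));
    // The widest "id{}" string is "id" plus the digits of `upper`.
    assert_eq!(upper.to_string().len() + 2, format!("id{}", upper).len());
}
```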
@@ -253,7 +252,7 @@ fn generate_join_dataset_medium(
     seed: i64,
     keys_seed: i64,
     batch_size: i64,
-) -> PyArrowType<RecordBatch> {
+) -> PyResult<PyArrowType<RecordBatch>> {
     let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
     let mut keys_rng = ChaCha8Rng::seed_from_u64(keys_seed as u64);
     let mut k1: Vec<i64> = (1..=(n * 11 / 10 / 1_000_000)).collect(); // original R line: key1 = split_xlr(N/1e6)
@@ -262,7 +261,7 @@ fn generate_join_dataset_medium(
     let mut k2: Vec<i64> = (1..=(n * 11 / 10 / 1_000)).collect(); // original R line: key2 = split_xlr(N/1e3)
     k2.shuffle(&mut keys_rng);

-    let distr_float = Uniform::<f64>::try_from(1.0..=100.0).unwrap();
+    let distr_float = Uniform::<f64>::try_from(1.0..=100.0)?;

     // original R line (43:44)
     // x = key[seq.int(1, n*0.9)],
@@ -292,8 +291,8 @@ fn generate_join_dataset_medium(
     k2x.append(&mut k2r);

     let item_capacity = batch_size as usize;
-    let len_of_max_k1_key = k1x.iter().max().unwrap().to_string().len() + 2; // id{}, where {} is a number from a vec
-    let len_of_max_k2_key = k2x.iter().max().unwrap().to_string().len() + 2; // the same
+    let len_of_max_k1_key = (n * 11 / 10 / 1_000_000).to_string().len() + 2; // id{}, where {} is a number from a vec
+    let len_of_max_k2_key = (n * 11 / 10 / 1_000).to_string().len() + 2; // the same

     let mut id1_builder = Int64Builder::with_capacity(item_capacity);
     let mut id2_builder = Int64Builder::with_capacity(item_capacity);
@@ -304,8 +303,8 @@ fn generate_join_dataset_medium(
     let mut v2_builder = Float64Builder::with_capacity(item_capacity);

     for _i in 0..batch_size {
-        let k1 = k1x.choose(&mut rng).unwrap();
-        let k2 = k2x.choose(&mut rng).unwrap();
+        let k1 = k1x.choose(&mut rng).unwrap(); // we know k1x is non-empty
+        let k2 = k2x.choose(&mut rng).unwrap(); // we know k2x is non-empty

         id1_builder.append_value(*k1);
         id2_builder.append_value(*k2);
@@ -334,11 +333,11 @@ fn generate_join_dataset_medium(
     )
     .unwrap();

-    PyArrowType(batch)
+    Ok(PyArrowType(batch))
 }

 /**
-Generate H2O join medium dataset.
+Generate H2O join big dataset.
 Running this function multiple times with the same seed
 will always return exactly the same batch!

@@ -366,7 +365,7 @@ fn generate_join_dataset_big(
     seed: i64,
     keys_seed: i64,
     batch_size: i64,
-) -> PyArrowType<RecordBatch> {
+) -> PyResult<PyArrowType<RecordBatch>> {
     let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
     let mut keys_rng = ChaCha8Rng::seed_from_u64(keys_seed as u64);
     let mut k1: Vec<i64> = (1..=(n * 11 / 10 / 1_000_000)).collect(); // original R line: key1 = split_xlr(N/1e6)
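The doc comments above promise that equal seeds reproduce the same batch; that property comes from seeded `ChaCha8Rng` streams like the two created here. A tiny sketch, assuming rand 0.9 and rand_chacha 0.9 (not code from this PR):

```rust
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;

fn main() {
    // Two generators seeded identically produce identical streams, which is
    // why each generator function returns the same batch for the same seed.
    let mut a = ChaCha8Rng::seed_from_u64(7);
    let mut b = ChaCha8Rng::seed_from_u64(7);
    let xs: Vec<u64> = (0..5).map(|_| a.random()).collect();
    let ys: Vec<u64> = (0..5).map(|_| b.random()).collect();
    assert_eq!(xs, ys);
}
```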
@@ -378,8 +377,8 @@ fn generate_join_dataset_big(
     let mut k3: Vec<i64> = (1..=(n * 11 / 10)).collect(); // original R line: key3 = split_xlr(N)
     k3.shuffle(&mut keys_rng);

-    let distr_float = Uniform::<f64>::try_from(1.0..=100.0).unwrap();
-    let distr_nas = Uniform::<i64>::try_from(0..=100).unwrap();
+    let distr_float = Uniform::<f64>::try_from(1.0..=100.0)?;
+    let distr_nas = Uniform::<i64>::try_from(0..=100)?;

     // original R line (43:44)
     // x = key[seq.int(1, n*0.9)],
@@ -404,9 +403,9 @@ fn generate_join_dataset_big(
     .to_vec();

     let item_capacity = batch_size as usize;
-    let len_of_max_k1_key = k1.iter().max().unwrap().to_string().len() + 2; // id{}, where {} is a number from a vec
-    let len_of_max_k2_key = k2.iter().max().unwrap().to_string().len() + 2; // the same
-    let len_of_max_k3_key = k3.iter().max().unwrap().to_string().len() + 2; // the same
+    let len_of_max_k1_key = (n * 11 / 10 / 1_000_000).to_string().len() + 2; // id{}, where {} is a number from a vec
+    let len_of_max_k2_key = (n * 11 / 10 / 1_000).to_string().len() + 2; // the same
+    let len_of_max_k3_key = (n * 11 / 10).to_string().len() + 2; // the same

     let mut id1_builder = Int64Builder::with_capacity(item_capacity);
     let mut id2_builder = Int64Builder::with_capacity(item_capacity);
@@ -420,6 +419,7 @@ fn generate_join_dataset_big(
     let mut v2_builder = Float64Builder::with_capacity(item_capacity);

     for _i in 0..batch_size {
+        // at this point k1, k2, and k3 are guaranteed non-empty
         let k1 = k1.choose(&mut rng).unwrap();
         let k2 = k2.choose(&mut rng).unwrap();
         let k3 = k3.choose(&mut rng).unwrap();
@@ -473,7 +473,7 @@ fn generate_join_dataset_big(
     )
     .unwrap();

-    PyArrowType(batch)
+    Ok(PyArrowType(batch))
 }

 #[pymodule]
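For context on the capacity arithmetic above: arrow's `StringBuilder::with_capacity(item_capacity, data_capacity)` pre-allocates both the number of values and the total bytes of string data, which is why the widest `id{}` key width is computed up front. A standalone sketch, assuming the `arrow` crate (the values here are illustrative, not from this PR):

```rust
use arrow::array::{Array, StringBuilder};

fn main() {
    let item_capacity = 4;
    let len_of_max_key = "id11".len(); // widest "id{}" string when upper = 11
    // Reserve room for `item_capacity` values and their total string bytes,
    // so the appends below should not need to reallocate.
    let mut builder = StringBuilder::with_capacity(item_capacity, item_capacity * len_of_max_key);
    for k in [1_i64, 5, 10, 11] {
        builder.append_value(format!("id{}", k));
    }
    let array = builder.finish();
    assert_eq!(array.len(), 4);
    assert_eq!(array.value(3), "id11");
}
```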