diff --git a/python/benchmarks/bench_utils.py b/python/benchmarks/bench_utils.py
index 5e3a84b21d..967e48df34 100644
--- a/python/benchmarks/bench_utils.py
+++ b/python/benchmarks/bench_utils.py
@@ -39,7 +39,7 @@ def read_file(uri) -> bytes:
     if not urlparse(uri).scheme:
-        uri = pathlib.Path(uri)
+        uri = pathlib.Path(uri).expanduser().absolute()
     fs, key = pyarrow.fs.FileSystem.from_uri(uri)
     return fs.open_input_file(key).read()
@@ -221,16 +221,17 @@ def default_dataset_path(self, fmt, flavor=None):
         suffix = f"_{flavor}" if flavor else ""
         return os.path.join(self.uri_root, f"{self.name}{suffix}.{fmt}")
 
-    def save_df(self, df, fmt="lance", output_path=None):
+    def save_df(self, df, fmt="lance", output_path=None, **kwargs):
         output_path = output_path or self.default_dataset_path(fmt, "links")
         table = self._convert_metadata_df(df)
         if fmt == "parquet":
-            pq.write_table(table, output_path)
+            pq.write_table(table, output_path, **kwargs)
         elif fmt == "lance":
             pa.dataset.write_dataset(
                 table,
                 output_path,
                 format=lance.LanceFileFormat(),
+                **kwargs,
             )
         return table
diff --git a/python/benchmarks/parse_coco.py b/python/benchmarks/parse_coco.py
index f9ccea639f..061f38c542 100755
--- a/python/benchmarks/parse_coco.py
+++ b/python/benchmarks/parse_coco.py
@@ -65,7 +65,7 @@ def read_split(split):
             return self._instances_to_df(split, json_data)
 
         df = pd.concat([read_split(split) for split in ["train", "val"]])
-        df['date_captured'] = pd.to_datetime(df.date_captured)  # lance GH#98
+        df["date_captured"] = pd.to_datetime(df.date_captured)  # lance GH#98
         return df
 
     def _convert_metadata_df(self, df: pd.DataFrame) -> pa.Table:
@@ -88,23 +88,22 @@ def _convert_field(self, name, typ, col):
             offsets = native_arr.offsets
             values = native_arr.values.to_numpy(zero_copy_only=False)
             return pa.ListArray.from_arrays(
-                offsets, self._convert_field(
-                    f'{name}.elements', typ.value_type, values)
+                offsets, self._convert_field(f"{name}.elements", typ.value_type, values)
             )
         elif pa.types.is_struct(typ):
             native_arr = pa.array(col)
             arrays = []
             for subfield in typ:
                 sub_arr = native_arr.field(subfield.name)
-                if name == 'annotations' and subfield.name == 'name':
+                if name == "annotations" and subfield.name == "name":
                     converted = self._convert_name_column(
-                        sub_arr, native_arr.field('category_id')
+                        sub_arr, native_arr.field("category_id")
                     )
                 else:
                     converted = self._convert_field(
                         f"{name}.{subfield.name}",
                         subfield.type,
-                        sub_arr.to_numpy(zero_copy_only=False)
+                        sub_arr.to_numpy(zero_copy_only=False),
                     )
                 arrays.append(converted)
             return pa.StructArray.from_arrays(arrays, fields=typ)
@@ -115,17 +114,21 @@ def _convert_name_column(self, name_arr, category_id_arr):
         coco_classes = pd.read_csv("coco_classes.csv", header=0, index_col=None)
         # let's make sure the actual data matches
-        check = pd.Series(dict(zip(name_arr.values.to_numpy(False),
-                                   category_id_arr.values.to_numpy(False)))
-                          ).to_frame(name='check_id')
-        joined = coco_classes.set_index('name').join(check, how='right')
+        check = pd.Series(
+            dict(
+                zip(
+                    name_arr.values.to_numpy(False),
+                    category_id_arr.values.to_numpy(False),
+                )
+            )
+        ).to_frame(name="check_id")
+        joined = coco_classes.set_index("name").join(check, how="right")
         mask = pd.notnull(joined.check_id)
         filtered = joined[mask]
         if not (filtered.check_id == filtered.category_id).all():
             raise ValueError(f"Category id check failed")
         dict_arr = pa.DictionaryArray.from_pandas(
-            pd.Categorical(name_arr.values.to_numpy(False),
-                           coco_classes.name.values)
+            pd.Categorical(name_arr.values.to_numpy(False), coco_classes.name.values)
         )
         assert not pd.isna(dict_arr.indices.to_numpy()).all()
         return pa.ListArray.from_arrays(name_arr.offsets, dict_arr)
@@ -153,7 +156,7 @@ def get_schema(self):
             ImageType.from_storage(pa.utf8()),
             pa.int64(),
             pa.int64(),
-            pa.timestamp('ns'),
+            pa.timestamp("ns"),
             ImageType.from_storage(pa.utf8()),
             pa.int64(),
             pa.dictionary(pa.int8(), pa.utf8()),
@@ -218,13 +221,28 @@ def _aggregate_annotations(annotations):
 )
 @click.option("-f", "--fmt", type=str, help="Output format (parquet or lance)")
 @click.option("-e", "--embedded", type=bool, default=True, help="Embed images")
+@click.option(
+    "-g",
+    "--group-size",
+    type=int,
+    default=1024,
+    help="set the group size",
+    show_default=True,
+)
+@click.option(
+    "--max-rows-per-file",
+    type=int,
+    default=0,
+    help="set the max rows per file",
+    show_default=True,
+)
 @click.option(
     "-o",
     "--output-path",
     type=str,
     help="Output path. Default is {base_uri}/coco_links.{fmt}",
 )
-def main(base_uri, version, fmt, embedded, output_path):
+def main(base_uri, version, fmt, embedded, output_path, group_size: int, max_rows_per_file: int):
     converter = CocoConverter(base_uri, version=version)
     df = converter.read_metadata()
     known_formats = ["lance", "parquet"]
@@ -233,11 +251,19 @@ def main(base_uri, version, fmt, embedded, output_path):
         fmt = [fmt]
     else:
         fmt = known_formats
+
+    kwargs = {
+        "existing_data_behavior": "overwrite_or_ignore",
+        "partitioning": ["split"],
+        "partitioning_flavor": "hive",
+        "max_rows_per_group": group_size,
+        "max_rows_per_file": max_rows_per_file,
+    }
     for f in fmt:
         if embedded:
-            converter.make_embedded_dataset(df, f, output_path)
+            converter.make_embedded_dataset(df, f, output_path, **kwargs)
         else:
-            return converter.save_df(df, f, output_path)
+            return converter.save_df(df, f, output_path, **kwargs)
 
 
 if __name__ == "__main__":
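
For context on what the forwarded `kwargs` actually do: every key built in `main()` corresponds to a parameter of `pyarrow.dataset.write_dataset`. Below is a minimal, self-contained sketch of that call, not part of the patch: the toy table, the temp output directory, and the plain `"parquet"` format (standing in for `lance.LanceFileFormat()`) are illustrative assumptions, and it presumes a pyarrow recent enough to support `max_rows_per_file` (8.0+).

```python
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

# Toy stand-in for the converted COCO metadata table.
table = pa.table(
    {
        "split": ["train"] * 6 + ["val"] * 2,
        "image_id": list(range(8)),
    }
)

out_dir = tempfile.mkdtemp()  # hypothetical output path, for illustration only
ds.write_dataset(
    table,
    out_dir,
    format="parquet",  # the patch passes format=lance.LanceFileFormat() here
    # The same kwargs that main() builds from the new CLI flags:
    existing_data_behavior="overwrite_or_ignore",  # re-runs tolerate an existing dir
    partitioning=["split"],      # one directory per distinct value of the split column
    partitioning_flavor="hive",  # named split=train/, split=val/
    max_rows_per_group=4,        # --group-size: caps rows per row group
    max_rows_per_file=0,         # --max-rows-per-file: 0 means no per-file cap
)
# Expected layout: <out_dir>/split=train/part-0.parquet, <out_dir>/split=val/part-0.parquet
```

Hive-flavored partitioning on `split` is what produces the `split=...` subdirectories, and `existing_data_behavior="overwrite_or_ignore"` lets repeated benchmark runs write into that existing tree instead of raising on a non-empty output directory.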