Skip to content

Commit

Permalink
Minor fixes for benchmarks (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
changhiskhan authored Aug 16, 2022
1 parent c11939d commit beb1d92
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 74 deletions.
71 changes: 41 additions & 30 deletions python/benchmarks/bench_utils.py
Original file line number Diff line number Diff line change
def get_dataset(uri: str) -> ds.Dataset:
    """
    Return a pyarrow Dataset stored at the given uri.

    Lance datasets (``*.lance``) are opened via the lance reader; anything
    else falls through to the generic ``pyarrow.dataset`` reader.
    """
    # Diff-paste residue removed: only the post-change double-quoted
    # condition is kept; old/new duplicate lines made the body unreachable.
    if uri.endswith(".lance"):
        return lance.dataset(uri)
    return ds.dataset(uri)


def get_uri(base_uri: str, dataset_name: str, fmt: str,
flavor: str = None) -> str:
def get_uri(base_uri: str, dataset_name: str, fmt: str, flavor: str = None) -> str:
"""
Return the uri to the dataset with the given specifications
Expand All @@ -92,7 +91,6 @@ def get_uri(base_uri: str, dataset_name: str, fmt: str,


class BenchmarkSuite:

def __init__(self, name: str):
self.name = name
self._benchmarks = {}
Expand All @@ -114,26 +112,38 @@ def list_benchmarks(self):

def create_main(self):
@click.command
@click.option('-u', '--base-uri', required=True, type=str,
help="Base uri to the benchmark dataset catalog")
@click.option('-f', '--format', 'fmt',
help="'lance', 'parquet', or 'raw'. Omit for all")
@click.option('--flavor', type=str,
help="external if parquet/lance had external images version")
@click.option('-b', '--benchmark', type=str,
help="which benchmark to run. Omit for all")
@click.option('-r', '--repeats', type=int,
help="number of times to run each benchmark")
@click.option('-o', '--output', type=str,
help="save timing results to directory")
@click.option(
"-u",
"--base-uri",
required=True,
type=str,
help="Base uri to the benchmark dataset catalog",
)
@click.option(
"-f", "--format", "fmt", help="'lance', 'parquet', or 'raw'. Omit for all"
)
@click.option(
"--flavor",
type=str,
help="external if parquet/lance had external images version",
)
@click.option(
"-b", "--benchmark", type=str, help="which benchmark to run. Omit for all"
)
@click.option(
"-r", "--repeats", type=int, help="number of times to run each benchmark"
)
@click.option(
"-o", "--output", type=str, help="save timing results to directory"
)
def main(base_uri, fmt, flavor, benchmark, repeats, output):
if fmt:
fmt = fmt.strip().lower()
assert fmt in KNOWN_FORMATS
fmt = [fmt]
else:
fmt = KNOWN_FORMATS
base_uri = f'{base_uri}/datasets/{self.name}'
base_uri = f"{base_uri}/datasets/{self.name}"

def run_benchmark(bmark):
b = bmark.repeat(repeats or 1)
Expand All @@ -153,7 +163,6 @@ def run_benchmark(bmark):


class Benchmark:

def __init__(self, name, func, key=None, num_runs=1):
self.name = name
self.func = func
Expand Down Expand Up @@ -183,7 +192,9 @@ def timeit_wrapper(*args, **kwargs):
end_time = time.perf_counter()
total_time = end_time - start_time
# first item in the args, ie `args[0]` is `self`
print(f"Function {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds")
print(
f"Function {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds"
)
key = tuple([name] + [kwargs.get(k) for k in self.key])
self._timings.setdefault(key, []).append(total_time)
return result
Expand All @@ -194,7 +205,6 @@ def timeit_wrapper(*args, **kwargs):


class DatasetConverter(ABC):

def __init__(self, name, uri_root):
    # Dataset name; used to build default output paths (e.g. "coco").
    self.name = name
    # Root URI under which dataset files are read and written.
    self.uri_root = uri_root
Expand All @@ -205,32 +215,33 @@ def read_metadata(self) -> pd.DataFrame:

def default_dataset_path(self, fmt, flavor=None):
    """Return the default output path ``{uri_root}/{name}[_{flavor}].{fmt}``.

    Parameters
    ----------
    fmt : str
        Output format extension, e.g. "lance" or "parquet".
    flavor : str, optional
        If given, appended to the dataset name as ``_{flavor}``.
    """
    # Diff-paste residue removed: the old two-line os.path.join left an
    # unreachable duplicate return statement behind the new one.
    suffix = f"_{flavor}" if flavor else ""
    return os.path.join(self.uri_root, f"{self.name}{suffix}.{fmt}")

def save_df(self, df, fmt="lance", output_path=None):
    """Convert *df* to an Arrow table and write it out in *fmt*.

    Parameters
    ----------
    df : pd.DataFrame
        Metadata frame matching ``self.get_schema()``.
    fmt : str
        "lance" or "parquet". Any other value writes nothing.
    output_path : str, optional
        Destination; defaults to the "links" flavor default path.

    Returns
    -------
    pa.Table
        The converted table (returned even if *fmt* was unrecognized).
    """
    # Diff-paste residue removed: old single-quoted def line and branch
    # conditions were interleaved with the new double-quoted versions.
    output_path = output_path or self.default_dataset_path(fmt, "links")
    table = pa.Table.from_pandas(df, self.get_schema())
    if fmt == "parquet":
        pq.write_table(table, output_path)
    elif fmt == "lance":
        lance.write_table(table, output_path)
    return table

@abstractmethod
def image_uris(self, table):
    """Return the image URIs for each row of *table* (subclass-specific)."""
    pass

def make_embedded_dataset(
    self, table: pa.Table, fmt="lance", output_path=None, **kwargs
):
    """Download every image referenced by *table* and write a dataset with
    the binary image payload embedded as an ``image`` column.

    Parameters
    ----------
    table : pa.Table
        Metadata table; must provide URIs via ``self.image_uris``.
    fmt : str
        "lance" or "parquet".
    output_path : str, optional
        Destination; defaults to ``self.default_dataset_path(fmt)``.
    **kwargs
        Passed through to the underlying ``write_table`` call.

    Returns
    -------
    pa.Table
        The table with the appended ``image`` binary column.
    """
    # Diff-paste residue removed: old def line and old single-quoted
    # branches (without **kwargs) were interleaved with the new version.
    output_path = output_path or self.default_dataset_path(fmt)
    uris = self.image_uris(table)
    images = download_uris(pd.Series(uris))
    arr = pa.BinaryArray.from_pandas(images)
    embedded = table.append_column(pa.field("image", pa.binary()), arr)
    if fmt == "parquet":
        pq.write_table(embedded, output_path, **kwargs)
    elif fmt == "lance":
        lance.write_table(embedded, output_path, **kwargs)
    return embedded

@abstractmethod
Expand Down
136 changes: 92 additions & 44 deletions python/benchmarks/parse_coco.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,26 @@


class CocoConverter(DatasetConverter):

def __init__(self, uri_root, version="2017"):
    """Create a converter for the COCO dataset rooted at *uri_root*.

    Parameters
    ----------
    uri_root : str
        Root URI holding ``annotations/`` and the image split directories.
    version : str
        COCO release year; default "2017".
    """
    # Diff-paste residue removed: duplicate old single-quoted def line.
    super(CocoConverter, self).__init__("coco", uri_root)
    self.version = version

def _get_instances_json(self, split):
    """Load and parse ``instances_{split}{version}.json`` for this dataset.

    Parameters
    ----------
    split : str
        Dataset split, e.g. "train" or "val".

    Returns
    -------
    dict
        The decoded COCO instances JSON document.
    """
    # Diff-paste residue removed: old two-line uri construction was
    # interleaved with the new single-call version.
    uri = os.path.join(
        self.uri_root, "annotations", f"instances_{split}{self.version}.json"
    )
    fs, path = pa.fs.FileSystem.from_uri(uri)
    with fs.open_input_file(path) as fobj:
        return json.load(fobj)

def _instances_to_df(self, split, instances_json):
df = pd.DataFrame(instances_json["annotations"])
df['segmentation'] = df.segmentation.apply(_convert_segmentation)
category_df = (pd.DataFrame(instances_json["categories"])
.rename({'id': 'category_id'}, axis=1))
df["segmentation"] = df.segmentation.apply(_convert_segmentation)
category_df = pd.DataFrame(instances_json["categories"]).rename(
{"id": "category_id"}, axis=1
)
annotations_df = df.merge(category_df, on="category_id")
annotations_df['iscrowd'] = annotations_df.iscrowd.astype(bool)
# annotations_df['iscrowd'] = annotations_df.iscrowd.astype(bool)
anno_df = (
pd.DataFrame(
{
Expand All @@ -45,12 +46,12 @@ def _instances_to_df(self, split, instances_json):
.groupby("image_id")
.agg(list)
).reset_index()
images_df = (pd.DataFrame(instances_json["images"])
.rename({'id': 'image_id'}, axis=1))
images_df = pd.DataFrame(instances_json["images"]).rename(
{"id": "image_id"}, axis=1
)
images_df["split"] = split
images_df["image_uri"] = images_df["file_name"].apply(
lambda fname: os.path.join(self.uri_root,
f"{split}{self.version}", fname)
lambda fname: os.path.join(self.uri_root, f"{split}{self.version}", fname)
)
# TODO join images_df.license to instances_json['license']
return images_df.merge(anno_df, on="image_id")
Expand All @@ -60,59 +61,106 @@ def read_split(split):
json_data = self._get_instances_json(split)
return self._instances_to_df(split, json_data)

df = pd.concat([read_split(split) for split in ['train', 'val']])
df = pd.concat([read_split(split) for split in ["train", "val"]])
# df['date_captured'] = pd.to_datetime(df.date_captured) # lance GH#98
return df

def image_uris(self, table):
    """Return the ``image_uri`` column of *table* as a numpy array."""
    uri_column = table["image_uri"]
    return uri_column.to_numpy()

def get_schema(self):
    """Return the pyarrow schema for the converted COCO metadata table.

    One row per image; ``annotations`` is a nested list-of-struct column
    (see ``_ann_schema``).
    """
    # Diff-paste residue removed: both the old compact names/types lists
    # and the new black-formatted lists were present; keep one copy.
    names = [
        "license",
        "file_name",
        "coco_url",
        "height",
        "width",
        "date_captured",
        "flickr_url",
        "image_id",
        "split",
        "image_uri",
        "annotations",
    ]
    types = [
        pa.int64(),
        pa.utf8(),
        pa.utf8(),
        pa.int64(),
        pa.int64(),
        pa.utf8(),
        pa.utf8(),
        pa.int64(),
        pa.utf8(),
        pa.utf8(),
        self._ann_schema(),
    ]
    return pa.schema([pa.field(name, dtype) for name, dtype in zip(names, types)])

@staticmethod
def _ann_schema():
    """Return the pyarrow type for the per-image ``annotations`` column.

    Each annotation struct carries the segmentation (polygon or COCO RLE),
    bounding box, category info, and flags from the instances JSON.
    """
    # Diff-paste residue removed: old compact construction was interleaved
    # with the new formatted one. The post-change version is kept, which
    # stores iscrowd as int8 (raw JSON value) rather than bool.
    segmentation_type = pa.struct(
        [
            pa.field("polygon", pa.list_(pa.list_(pa.int32()))),
            pa.field(
                "coco_rle",
                pa.struct(
                    [
                        pa.field("counts", pa.list_(pa.int32())),
                        pa.field("size", pa.list_(pa.int32())),
                    ]
                ),
            ),
        ]
    )
    names = [
        "segmentation",
        "area",
        "iscrowd",
        "bbox",
        "category_id",
        "id",
        "supercategory",
        "name",
    ]
    types = [
        segmentation_type,
        pa.float64(),
        pa.int8(),
        pa.list_(pa.float32()),
        pa.int16(),
        pa.int64(),
        pa.utf8(),
        pa.utf8(),
    ]
    return pa.list_(
        pa.struct([pa.field(name, dtype) for name, dtype in zip(names, types)])
    )


def _convert_segmentation(s):
if isinstance(s, list):
return {'polygon': s}
return {"polygon": s}
return s


@click.command
@click.option('-u', '--base-uri', type=str, help='Coco dataset root')
@click.option('-v', '--version', type=str, default='2017',
help='Dataset version. Default 2017')
@click.option('-f', '--fmt', type=str,
help='Output format (parquet or lance)')
@click.option('-o', '--output-path', type=str,
help='Output path. Default is {base_uri}/coco_links.{fmt}')
@click.option("-u", "--base-uri", type=str, help="Coco dataset root")
@click.option(
"-v", "--version", type=str, default="2017", help="Dataset version. Default 2017"
)
@click.option("-f", "--fmt", type=str, help="Output format (parquet or lance)")
@click.option(
"-o",
"--output-path",
type=str,
help="Output path. Default is {base_uri}/coco_links.{fmt}",
)
def main(base_uri, version, fmt, output_path):
converter = CocoConverter(base_uri, version=version)
df = converter.read_metadata()
known_formats = ['lance', 'parquet']
known_formats = ["lance", "parquet"]
if fmt is not None:
assert fmt in known_formats
fmt = [fmt]
Expand All @@ -122,5 +170,5 @@ def main(base_uri, version, fmt, output_path):
return converter.save_df(df, f, output_path)


# Script entry point: run the click command when executed directly.
# Diff-paste residue removed: old single-quoted guard line dropped.
if __name__ == "__main__":
    main()

0 comments on commit beb1d92

Please sign in to comment.