Skip to content

Commit

Permalink
add label type and fmt pylance using isort/black
Browse files Browse the repository at this point in the history
  • Loading branch information
changhiskhan committed Sep 7, 2022
1 parent 57d7151 commit 294434d
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 145 deletions.
7 changes: 3 additions & 4 deletions python/benchmarks/bench_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing as mp
import os
import pathlib
import time
from abc import ABC, abstractmethod
from functools import wraps
import multiprocessing as mp
from typing import Iterable, Union

import click
import pandas as pd
import time

import pyarrow as pa
import pyarrow.fs
import pyarrow.dataset as ds
import pyarrow.fs
import pyarrow.parquet as pq

import lance
Expand Down
66 changes: 38 additions & 28 deletions python/benchmarks/coco.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,38 @@

import duckdb
import pandas as pd

import lance
import pyarrow.compute as pc
import pyarrow.dataset as ds
from bench_utils import download_uris, get_uri, get_dataset, BenchmarkSuite
from bench_utils import BenchmarkSuite, download_uris, get_dataset, get_uri
from parse_coco import CocoConverter

import lance

# Suite object for the "coco" benchmarks; its `.benchmark(...)` decorator is
# applied to the benchmark entry points defined in this module.
coco_benchmarks = BenchmarkSuite("coco")


@coco_benchmarks.benchmark("label_distribution", key=["fmt", "flavor"])
def label_distribution(base_uri: str, fmt: str, flavor: str = None):
    """Run the COCO label-distribution benchmark for one storage format.

    Args:
        base_uri: Root location handed to the raw reader or to ``get_uri``.
        fmt: One of ``"raw"``, ``"lance"`` or ``"parquet"``.
        flavor: Optional dataset variant forwarded to ``get_uri``.

    Returns:
        Whatever the format-specific ``_label_distribution_*`` helper returns.

    Raises:
        NotImplementedError: If ``fmt`` is not one of the supported formats.
    """
    if fmt == "raw":
        return _label_distribution_raw(base_uri)
    if fmt not in ("lance", "parquet"):
        raise NotImplementedError()

    # Both columnar formats resolve and open their dataset the same way and
    # differ only in the helper that runs the aggregation.
    dataset = get_dataset(get_uri(base_uri, "coco", fmt, flavor))
    if fmt == "lance":
        return _label_distribution_lance(dataset)
    return _label_distribution_duckdb(dataset)


@coco_benchmarks.benchmark("filter_data", key=["fmt", "flavor"])
def filter_data(base_uri: str, fmt: str, flavor: str = None):
    """Run the COCO filter benchmark for one storage format.

    Args:
        base_uri: Root location of the dataset.
        fmt: One of ``"raw"``, ``"lance"`` or ``"parquet"``.
        flavor: Optional dataset variant; ignored for ``"raw"``.

    Returns:
        The result of the matching ``_filter_data_*`` helper.

    Raises:
        NotImplementedError: If ``fmt`` is not one of the supported formats.
    """
    # The raw reader has no notion of flavor, so it is handled on its own.
    if fmt == "raw":
        return _filter_data_raw(base_uri)

    flavored_readers = {
        "lance": _filter_data_lance,
        "parquet": _filter_data_parquet,
    }
    if fmt in flavored_readers:
        return flavored_readers[fmt](base_uri, flavor=flavor)
    raise NotImplementedError()

Expand All @@ -55,40 +55,50 @@ def _filter_data_raw(base_uri: str, klass="cat", offset=20, limit=50):
df = c.read_metadata()
mask = df.annotations.apply(lambda ann: any([a["name"] == klass for a in ann]))
filtered = df.loc[mask, ["image_uri", "annotations"]]
limited = filtered[offset:offset + limit]
limited = filtered[offset : offset + limit]
limited.assign(image=download_uris(limited.image_uri))
return limited


def _filter_data_lance(base_uri: str, klass="cat", offset=20, limit=50, flavor=None):
    """Filter benchmark against the Lance copy of COCO.

    First collects the distinct ``image_id`` values that have an annotation
    named ``klass`` (via DuckDB over a Lance scanner), then scans the image
    and annotation columns with the requested paging window.

    Args:
        base_uri: Root location handed to ``get_uri``.
        klass: Annotation name to look for.
        offset: Number of rows to skip in the final scan.
        limit: Maximum number of rows returned by the final scan.
        flavor: Optional dataset variant forwarded to ``get_uri``.

    Returns:
        A pandas DataFrame with ``image_id``, ``image`` and
        ``annotations.name`` for the selected window.
    """
    uri = get_uri(base_uri, "coco", "lance", flavor)

    # DuckDB looks up `index_scanner` in the SQL below from this local
    # variable by name, so the variable must keep this exact name.
    index_scanner = lance.scanner(uri, columns=["image_id", "annotations.name"])
    query = (
        f"SELECT distinct image_id FROM ("
        f" SELECT image_id, UNNEST(annotations) as ann FROM index_scanner"
        f") WHERE ann.name == '{klass}'"
    )
    # NOTE(review): the id list is still computed (it is part of the work the
    # benchmark measures) but is not applied while the `filter=` argument
    # below stays commented out, so the result is not yet restricted to
    # `klass`.
    filtered_ids = duckdb.query(query).arrow().column("image_id").combine_chunks()

    scanner = lance.scanner(
        uri,
        ["image_id", "image", "annotations.name"],
        # filter=pc.field("image_id").isin(filtered_ids),
        # Honor the caller's window; these were previously hard-coded to
        # 50/20, which silently ignored the `limit`/`offset` parameters.
        limit=limit,
        offset=offset,
    )
    return scanner.to_table().to_pandas()


def _filter_data_parquet(base_uri: str, klass="cat", offset=20, limit=50, flavor=None):
    """Filter benchmark against the Parquet copy of COCO.

    Collects the distinct ``image_id`` values that have an annotation named
    ``klass``, then selects ``image`` and ``annotations`` for those ids with
    the requested paging window.

    Args:
        base_uri: Root location handed to ``get_uri``.
        klass: Annotation name to look for.
        offset: Number of rows to skip in the final query.
        limit: Maximum number of rows returned by the final query.
        flavor: Optional dataset variant forwarded to ``get_uri``.

    Returns:
        The Arrow table produced by the final DuckDB query.
    """
    uri = get_uri(base_uri, "coco", "parquet", flavor)

    # DuckDB looks up `dataset` in both SQL statements from this local
    # variable by name, so the variable must keep this exact name.
    dataset = ds.dataset(uri)
    query = (
        f"SELECT distinct image_id FROM ("
        f" SELECT image_id, UNNEST(annotations) as ann FROM dataset"
        f") WHERE ann.name == '{klass}'"
    )
    filtered_ids = duckdb.query(query).arrow().column("image_id").to_numpy().tolist()

    if filtered_ids:
        id_string = ",".join([f"'{x}'" for x in filtered_ids])
        predicate = f"image_id in ({id_string})"
    else:
        # An empty `IN ()` list is not valid SQL; match nothing instead.
        predicate = "false"

    # Honor the caller's window (previously hard-coded to 50/20). int()
    # keeps anything non-numeric out of the generated SQL.
    return duckdb.query(
        f"SELECT image, annotations "
        f"FROM dataset "
        f"WHERE {predicate} "
        f"LIMIT {int(limit)} OFFSET {int(offset)}"
    ).to_arrow_table()


def _label_distribution_lance(dataset: ds.Dataset):
    """Compute the label distribution from a Lance dataset.

    Builds a Lance scanner restricted to the ``annotations.name`` column and
    delegates the aggregation to ``_label_distribution_duckdb``.
    """
    return _label_distribution_duckdb(
        lance.scanner(dataset, columns=["annotations.name"])
    )


Expand Down
42 changes: 22 additions & 20 deletions python/benchmarks/oxford_pet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,54 @@
import duckdb
import numpy as np
import pandas as pd

import lance
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset
from bench_utils import BenchmarkSuite, download_uris
from parse_pet import OxfordPetConverter

import lance

# Suite object for the "oxford_pet" benchmarks; its `.benchmark(...)`
# decorator is applied to the benchmark entry points defined in this module.
oxford_pet_benchmarks = BenchmarkSuite("oxford_pet")


@oxford_pet_benchmarks.benchmark("label_distribution", key=["fmt", "flavor"])
def label_distribution(base_uri: str, fmt: str, flavor: Optional[str]):
    """Count Oxford Pet rows per class for one storage format.

    Args:
        base_uri: Directory that holds the raw data or the converted dataset.
        fmt: ``"raw"`` or the file extension of the converted dataset.
        flavor: Optional variant; when set, ``_<flavor>`` is appended to the
            dataset file stem.

    Returns:
        The raw helper's result for ``"raw"``, otherwise a pandas DataFrame
        with one row per class and its count.
    """
    if fmt == "raw":
        return get_pets_class_distribution(base_uri)

    suffix = f"_{flavor}" if flavor else ""
    dataset_path = os.path.join(base_uri, f"oxford_pet{suffix}.{fmt}")
    # The SQL refers to this local by name (DuckDB replacement scan), so it
    # must stay called `ds`.
    ds = _get_dataset(dataset_path, fmt)
    return duckdb.query("SELECT class, count(1) FROM ds GROUP BY 1").to_df()


@oxford_pet_benchmarks.benchmark("filter_data", key=["fmt", "flavor"])
def filter_data(base_uri: str, fmt: str, flavor: Optional[str]):
    """Fetch a page of 'pug' rows (image + class) for one storage format.

    Args:
        base_uri: Directory that holds the raw data or the converted dataset.
        fmt: ``"raw"``, ``"parquet"`` or ``"lance"``.
        flavor: Optional variant; when set, ``_<flavor>`` is appended to the
            dataset file stem.

    Returns:
        The raw helper's result for ``"raw"``, otherwise a pandas DataFrame
        holding up to 50 matching rows after skipping the first 20.

    Raises:
        NotImplementedError: If ``fmt`` is not a supported format.
    """
    if fmt == "raw":
        return get_pets_filtered_data(base_uri)

    suffix = "" if not flavor else f"_{flavor}"
    uri = os.path.join(base_uri, f"oxford_pet{suffix}.{fmt}")

    if fmt == "parquet":
        # The SQL refers to this local by name (DuckDB replacement scan), so
        # it must stay called `ds`.
        ds = _get_dataset(uri, fmt)
        query = "SELECT image, class FROM ds WHERE class='pug' LIMIT 50 OFFSET 20"
        return duckdb.query(query).to_df()

    if fmt == "lance":
        scanner = lance.scanner(
            uri,
            columns=["image", "class"],
            filter=pc.field("class") == "pug",
            limit=50,
            offset=20,
        )
        return scanner.to_table().to_pandas()

    # Previously an unknown format fell off the end and returned None, which
    # a benchmark run would record as a (meaninglessly fast) success.
    raise NotImplementedError(f"Unsupported format: {fmt!r}")


@oxford_pet_benchmarks.benchmark("area_histogram", key=["fmt", "flavor"])
def compute_histogram(base_uri: str, fmt: str, flavor: Optional[str]):
    """Histogram of image areas (width * height) for one storage format.

    Args:
        base_uri: Directory that holds the raw data or the converted dataset.
        fmt: ``"raw"`` or the file extension of the converted dataset.
        flavor: Optional variant; when set, ``_<flavor>`` is appended to the
            dataset file stem.

    Returns:
        The raw helper's result for ``"raw"``, otherwise the DuckDB
        ``histogram`` aggregate as a pandas DataFrame.
    """
    if fmt == "raw":
        return area_histogram_raw(base_uri)

    suffix = f"_{flavor}" if flavor else ""
    dataset_path = os.path.join(base_uri, f"oxford_pet{suffix}.{fmt}")
    # The SQL refers to this local by name (DuckDB replacement scan), so it
    # must stay called `ds`.
    ds = _get_dataset(dataset_path, fmt)
    return duckdb.query(
        "SELECT histogram(size.width * size.height) FROM ds"
    ).to_df()
Expand All @@ -74,16 +77,15 @@ def get_pets_filtered_data(base_uri, klass="pug", offset=20, limit=50):
c = OxfordPetConverter(base_uri)
df = c.read_metadata()
filtered = df.loc[df["class"] == klass, ["class", "filename"]]
limited: pd.DataFrame = filtered[offset: offset + limit]
uris = [os.path.join(base_uri, f"images/{x}.jpg")
for x in limited.filename.values]
limited: pd.DataFrame = filtered[offset : offset + limit]
uris = [os.path.join(base_uri, f"images/{x}.jpg") for x in limited.filename.values]
return limited.assign(images=download_uris(pd.Series(uris)))


def area_histogram_raw(base_uri):
    """Histogram of image areas computed from the raw Oxford Pet metadata.

    Reads the metadata through ``OxfordPetConverter``, flattens its ``size``
    column into separate columns and aggregates ``width * height`` with
    DuckDB's ``histogram``.

    Returns:
        The aggregate as a pandas DataFrame.
    """
    metadata = OxfordPetConverter(base_uri).read_metadata()
    # The SQL refers to this local by name (DuckDB replacement scan), so it
    # must stay called `sz`.
    sz = pd.json_normalize(metadata["size"])
    return duckdb.query("SELECT histogram(width * height) FROM sz").to_df()

Expand Down
25 changes: 19 additions & 6 deletions python/benchmarks/parse_bdd100k.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
from typing import Union

import click
import lance
import pandas as pd
import pyarrow as pa
import pyarrow.fs

from bench_utils import DatasetConverter

import lance


class BDD100kConverter(DatasetConverter):
def __init__(self, uri_root: Union[str, Path]):
Expand All @@ -23,12 +23,18 @@ def read_metadata(self) -> pd.DataFrame:
for split in ["train", "val"]:
annotation = pd.read_json(
os.path.join(
self.uri_root, "bdd100k", "labels", f"bdd100k_labels_images_{split}.json"
self.uri_root,
"bdd100k",
"labels",
f"bdd100k_labels_images_{split}.json",
)
)
annotation["split"] = split
annotation["image_uri"] = annotation["name"].map(
lambda name: os.path.join(self.uri_root, "bdd100k", "images", "100k", split, name))
lambda name: os.path.join(
self.uri_root, "bdd100k", "images", "100k", split, name
)
)
frames.append(annotation)

return pd.concat(frames)
Expand Down Expand Up @@ -82,7 +88,12 @@ def get_schema(self):

@click.command
@click.option("-u", "--base-uri", type=str, required=True, help="Coco dataset root")
@click.option("-f", "--fmt", type=click.Choice(["lance", "parquet"]), help="Output format (parquet or lance)")
@click.option(
"-f",
"--fmt",
type=click.Choice(["lance", "parquet"]),
help="Output format (parquet or lance)",
)
@click.option("-e", "--embedded", type=bool, default=True, help="Embed images")
@click.option(
"-o",
Expand All @@ -103,7 +114,9 @@ def main(base_uri, fmt, embedded, output_path):
partitioning = ["split"]
for f in fmt:
if embedded:
converter.make_embedded_dataset(df, f, output_path, partitioning=partitioning)
converter.make_embedded_dataset(
df, f, output_path, partitioning=partitioning
)
else:
return converter.save_df(df, f, output_path, partitioning=partitioning)

Expand Down
1 change: 0 additions & 1 deletion python/benchmarks/parse_coco.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import click
import pandas as pd
import pyarrow as pa

from bench_utils import DatasetConverter


Expand Down
Loading

0 comments on commit 294434d

Please sign in to comment.