Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong package type exception #21

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Check status before meta information, [PR-20](https://github.com/panda-official/DriftCLI/pull/20)
- Fix wrong package type exception, [PR-21](https://github.com/panda-official/DriftCLI/pull/21)

## 0.10.0 - 2024-03-26

Expand Down
55 changes: 39 additions & 16 deletions drift_cli/export_impl/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,45 @@ async def _export_csv(
**kwargs,
):
Path.mkdir(Path(dest), exist_ok=True, parents=True)
it = client.walk(topic, to_timestamp(kwargs["start"]), to_timestamp(kwargs["stop"]))
try:
it = client.walk(
topic, to_timestamp(kwargs["start"]), to_timestamp(kwargs["stop"])
)

def _next():
try:
return next(it)
except StopIteration:
return None

good = False

while True:
pkg = await asyncio.get_running_loop().run_in_executor(pool, _next)
if pkg is None:
break
if pkg.status_code == 0:
good = True
break

def _next():
try:
return next(it)
except StopIteration:
return None
if not good:
progress.console.print(f"[ERROR] No good packages found in {topic}")
return

pkg = await asyncio.get_running_loop().run_in_executor(pool, _next)
if pkg is None or pkg.meta.type == MetaInfo.TIME_SERIES:
await _export_csv_timeseries(pool, client, topic, dest, progress, sem, **kwargs)
elif pkg.meta.type == MetaInfo.TYPED_DATA:
await _export_csv_typed_data(pool, client, topic, dest, progress, sem, **kwargs)
else:
raise RuntimeError(f"Can't export topic {topic} to csv")
if pkg is None or pkg.meta.type == MetaInfo.TIME_SERIES:
await _export_csv_timeseries(
pool, client, topic, dest, progress, sem, **kwargs
)
elif pkg.meta.type == MetaInfo.TYPED_DATA:
await _export_csv_typed_data(
pool, client, topic, dest, progress, sem, **kwargs
)
else:
progress.console.print(
f"[ERROR] {topic} is not a time series or typed data"
)
except DriftClientError as err:
progress.console.print(f"[ERROR] {err}")


async def _export_csv_timeseries(
Expand Down Expand Up @@ -324,9 +348,8 @@ async def export_raw(client: DriftClient, dest: str, parallel: int, **kwargs):
"""
sem = asyncio.Semaphore(parallel)
with Progress() as progress:
with ThreadPoolExecutor(max_workers=8) as pool:
topics = filter_topics(client.get_topics(), kwargs.pop("topics", ""))

with ThreadPoolExecutor() as pool:
topics = filter_topics(client.get_topics(), kwargs.pop("topics", []))
task = _export_csv if kwargs.get("csv", False) else _export_topic
task = _export_jpeg if kwargs.get("jpeg", False) else task

Expand Down
4 changes: 3 additions & 1 deletion tests/export_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,17 @@ def test__export_raw_data_start_stop_required(runner, conf, export_path):
def test__export_raw_data_image(runner, client, conf, export_path):
"""Should skip no image"""
pkg = DriftPackage()
pkg.status = 0
pkg.meta.type = MetaInfo.IMAGE

client.walk.side_effect = [
Iterator([DriftDataPackage(pkg.SerializeToString())])
] * 2

result = runner(
f"-c {conf} -p 1 export raw test {export_path} --start 2022-01-01 --stop 2022-01-02 --csv"
)
assert "[RuntimeError] Can't export topic topic1 to csv" in result.output
assert "[ERROR] topic1 is not a time series or typed data" in result.output


@pytest.mark.usefixtures("set_alias")
Expand Down
Loading