Skip to content

Commit

Permalink
Add import_csv_pandas and import_csv_dask utility primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Nov 7, 2023
1 parent 486e720 commit 7a2cb59
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[test,develop]
pip install --use-pep517 --prefer-binary --editable=.[io,test,develop]
- name: Run linter and software tests
run: |
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
## Unreleased

- Add SQL runner utility primitives to `io.sql` namespace
- Add `import_csv_pandas` and `import_csv_dask` utility primitives


## 2023/11/06 v0.0.2
Expand Down
55 changes: 55 additions & 0 deletions cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,61 @@ def ensure_repository_az(
"""
self.run_sql(sql)

def import_csv_pandas(
    self, filepath: t.Union[str, Path], tablename: str, index=False, chunksize=1000, if_exists="replace"
):
    """
    Load a CSV file into a database table by means of pandas.

    The file is read completely with `pandas.read_csv`, then written through
    `DataFrame.to_sql` on a connection obtained from `self.engine`, using the
    `insert_bulk` method from the CrateDB SQLAlchemy dialect.

    :param filepath: Location of the CSV file, handed to `pandas.read_csv`.
    :param tablename: Name of the target table.
    :param index: Forwarded to `to_sql` as `index`.
    :param chunksize: Forwarded to `to_sql` as `chunksize`.
    :param if_exists: Forwarded to `to_sql` as `if_exists`.
    :return: Whatever `DataFrame.to_sql` returns.
    """
    # Imported lazily, so the dependencies are only needed when the method is used.
    import pandas as pd
    from crate.client.sqlalchemy.support import insert_bulk

    frame = pd.read_csv(filepath)
    to_sql_options = {
        "index": index,
        "chunksize": chunksize,
        "if_exists": if_exists,
        "method": insert_bulk,
    }
    with self.engine.connect() as conn:
        outcome = frame.to_sql(tablename, conn, **to_sql_options)
    return outcome

def import_csv_dask(
    self,
    filepath: t.Union[str, Path],
    tablename: str,
    index=False,
    chunksize=1000,
    if_exists="replace",
    npartitions: t.Optional[int] = None,
    progress: bool = False,
):
    """
    Import CSV data using Dask.

    The file is read completely with `pandas.read_csv`, converted into a Dask
    dataframe with `npartitions` partitions, and written to the database at
    `self.dburi` through `to_sql` with `parallel=True`, using the `insert_bulk`
    method from the CrateDB SQLAlchemy dialect.

    :param filepath: Location of the CSV file, handed to `pandas.read_csv`.
    :param tablename: Name of the target table.
    :param index: Forwarded to `to_sql` as `index`.
    :param chunksize: Forwarded to `to_sql` as `chunksize`.
    :param if_exists: Forwarded to `to_sql` as `if_exists`.
    :param npartitions: Number of Dask partitions. Any falsy value (`None`, `0`)
                        selects the default of 4.
    :param progress: When true, a Dask `ProgressBar` is active for the duration
                     of this call.
    :return: Whatever the Dask dataframe's `to_sql` returns.
    """
    # Imported lazily, so the dependencies are only needed when the method is used.
    import dask.dataframe as dd
    import pandas as pd
    from crate.client.sqlalchemy.support import insert_bulk

    # Set a few defaults.
    # TODO: Use amount of CPU cores instead?
    npartitions = npartitions or 4

    pbar = None
    if progress:
        from dask.diagnostics import ProgressBar

        pbar = ProgressBar()
        pbar.register()

    try:
        # Mangle data.
        df = pd.read_csv(filepath)
        ddf = dd.from_pandas(df, npartitions=npartitions)
        return ddf.to_sql(
            tablename,
            uri=self.dburi,
            index=index,
            chunksize=chunksize,
            if_exists=if_exists,
            method=insert_bulk,
            parallel=True,
        )
    finally:
        # `register()` installs the callback globally. Remove it again, so that
        # repeated invocations do not pile up progress bars, and unrelated Dask
        # computations in the same process are not affected afterwards.
        if pbar is not None:
            pbar.unregister()


def sa_is_empty(thing):
"""
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ develop = [
"ruff==0.1.3",
"validate-pyproject<0.16",
]
io = [
"dask<=2023.10.1,>=2020",
"pandas<3,>=2",
]
release = [
"build<2",
"twine<5",
Expand Down
2 changes: 1 addition & 1 deletion release/oci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ COPY . /src

# Install package.
RUN --mount=type=cache,id=pip,target=/root/.cache/pip \
pip install --use-pep517 --prefer-binary '/src'
pip install --use-pep517 --prefer-binary '/src[io]'

# Uninstall Git again.
RUN apt-get --yes remove --purge git && apt-get --yes autoremove
Expand Down

0 comments on commit 7a2cb59

Please sign in to comment.