fix(hive): Use parquet rather than textfile when uploading CSV files to Hive #14240

Merged: 2 commits, Apr 24, 2021
83 changes: 26 additions & 57 deletions superset/db_engine_specs/base.py
@@ -613,50 +613,41 @@ def set_or_update_query_limit(cls, sql: str, limit: int) -> str:
parsed_query = sql_parse.ParsedQuery(sql)
return parsed_query.set_or_update_query_limit(limit)

@staticmethod
def csv_to_df(**kwargs: Any) -> pd.DataFrame:
"""Read csv into Pandas DataFrame
:param kwargs: params to be passed to DataFrame.read_csv
:return: Pandas DataFrame containing data from csv
"""
kwargs["encoding"] = "utf-8"
kwargs["iterator"] = True
chunks = pd.read_csv(**kwargs)
df = pd.concat(chunk for chunk in chunks)
return df

@classmethod
def df_to_sql(cls, df: pd.DataFrame, **kwargs: Any) -> None:
"""Upload data from a Pandas DataFrame to a database. For
regular engines this calls the DataFrame.to_sql() method. Can be
overridden for engines that don't work well with to_sql(), e.g.
BigQuery.
:param df: Dataframe with data to be uploaded
:param kwargs: kwargs to be passed to to_sql() method
"""
df.to_sql(**kwargs)

@classmethod
def create_table_from_csv( # pylint: disable=too-many-arguments
def df_to_sql(
cls,
filename: str,
table: Table,
database: "Database",
csv_to_df_kwargs: Dict[str, Any],
df_to_sql_kwargs: Dict[str, Any],
table: Table,
df: pd.DataFrame,
to_sql_kwargs: Dict[str, Any],
) -> None:
"""
Create table from contents of a csv. Note: this method does not create
metadata for the table.
Upload data from a Pandas DataFrame to a database.

For regular engines this calls the `pandas.DataFrame.to_sql` method. Can be
overridden for engines that don't work well with this method, e.g. Hive and
BigQuery.

Note this method does not create metadata for the table.

:param database: The database to upload the data to
:param table: The table to upload the data to
:param df: The dataframe with data to be uploaded
:param to_sql_kwargs: The kwargs to be passed to the `pandas.DataFrame.to_sql` method
"""
df = cls.csv_to_df(filepath_or_buffer=filename, **csv_to_df_kwargs)

engine = cls.get_engine(database)
to_sql_kwargs["name"] = table.table

if table.schema:
# only add schema when it is preset and non empty
df_to_sql_kwargs["schema"] = table.schema

# Only add schema when it is present and non-empty.
to_sql_kwargs["schema"] = table.schema

if engine.dialect.supports_multivalues_insert:
df_to_sql_kwargs["method"] = "multi"
cls.df_to_sql(df=df, con=engine, **df_to_sql_kwargs)
to_sql_kwargs["method"] = "multi"

df.to_sql(con=engine, **to_sql_kwargs)

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
@@ -669,28 +660,6 @@ def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
"""
return None

@classmethod
def create_table_from_excel( # pylint: disable=too-many-arguments
cls,
filename: str,
table: Table,
database: "Database",
excel_to_df_kwargs: Dict[str, Any],
df_to_sql_kwargs: Dict[str, Any],
) -> None:
"""
Create table from contents of a excel. Note: this method does not create
metadata for the table.
"""
df = pd.read_excel(io=filename, **excel_to_df_kwargs)
engine = cls.get_engine(database)
if table.schema:
# only add schema when it is preset and non empty
df_to_sql_kwargs["schema"] = table.schema
if engine.dialect.supports_multivalues_insert:
df_to_sql_kwargs["method"] = "multi"
cls.df_to_sql(df=df, con=engine, **df_to_sql_kwargs)

@classmethod
def get_all_datasource_names(
cls, database: "Database", datasource_type: str
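For orientation, the generic base implementation now reduces to a plain `pandas.DataFrame.to_sql` call against the database's SQLAlchemy engine. The following is a self-contained sketch of that behaviour using an in-memory SQLite engine as a stand-in for the Superset `Database` engine; the table name and data are placeholders, not part of this diff.

import pandas as pd
from sqlalchemy import create_engine

# Placeholder data standing in for a parsed CSV or Excel upload.
df = pd.DataFrame({"name": ["alice", "bob"], "age": [30, 40]})

engine = create_engine("sqlite://")  # stand-in for the Database's SQLAlchemy engine

to_sql_kwargs = {"name": "my_table", "if_exists": "fail", "index": False}
if engine.dialect.supports_multivalues_insert:
    to_sql_kwargs["method"] = "multi"  # same optimisation as the base spec

df.to_sql(con=engine, **to_sql_kwargs)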
54 changes: 34 additions & 20 deletions superset/db_engine_specs/bigquery.py
@@ -26,6 +26,7 @@

from superset.db_engine_specs.base import BaseEngineSpec
from superset.errors import SupersetErrorType
from superset.sql_parse import Table
from superset.utils import core as utils

if TYPE_CHECKING:
@@ -228,16 +229,26 @@ def epoch_ms_to_dttm(cls) -> str:
return "TIMESTAMP_MILLIS({col})"

@classmethod
def df_to_sql(cls, df: pd.DataFrame, **kwargs: Any) -> None:
def df_to_sql(
cls,
database: "Database",
table: Table,
df: pd.DataFrame,
to_sql_kwargs: Dict[str, Any],
) -> None:
"""
Upload data from a Pandas DataFrame to BigQuery. Calls
`DataFrame.to_gbq()` which requires `pandas_gbq` to be installed.
Upload data from a Pandas DataFrame to a database.

:param df: Dataframe with data to be uploaded
:param kwargs: kwargs to be passed to to_gbq() method. Requires that `schema`,
`name` and `con` are present in kwargs. `name` and `schema` are combined
and passed to `to_gbq()` as `destination_table`.
Calls `pandas_gbq.to_gbq`, which requires `pandas_gbq` to be installed.

Note this method does not create metadata for the table.

:param database: The database to upload the data to
:param table: The table to upload the data to
:param df: The dataframe with data to be uploaded
:param to_sql_kwargs: The kwargs to be passed to the `pandas.DataFrame.to_sql` method
"""

try:
import pandas_gbq
from google.oauth2 import service_account
@@ -248,22 +259,25 @@ def df_to_sql(cls, df: pd.DataFrame, **kwargs: Any) -> None:
"to upload data to BigQuery"
)

if not ("name" in kwargs and "schema" in kwargs and "con" in kwargs):
raise Exception("name, schema and con need to be defined in kwargs")
if not table.schema:
raise Exception("The table schema must be defined")

gbq_kwargs = {}
gbq_kwargs["project_id"] = kwargs["con"].engine.url.host
gbq_kwargs["destination_table"] = f"{kwargs.pop('schema')}.{kwargs.pop('name')}"
engine = cls.get_engine(database)
to_gbq_kwargs = {"destination_table": str(table), "project_id": engine.url.host}

# Add credentials if they are set on the SQLAlchemy dialect.
creds = engine.dialect.credentials_info

# add credentials if they are set on the SQLAlchemy Dialect:
creds = kwargs["con"].dialect.credentials_info
if creds:
credentials = service_account.Credentials.from_service_account_info(creds)
gbq_kwargs["credentials"] = credentials
to_gbq_kwargs[
"credentials"
] = service_account.Credentials.from_service_account_info(creds)

# Only pass through supported kwargs
# Only pass through supported kwargs.
supported_kwarg_keys = {"if_exists"}

for key in supported_kwarg_keys:
if key in kwargs:
gbq_kwargs[key] = kwargs[key]
pandas_gbq.to_gbq(df, **gbq_kwargs)
if key in to_sql_kwargs:
to_gbq_kwargs[key] = to_sql_kwargs[key]

pandas_gbq.to_gbq(df, **to_gbq_kwargs)
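As a rough standalone illustration of the call the BigQuery spec now assembles (destination table from the `Table` object, project id from the SQLAlchemy engine URL host, optional service-account credentials, and only `if_exists` passed through from `to_sql_kwargs`); the project and table names are placeholders, and running it requires a reachable BigQuery project:

import pandas as pd
import pandas_gbq  # optional dependency, as noted above

# Placeholder data standing in for a parsed CSV upload.
df = pd.DataFrame({"name": ["alice", "bob"], "age": [30, 40]})

# "my-project" and "my_schema.my_table" are placeholders; in the engine spec
# the project id comes from engine.url.host and the destination table from
# str(table).
pandas_gbq.to_gbq(
    df,
    destination_table="my_schema.my_table",
    project_id="my-project",
    if_exists="fail",  # the only to_sql kwarg the spec passes through
    # credentials=...  # optional service-account credentials, if configured
)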
160 changes: 72 additions & 88 deletions superset/db_engine_specs/hive.py
@@ -17,12 +17,16 @@
import logging
import os
import re
import tempfile
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
from urllib import parse

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from flask import g
from sqlalchemy import Column, text
from sqlalchemy.engine.base import Engine
@@ -54,6 +58,15 @@


def upload_to_s3(filename: str, upload_prefix: str, table: Table) -> str:
"""
Upload the file to S3.

:param filename: The file to upload
:param upload_prefix: The S3 prefix
:param table: The table that will be created
:returns: The S3 location of the table
"""

# Optional dependency
import boto3 # pylint: disable=import-error
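The body of upload_to_s3 is collapsed in this view; the following is only a rough sketch of the kind of boto3 call it wraps, with the bucket name as a placeholder (the real function takes it from Superset's configuration):

import os

import boto3  # optional dependency


def upload_to_s3_sketch(filename: str, upload_prefix: str, table_name: str) -> str:
    # "my-upload-bucket" is a placeholder for the configured upload bucket.
    bucket = "my-upload-bucket"
    key = os.path.join(upload_prefix, table_name, os.path.basename(filename))
    boto3.client("s3").upload_file(filename, bucket, key)
    # Hive needs the directory containing the file as the table LOCATION.
    return os.path.join("s3a://", bucket, upload_prefix, table_name)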

@@ -152,89 +165,37 @@ def fetch_data(
return []

@classmethod
def get_create_table_stmt( # pylint: disable=too-many-arguments
cls,
table: Table,
schema_definition: str,
location: str,
delim: str,
header_line_count: Optional[int],
null_values: Optional[List[str]],
) -> text:
tblproperties = []
# available options:
# https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
# TODO(bkyryliuk): figure out what to do with the skip rows field.
params: Dict[str, str] = {
"delim": delim,
"location": location,
}
if header_line_count is not None and header_line_count >= 0:
header_line_count += 1
tblproperties.append("'skip.header.line.count'=:header_line_count")
params["header_line_count"] = str(header_line_count)
if null_values:
# hive only supports 1 value for the null format
tblproperties.append("'serialization.null.format'=:null_value")
params["null_value"] = null_values[0]

if tblproperties:
tblproperties_stmt = f"tblproperties ({', '.join(tblproperties)})"
sql = f"""CREATE TABLE {str(table)} ( {schema_definition} )
ROW FORMAT DELIMITED FIELDS TERMINATED BY :delim
STORED AS TEXTFILE LOCATION :location
{tblproperties_stmt}"""
else:
sql = f"""CREATE TABLE {str(table)} ( {schema_definition} )
ROW FORMAT DELIMITED FIELDS TERMINATED BY :delim
STORED AS TEXTFILE LOCATION :location"""
return sql, params

@classmethod
def create_table_from_csv( # pylint: disable=too-many-arguments, too-many-locals
def df_to_sql(
cls,
filename: str,
table: Table,
database: "Database",
csv_to_df_kwargs: Dict[str, Any],
df_to_sql_kwargs: Dict[str, Any],
table: Table,
df: pd.DataFrame,
to_sql_kwargs: Dict[str, Any],
) -> None:
"""Uploads a csv file and creates a superset datasource in Hive."""
if_exists = df_to_sql_kwargs["if_exists"]
if if_exists == "append":
raise SupersetException("Append operation not currently supported")
"""
Upload data from a Pandas DataFrame to a database.

def convert_to_hive_type(col_type: str) -> str:
"""maps tableschema's types to hive types"""
tableschema_to_hive_types = {
"boolean": "BOOLEAN",
"integer": "BIGINT",
"number": "DOUBLE",
"string": "STRING",
}
return tableschema_to_hive_types.get(col_type, "STRING")
The data is stored via the binary Parquet format, which is both less problematic
and more performant than a text file. More specifically, storing a table as a
CSV text file has severe limitations, including the fact that the Hive CSV SerDe
does not support multiline fields.

upload_prefix = config["CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC"](
database, g.user, table.schema
)
Note this method does not create metadata for the table.

# Optional dependency
from tableschema import ( # pylint: disable=import-error
Table as TableSchemaTable,
)
:param database: The database to upload the data to
:param table: The table to upload the data to
:param df: The dataframe with data to be uploaded
:param to_sql_kwargs: The kwargs to be passed to the `pandas.DataFrame.to_sql` method
"""

hive_table_schema = TableSchemaTable(filename).infer()
column_name_and_type = []
for column_info in hive_table_schema["fields"]:
column_name_and_type.append(
"`{}` {}".format(
column_info["name"], convert_to_hive_type(column_info["type"])
)
)
schema_definition = ", ".join(column_name_and_type)
engine = cls.get_engine(database)

if to_sql_kwargs["if_exists"] == "append":
raise SupersetException("Append operation not currently supported")

# ensure table doesn't already exist
if if_exists == "fail":
if to_sql_kwargs["if_exists"] == "fail":

# Ensure table doesn't already exist.
if table.schema:
table_exists = not database.get_df(
f"SHOW TABLES IN {table.schema} LIKE '{table.table}'"
@@ -243,24 +204,47 @@ def convert_to_hive_type(col_type: str) -> str:
table_exists = not database.get_df(
f"SHOW TABLES LIKE '{table.table}'"
).empty

if table_exists:
raise SupersetException("Table already exists")
elif to_sql_kwargs["if_exists"] == "replace":
engine.execute(f"DROP TABLE IF EXISTS {str(table)}")

engine = cls.get_engine(database)
def _get_hive_type(dtype: np.dtype) -> str:
hive_type_by_dtype = {
np.dtype("bool"): "BOOLEAN",
np.dtype("float64"): "DOUBLE",
np.dtype("int64"): "BIGINT",
np.dtype("object"): "STRING",
}

if if_exists == "replace":
engine.execute(f"DROP TABLE IF EXISTS {str(table)}")
location = upload_to_s3(filename, upload_prefix, table)
sql, params = cls.get_create_table_stmt(
table,
schema_definition,
location,
csv_to_df_kwargs["sep"].encode().decode("unicode_escape"),
int(csv_to_df_kwargs.get("header", 0)),
csv_to_df_kwargs.get("na_values"),
return hive_type_by_dtype.get(dtype, "STRING")

schema_definition = ", ".join(
f"`{name}` {_get_hive_type(dtype)}" for name, dtype in df.dtypes.items()
)
engine = cls.get_engine(database)
engine.execute(text(sql), **params)

with tempfile.NamedTemporaryFile(
dir=config["UPLOAD_FOLDER"], suffix=".parquet"
) as file:
pq.write_table(pa.Table.from_pandas(df), where=file.name)

engine.execute(
text(
f"""
CREATE TABLE {str(table)} ({schema_definition})
STORED AS PARQUET
LOCATION :location
"""
),
location=upload_to_s3(
filename=file.name,
upload_prefix=config["CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC"](
database, g.user, table.schema
),
table=table,
),
)

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
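Pulled out of the diff for clarity, a self-contained sketch of the new Hive upload path: infer a Hive schema from the DataFrame dtypes, write the frame to a temporary Parquet file (which upload_to_s3 would then ship to S3), and build the CREATE TABLE ... STORED AS PARQUET statement. The schema, table name, and S3 location are placeholders.

import tempfile

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Placeholder data standing in for a parsed CSV upload.
df = pd.DataFrame({"name": ["alice", "bob"], "age": [30, 40]})


def get_hive_type(dtype: np.dtype) -> str:
    # Same dtype-to-Hive-type mapping as the engine spec; anything unmapped
    # falls back to STRING.
    return {
        np.dtype("bool"): "BOOLEAN",
        np.dtype("float64"): "DOUBLE",
        np.dtype("int64"): "BIGINT",
        np.dtype("object"): "STRING",
    }.get(dtype, "STRING")


schema_definition = ", ".join(
    f"`{name}` {get_hive_type(dtype)}" for name, dtype in df.dtypes.items()
)

with tempfile.NamedTemporaryFile(suffix=".parquet") as file:
    # Write the DataFrame as Parquet; in the engine spec this file is then
    # uploaded to S3 and the table LOCATION points at the upload prefix.
    pq.write_table(pa.Table.from_pandas(df), where=file.name)

# "s3a://my-bucket/prefix/my_table" stands in for the upload_to_s3 result.
print(
    f"CREATE TABLE my_schema.my_table ({schema_definition}) "
    "STORED AS PARQUET LOCATION 's3a://my-bucket/prefix/my_table'"
)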