Import CSV #3643

Merged: 10 commits, Nov 28, 2017
7 changes: 7 additions & 0 deletions docs/faq.rst
@@ -45,6 +45,13 @@ visualizations.
https://github.com/airbnb/superset/issues?q=label%3Aexample+is%3Aclosed


Can I upload and visualize CSV data?
-------------------------------------

Yes, using the ``Upload a CSV`` button under the ``Sources``
menu item. This brings up a form that allows you to specify the
required information. After creating the table from CSV, it can
then be loaded like any other on the ``Sources -> Tables`` page.


Why are my queries timing out?
------------------------------

16 changes: 14 additions & 2 deletions superset/config.py
@@ -56,9 +56,9 @@
SECRET_KEY = '\2\1thisismyscretkey\1\2\e\y\y\h' # noqa

# The SQLAlchemy connection string.
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db')
# SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db')
# SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp'
# SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'

# In order to hook up a custom password store for all SQLALCHEMY connections
# implement a function that takes a single argument of type 'sqla.engine.url',
@@ -186,6 +186,10 @@
ENABLE_CORS = False
CORS_OPTIONS = {}

# Allowed format types for upload on Database view
# TODO: Add processing of other spreadsheet formats (xls, xlsx etc)
ALLOWED_EXTENSIONS = set(['csv'])

# CSV Options: key/value pairs that will be passed as argument to DataFrame.to_csv method
# note: index option should not be overridden
CSV_EXPORT = {
@@ -296,6 +300,14 @@ class CeleryConfig(object):
# in SQL Lab by using the "Run Async" button/feature
RESULTS_BACKEND = None

# The S3 bucket where you want to store your external hive tables created
# from CSV files. For example, 'companyname-superset'
CSV_TO_HIVE_UPLOAD_S3_BUCKET = None

# The directory within the bucket specified above that will
# contain all the external tables
CSV_TO_HIVE_UPLOAD_DIRECTORY = 'EXTERNAL_HIVE_TABLES/'

# A dictionary of items that gets merged into the Jinja context for
# SQL Lab. The existing context gets updated with this dictionary,
# meaning values for existing keys get overwritten by the content of this
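
Taken together, the new keys give a deployment a single place to opt in to CSV upload. A minimal sketch of a superset_config.py override, assuming a deployment-specific bucket name and upload folder (both values hypothetical; UPLOAD_FOLDER is the pre-existing key the new upload code reads files from):

# superset_config.py -- hypothetical deployment overrides for CSV upload
ALLOWED_EXTENSIONS = set(['csv'])  # permit only .csv uploads for now
UPLOAD_FOLDER = '/var/lib/superset/uploads/'  # local staging area for uploaded files
CSV_TO_HIVE_UPLOAD_S3_BUCKET = 'mycompany-superset'  # S3 bucket for external Hive tables
CSV_TO_HIVE_UPLOAD_DIRECTORY = 'EXTERNAL_HIVE_TABLES/'  # key prefix inside that bucket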
109 changes: 108 additions & 1 deletion superset/db_engine_specs.py
@@ -17,21 +17,30 @@
from __future__ import unicode_literals

from collections import defaultdict, namedtuple
import csv
import inspect
import logging
import os
import re
import textwrap
import time

import boto3
from flask import g
from flask_babel import lazy_gettext as _
import pandas
from sqlalchemy import select
from sqlalchemy.engine import create_engine
from sqlalchemy.engine.url import make_url
from sqlalchemy.sql import text
import sqlparse
from werkzeug.utils import secure_filename

from superset import cache_util, conf, utils
from superset import app, cache_util, conf, db, utils
from superset.utils import QueryStatus, SupersetTemplateException

config = app.config

tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')

Grain = namedtuple('Grain', 'name label function')
@@ -73,6 +82,65 @@ def extra_table_metadata(cls, database, table_name, schema_name):
"""Returns engine-specific table metadata"""
return {}

@staticmethod
def csv_to_df(**kwargs):
kwargs['filepath_or_buffer'] = \
app.config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
kwargs['encoding'] = 'utf-8'
kwargs['iterator'] = True
chunks = pandas.read_csv(**kwargs)
df = pandas.concat(chunk for chunk in chunks)
return df
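
The iterator/concat pair above is the standard pandas idiom for reading a large file in bounded memory. As a standalone sketch (file name hypothetical):

# Read the file 10,000 rows at a time, then stitch the chunks together.
import pandas
chunks = pandas.read_csv('example.csv', encoding='utf-8', iterator=True, chunksize=10000)
df = pandas.concat(chunk for chunk in chunks)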

@staticmethod
def df_to_db(df, table, **kwargs):
df.to_sql(**kwargs)
table.user_id = g.user.id
table.schema = kwargs['schema']
table.fetch_metadata()
db.session.add(table)
db.session.commit()

@staticmethod
def create_table_from_csv(form, table):
def _allowed_file(filename):
# Only allow specific file extensions as specified in the config
extension = os.path.splitext(filename)[1]
return extension and extension[1:] in app.config['ALLOWED_EXTENSIONS']

filename = secure_filename(form.csv_file.data.filename)
if not _allowed_file(filename):
raise Exception('Invalid file type selected')
kwargs = {
'filepath_or_buffer': filename,
'sep': form.sep.data,
'header': form.header.data if form.header.data else 0,
'index_col': form.index_col.data,
'mangle_dupe_cols': form.mangle_dupe_cols.data,
'skipinitialspace': form.skipinitialspace.data,
'skiprows': form.skiprows.data,
'nrows': form.nrows.data,
'skip_blank_lines': form.skip_blank_lines.data,
'parse_dates': form.parse_dates.data,
'infer_datetime_format': form.infer_datetime_format.data,
'chunksize': 10000,
}
df = BaseEngineSpec.csv_to_df(**kwargs)

df_to_db_kwargs = {
'table': table,
'df': df,
'name': form.name.data,
'con': create_engine(form.con.data, echo=False),
'schema': form.schema.data,
'if_exists': form.if_exists.data,
'index': form.index.data,
'index_label': form.index_label.data,
'chunksize': 10000,
}
BaseEngineSpec.df_to_db(**df_to_db_kwargs)
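
Stripped of the form plumbing, the method is pandas' read_csv -> to_sql round trip. A minimal equivalent sketch, with a hypothetical file, table name, and connection string:

# Hypothetical end-to-end equivalent of create_table_from_csv
import pandas
from sqlalchemy import create_engine
df = pandas.concat(pandas.read_csv('example.csv', sep=',', header=0, iterator=True, chunksize=10000))
engine = create_engine('sqlite:///example.db', echo=False)
df.to_sql('example_table', con=engine, if_exists='fail', index=False, chunksize=10000)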

@classmethod
def escape_sql(cls, sql):
"""Escapes the raw SQL"""
@@ -721,6 +789,45 @@ def fetch_result_sets(cls, db, datasource_type, force=False):
return BaseEngineSpec.fetch_result_sets(
db, datasource_type, force=force)

@staticmethod
def create_table_from_csv(form, table):
"""Uploads a csv file and creates a superset datasource in Hive."""
def get_column_names(filepath):
with open(filepath, 'rb') as f:
return next(csv.reader(f))

table_name = form.name.data
filename = form.csv_file.data.filename

bucket_path = app.config['CSV_TO_HIVE_UPLOAD_S3_BUCKET']

if not bucket_path:
logging.info('No upload bucket specified')
raise Exception(
'No upload bucket specified. You can specify one in the config file.')

upload_prefix = app.config['CSV_TO_HIVE_UPLOAD_DIRECTORY']
dest_path = os.path.join(upload_prefix, table_name, filename)

upload_path = app.config['UPLOAD_FOLDER'] + \
secure_filename(form.csv_file.data.filename)
column_names = get_column_names(upload_path)
schema_definition = ', '.join(
[s + ' STRING ' for s in column_names])

s3 = boto3.client('s3')
location = os.path.join('s3a://', bucket_path, upload_prefix, table_name)
s3.upload_file(upload_path, bucket_path, dest_path)
sql = """CREATE EXTERNAL TABLE {table_name} ( {schema_definition} )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS
TEXTFILE LOCATION '{location}'""".format(**locals())

logging.info(form.con.data)
engine = create_engine(form.con.data)
engine.execute(sql)
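
For a concrete sense of the generated DDL: with a table named people, a CSV header row of name,age, and a hypothetical bucket 'mycompany-superset', the template above renders roughly as

CREATE EXTERNAL TABLE people ( name STRING , age STRING )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS
TEXTFILE LOCATION 's3a://mycompany-superset/EXTERNAL_HIVE_TABLES/people'

Note that every column is declared STRING; refining the column types is left to Hive after the fact.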

@classmethod
def convert_dttm(cls, target_type, dttm):
tt = target_type.upper()
123 changes: 123 additions & 0 deletions superset/forms.py
@@ -0,0 +1,123 @@
"""Contains the logic to create cohesive forms on the explore view"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_appbuilder.forms import DynamicForm
from flask_babel import lazy_gettext as _
from flask_wtf.file import FileAllowed, FileField, FileRequired
from wtforms import (
BooleanField, IntegerField, SelectField, StringField)
from wtforms.validators import DataRequired, NumberRange, Optional

from superset import app

config = app.config


class CsvToDatabaseForm(DynamicForm):
name = StringField(
_('Table Name'),
description=_('Name of table to be created from csv data.'),
validators=[DataRequired()],
widget=BS3TextFieldWidget())
csv_file = FileField(
_('CSV File'),
description=_('Select a CSV file to be uploaded to a database.'),
validators=[
FileRequired(), FileAllowed(['csv'], _('CSV Files Only!'))])

con = SelectField(
_('Database'),
description=_('Database to which the table should be added.'),
validators=[DataRequired()],
choices=[])
sep = StringField(
_('Delimiter'),
description=_('Delimiter used by CSV file (for whitespace use \s+).'),
validators=[DataRequired()],
widget=BS3TextFieldWidget())
if_exists = SelectField(
_('Table Exists'),
description=_(
'If table exists do one of the following: '
'Fail (do nothing), Replace (drop and recreate table) '
'or Append (insert data).'),
choices=[
('fail', _('Fail')), ('replace', _('Replace')),
('append', _('Append'))],
validators=[DataRequired()])

schema = StringField(
_('Schema'),
description=_('Specify a schema (if database flavour supports this).'),
validators=[Optional()],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
header = IntegerField(
_('Header Row'),
description=_(
'Row containing the headers to use as '
'column names (0 is first line of data). '
'Leave empty if there is no header row.'),
validators=[Optional()],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
index_col = IntegerField(
_('Index Column'),
description=_(
'Column to use as the row labels of the '
'dataframe. Leave empty if no index column.'),
validators=[Optional(), NumberRange(0, 1E+20)],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
mangle_dupe_cols = BooleanField(
_('Mangle Duplicate Columns'),
description=_('Specify duplicate columns as "X.0, X.1".'))
skipinitialspace = BooleanField(
_('Skip Initial Space'),
description=_('Skip spaces after delimiter.'))
skiprows = IntegerField(
_('Skip Rows'),
description=_('Number of rows to skip at start of file.'),
validators=[Optional(), NumberRange(0, 1E+20)],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
nrows = IntegerField(
_('Rows to Read'),
description=_('Number of rows of file to read.'),
validators=[Optional(), NumberRange(0, 1E+20)],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
skip_blank_lines = BooleanField(
_('Skip Blank Lines'),
description=_(
'Skip blank lines rather than interpreting them '
'as NaN values.'))
parse_dates = BooleanField(
_('Parse Dates'),
description=_('Parse date values.'))
infer_datetime_format = BooleanField(
_('Infer Datetime Format'),
description=_(
'Use Pandas to interpret the datetime format '
'automatically.'))
decimal = StringField(
_('Decimal Character'),
description=_('Character to interpret as decimal point.'),
validators=[Optional()],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or '.'])
index = BooleanField(
_('Dataframe Index'),
description=_('Write dataframe index as a column.'))
index_label = StringField(
_('Column Label(s)'),
description=_(
'Column label for index column(s). If None is given '
'and Dataframe Index is True, Index Names are used.'),
validators=[Optional()],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
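
One subtlety in the field definitions: the filters=[lambda x: x or None] pattern maps empty form input to None so pandas falls back to its own defaults, but it also maps a legitimate 0 to None, which is why create_table_from_csv above re-defaults header to 0. A quick sketch of the behavior:

# Behavior of the `x or None` filter used on the optional fields above
normalize = lambda x: x or None
normalize('')  # -> None: empty input defers to the pandas default
normalize(0)   # -> None: 0 is falsy too, so "header is row 0" would be lost
normalize(3)   # -> 3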