Commit
Refactored CSVTableColumnExtractor into the csv_extractor file.
Added CSVTableColumnExtractor to sample_csv_data_loader.py and fixed another issue in sample_csv_data_loader.py relating to the index.

Finally, modified the sample data so that the popular tables list shows by default again.

I would recommend that we actually replace sample_data_loader with sample_csv_data_loader in the future.
samshuster committed Mar 4, 2020
1 parent 9727740 commit bc14897
Showing 4 changed files with 155 additions and 119 deletions.
94 changes: 94 additions & 0 deletions databuilder/extractor/csv_extractor.py
@@ -1,10 +1,12 @@
import csv
import importlib
from collections import defaultdict

from pyhocon import ConfigTree # noqa: F401
from typing import Any, Iterator # noqa: F401

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata


class CsvExtractor(Extractor):
@@ -61,3 +63,95 @@ def extract(self):
def get_scope(self):
# type: () -> str
return 'extractor.csv'


class CSVTableColumnExtractor(Extractor):
    """
    An Extractor that combines Table and Column CSVs.
    """
    # Config keys
    TABLE_FILE_LOCATION = 'table_file_location'
    COLUMN_FILE_LOCATION = 'column_file_location'

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        :param conf: scoped configuration holding the two CSV file locations
        """
        self.conf = conf
        self.table_file_location = conf.get_string(CSVTableColumnExtractor.TABLE_FILE_LOCATION)
        self.column_file_location = conf.get_string(CSVTableColumnExtractor.COLUMN_FILE_LOCATION)
        self._load_csv()

    def _get_key(self, db, cluster, schema, tbl):
        # type: (str, str, str, str) -> str
        return TableMetadata.TABLE_KEY_FORMAT.format(db=db,
                                                     cluster=cluster,
                                                     schema=schema,
                                                     tbl=tbl)

    def _load_csv(self):
        # type: () -> None
        """
        Load the column and table CSVs and build an iterator of TableMetadata.
        """
        with open(self.column_file_location, 'r') as fin:
            self.columns = [dict(i) for i in csv.DictReader(fin)]

        # Group the parsed columns by the key of the table they belong to.
        parsed_columns = defaultdict(list)
        for column_dict in self.columns:
            db = column_dict['database']
            cluster = column_dict['cluster']
            schema = column_dict['schema']
            table_name = column_dict['table_name']
            table_key = self._get_key(db, cluster, schema, table_name)
            column = ColumnMetadata(
                name=column_dict['name'],
                description=column_dict['description'],
                col_type=column_dict['col_type'],
                sort_order=int(column_dict['sort_order'])
            )
            parsed_columns[table_key].append(column)

        # Create one TableMetadata per table row, attaching its columns.
        with open(self.table_file_location, 'r') as fin:
            tables = [dict(i) for i in csv.DictReader(fin)]

        results = []
        for table_dict in tables:
            db = table_dict['database']
            cluster = table_dict['cluster']
            schema = table_dict['schema']
            table_name = table_dict['name']
            table_key = self._get_key(db, cluster, schema, table_name)
            # defaultdict already yields [] for tables with no columns
            columns = parsed_columns[table_key]
            table = TableMetadata(database=db,
                                  cluster=cluster,
                                  schema=schema,
                                  name=table_name,
                                  description=table_dict['description'],
                                  columns=columns,
                                  # csv.DictReader yields strings, so 'is_view'
                                  # arrives as e.g. 'true'/'false', not a bool
                                  is_view=table_dict['is_view'],
                                  tags=table_dict['tags']
                                  )
            results.append(table)
        self._iter = iter(results)

    def extract(self):
        # type: () -> Any
        """
        Return one TableMetadata at a time, or None once the CSVs are exhausted.
        """
        try:
            return next(self._iter)
        except StopIteration:
            return None

    def get_scope(self):
        # type: () -> str
        return 'extractor.csvtablecolumn'
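
To make the new extractor easier to pick up, here is a minimal standalone sketch of how it can be driven outside a full databuilder job. The CSV paths reuse the sample files exercised later in this commit; the bare config keys are an assumption that only holds when calling init() directly, since inside a job DefaultTask hands the extractor an already-scoped config.

from pyhocon import ConfigFactory

from databuilder.extractor.csv_extractor import CSVTableColumnExtractor

# Bare keys work here because init() receives an already-scoped config;
# inside a job they would be prefixed with 'extractor.csvtablecolumn.'.
config = ConfigFactory.from_dict({
    'table_file_location': 'example/sample_data/sample_table.csv',
    'column_file_location': 'example/sample_data/sample_col.csv',
})

extractor = CSVTableColumnExtractor()
extractor.init(config)

# extract() returns one TableMetadata per call, then None once exhausted.
record = extractor.extract()
while record:
    print(record)
    record = extractor.extract()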
25 changes: 14 additions & 11 deletions example/sample_data/sample_column_usage.csv
@@ -1,13 +1,16 @@
 database,cluster,schema,table_name,column_name,user_email,read_count
-hive,gold,test_schema,test_table1,col1,[email protected],100
-hive,gold,test_schema,test_table3,col1,[email protected],10
-hive,gold,test_schema,test_table3,col1,[email protected],10
-hive,gold,test_schema,test_table3,col1,[email protected],10
-hive,gold,test_schema,test_table3,col1,[email protected],10
-hive,gold,test_schema,test_table3,col1,[email protected],10
-hive,gold,test_schema,test_view1,col1,[email protected],10
-hive,gold,test_schema,test_view1,col1,[email protected],10
-hive,gold,test_schema,test_table2,col1,[email protected],10
-hive,gold,test_schema,test_table2,col1,[email protected],10
-hive,gold,test_schema,test_table2,col1,[email protected],10
+hive,gold,test_schema,test_table1,col1,[email protected],500
+hive,gold,test_schema,test_table1,col1,[email protected],100
+hive,gold,test_schema,test_table1,col1,[email protected],100
+hive,gold,test_schema,test_table1,col1,[email protected],100
+hive,gold,test_schema,test_table1,col1,[email protected],100
+hive,gold,test_schema,test_table1,col1,[email protected],100
+hive,gold,test_schema,test_table1,col1,[email protected],100
+hive,gold,test_schema,test_table1,col1,[email protected],100
+hive,gold,test_schema,test_table1,col1,[email protected],100
+hive,gold,test_schema,test_table1,col1,[email protected],10
+hive,gold,test_schema,test_table1,col1,[email protected],10
+hive,gold,test_schema,test_table1,col1,[email protected],10
+hive,gold,test_schema,test_table2,col1,[email protected],10
+hive,gold,test_schema,test_table2,col1,[email protected],10
+dynamo,gold,test_schema,test_table2,col1,[email protected],500
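
A quick way to sanity-check that this reshuffle brings test_table1 back as the popular table is to tally read_count per table from the new rows. The sketch below is only a rough proxy: Amundsen derives table popularity from the published usage relationships in the metadata store, not from this CSV directly.

import csv
from collections import Counter

reads = Counter()
with open('example/sample_data/sample_column_usage.csv') as fin:
    for row in csv.DictReader(fin):
        # mirror the db://cluster.schema/table key format used by the extractor
        key = '{database}://{cluster}.{schema}/{table_name}'.format(**row)
        reads[key] += int(row['read_count'])

# test_table1 should come out clearly on top with the new sample rows
for table_key, total in reads.most_common(3):
    print(table_key, total)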
58 changes: 46 additions & 12 deletions example/scripts/sample_csv_data_loader.py
@@ -1,23 +1,23 @@
+import sys
 import textwrap
 import uuid
 
-import sys
-from elasticsearch import Elasticsearch
 
 from databuilder.extractor.csv_extractor import CsvExtractor
+from databuilder.extractor.csv_extractor import CSVTableColumnExtractor
 from databuilder.extractor.neo4j_es_last_updated_extractor import Neo4jEsLastUpdatedExtractor
 from databuilder.extractor.neo4j_extractor import Neo4jExtractor
 from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
-from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
-from databuilder.publisher import neo4j_csv_publisher
-from pyhocon import ConfigFactory
-
 from databuilder.job.job import DefaultJob
+from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
 from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
+from databuilder.publisher import neo4j_csv_publisher
 from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
 from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
 from databuilder.task.task import DefaultTask
 from databuilder.transformer.base_transformer import NoopTransformer
+from elasticsearch import Elasticsearch
+from pyhocon import ConfigFactory


es_host = None
neo_host = None
@@ -177,11 +177,45 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index
    return job


def run_table_column_job(table_path, column_path):
    tmp_folder = '/var/tmp/amundsen/table_column'
    node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
    extractor = CSVTableColumnExtractor()
    csv_loader = FsNeo4jCSVLoader()
    task = DefaultTask(extractor,
                       loader=csv_loader,
                       transformer=NoopTransformer())
    job_config = ConfigFactory.from_dict({
        'extractor.csvtablecolumn.{}'.format(CSVTableColumnExtractor.TABLE_FILE_LOCATION): table_path,
        'extractor.csvtablecolumn.{}'.format(CSVTableColumnExtractor.COLUMN_FILE_LOCATION): column_path,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR): True,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): 'unique_tag',  # should use a unique tag here, e.g. {ds}
    })
    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    job.launch()
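
The 'unique_tag' placeholder above is worth calling out: neo4j_csv_publisher uses JOB_PUBLISH_TAG to stamp everything written by one publish run. A hedged sketch of one way to derive such a tag, assuming a date-based scheme in the spirit of Airflow's {ds} macro (make_publish_tag is a hypothetical helper, not part of databuilder):

import uuid
from datetime import date


def make_publish_tag(run_date=None):
    # a date-based tag (like Airflow's {ds}) keeps same-day re-runs idempotent;
    # fall back to a random uuid for strictly unique ad-hoc runs
    if run_date is not None:
        return 'csv_load_{}'.format(run_date.isoformat())
    return 'csv_load_{}'.format(uuid.uuid4().hex)


# e.g. pass make_publish_tag(date.today()) as the JOB_PUBLISH_TAG value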


if __name__ == "__main__":
    run_csv_job('example/sample_data/sample_table.csv', 'test_table_metadata',
                'databuilder.models.table_metadata.TableMetadata')
    run_csv_job('example/sample_data/sample_col.csv', 'test_col_metadata',
                'databuilder.models.standalone_column_model.StandaloneColumnMetadata')
    run_table_column_job('example/sample_data/sample_table.csv', 'example/sample_data/sample_col.csv')
    run_csv_job('example/sample_data/sample_table_column_stats.csv', 'test_table_column_stats',
                'databuilder.models.table_stats.TableColumnStats')
    run_csv_job('example/sample_data/sample_watermark.csv', 'test_watermark_metadata',
@@ -217,7 +251,7 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index
 with user, a, b, c, read, own, follow, manager
 where user.full_name is not null
 return user.email as email, user.first_name as first_name, user.last_name as last_name,
-user.full_name as name, user.github_username as github_username, user.team_name as team_name,
+user.full_name as full_name, user.github_username as github_username, user.team_name as team_name,
 user.employee_type as employee_type, manager.email as manager_email, user.slack_id as slack_id,
 user.is_active as is_active,
 REDUCE(sum_r = 0, r in COLLECT(DISTINCT read)| sum_r + r.read_count) AS total_read,
97 changes: 1 addition & 96 deletions example/scripts/sample_data_loader.py
@@ -13,23 +13,20 @@
 from sqlalchemy.ext.declarative import declarative_base
 import textwrap
 import uuid
-from collections import defaultdict
 
-from databuilder.extractor.base_extractor import Extractor
 from databuilder.extractor.neo4j_es_last_updated_extractor import Neo4jEsLastUpdatedExtractor
 from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
 from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
 from databuilder.job.job import DefaultJob
 from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
 from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
-from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
 from databuilder.publisher import neo4j_csv_publisher
 from databuilder.extractor.neo4j_extractor import Neo4jExtractor
 from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
 from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
 from databuilder.task.task import DefaultTask
 from databuilder.transformer.base_transformer import NoopTransformer
-
+from databuilder.extractor.csv_extractor import CSVTableColumnExtractor
 
es_host = None
neo_host = None
@@ -506,98 +503,6 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index
    return job


class CSVTableColumnExtractor(Extractor):
    # Config keys
    TABLE_FILE_LOCATION = 'table_file_location'
    COLUMN_FILE_LOCATION = 'column_file_location'

    """
    An Extractor that combines Table and Column CSVs.
    """
    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        :param conf:
        """
        self.conf = conf
        self.table_file_location = conf.get_string(CSVTableColumnExtractor.TABLE_FILE_LOCATION)
        self.column_file_location = conf.get_string(CSVTableColumnExtractor.COLUMN_FILE_LOCATION)
        self._load_csv()

    def _get_key(self, db, cluster, schema, tbl):
        return TableMetadata.TABLE_KEY_FORMAT.format(db=db,
                                                     cluster=cluster,
                                                     schema=schema,
                                                     tbl=tbl)

    def _load_csv(self):
        # type: () -> None
        """
        Create an iterator to execute sql.
        """

        with open(self.column_file_location, 'r') as fin:
            self.columns = [dict(i) for i in csv.DictReader(fin)]

        parsed_columns = defaultdict(list)
        for column_dict in self.columns:
            db = column_dict['database']
            cluster = column_dict['cluster']
            schema = column_dict['schema']
            table = column_dict['table_name']
            id = self._get_key(db, cluster, schema, table)
            column = ColumnMetadata(
                name=column_dict['name'],
                description=column_dict['description'],
                col_type=column_dict['col_type'],
                sort_order=int(column_dict['sort_order'])
            )
            parsed_columns[id].append(column)

        # Create Table Dictionary
        with open(self.table_file_location, 'r') as fin:
            tables = [dict(i) for i in csv.DictReader(fin)]

        results = []
        for table_dict in tables:
            db = table_dict['database']
            cluster = table_dict['cluster']
            schema = table_dict['schema']
            table = table_dict['name']
            id = self._get_key(db, cluster, schema, table)
            columns = parsed_columns[id]
            if columns is None:
                columns = []
            table = TableMetadata(database=table_dict['database'],
                                  cluster=table_dict['cluster'],
                                  schema=table_dict['schema'],
                                  name=table_dict['name'],
                                  description=table_dict['description'],
                                  columns=columns,
                                  is_view=table_dict['is_view'],
                                  tags=table_dict['tags']
                                  )
            results.append(table)
        self._iter = iter(results)

    def extract(self):
        # type: () -> Any
        """
        Yield the csv result one at a time.
        convert the result to model if a model_class is provided
        """
        try:
            return next(self._iter)
        except StopIteration:
            return None
        except Exception as e:
            raise e

    def get_scope(self):
        # type: () -> str
        return 'extractor.csvtablecolumn'


def create_table_column_job(table_path, column_path):
    tmp_folder = '/var/tmp/amundsen/table_column'
    node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
