Skip to content

Commit

Permalink
feat: Add a transformer that adds tags to all tables created in a job (
Browse files Browse the repository at this point in the history
…amundsen-io#287)

* initial commit

* imports

* remove debugging print

* imports

* tests

* tests

* print

* lint

* standardize on single quotes

* fix python 2 compatibility
  • Loading branch information
jdavidheiser authored Jun 17, 2020
1 parent 22e26e8 commit c6a2637
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 9 deletions.
10 changes: 6 additions & 4 deletions databuilder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -638,21 +638,23 @@ job.launch()
```

#### [TemplateVariableSubstitutionTransformer](./databuilder/transformer/template_variable_substitution_transformer.py)
Adds or replaces field in Dict by string.format based on given template and provide record Dict as a template parameter
Adds or replaces field in Dict by string.format based on given template and provide record Dict as a template parameter.

#### [DictToModel](./databuilder/transformer/dict_to_model.py)
Transforms dictionary into model
Transforms dictionary into model.

#### [TimestampStringToEpoch](./databuilder/transformer/timestamp_string_to_epoch.py)
Transforms string timestamp into int epoch
Transforms string timestamp into int epoch.

#### [RemoveFieldTransformer](./databuilder/transformer/remove_field_transformer.py)
Remove fields from the Dict.

#### [TableTagTransformer](./databuilder/transformer/table_tag_transformer.py)
Adds the same set of tags to all tables produced by the job.

#### [GenericTransformer](./databuilder/transformer/generic_transformer.py)
Transforms dictionary based on callback function that user provides.


## List of loader
#### [FsNeo4jCSVLoader](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/loader/file_system_neo4j_csv_loader.py "FsNeo4jCSVLoader")
Write node and relationship CSV file(s) that can be consumed by Neo4jCsvPublisher. It assumes that the record it consumes is instance of Neo4jCsvSerializable.
Expand Down
16 changes: 11 additions & 5 deletions databuilder/databuilder/models/table_metadata.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
from collections import namedtuple
from six import string_types

from typing import Iterable, Any, Union, Iterator, Dict, Set # noqa: F401

Expand Down Expand Up @@ -260,11 +261,8 @@ def __init__(self,
self.columns = columns if columns else []
self.is_view = is_view
self.attrs = None
if isinstance(tags, str):
tags = list(filter(None, tags.split(',')))
if isinstance(tags, list):
tags = [tag.lower().strip() for tag in tags]
self.tags = tags

self.tags = TableMetadata.format_tags(tags)

if kwargs:
self.attrs = copy.deepcopy(kwargs)
Expand Down Expand Up @@ -331,6 +329,14 @@ def _get_col_description_key(self, col, description):
col=col.name,
description_id=description.get_description_id())

@staticmethod
def format_tags(tags):
if isinstance(tags, string_types):
tags = list(filter(None, tags.split(',')))
if isinstance(tags, list):
tags = [tag.lower().strip() for tag in tags]
return tags

def create_next_node(self):
# type: () -> Union[Dict[str, Any], None]
try:
Expand Down
29 changes: 29 additions & 0 deletions databuilder/databuilder/transformer/table_tag_transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pyhocon import ConfigFactory

from databuilder.transformer.base_transformer import Transformer
from databuilder.models.table_metadata import TableMetadata


class TableTagTransformer(Transformer):
"""Simple transformer that adds tags to all table nodes produced as part of a job."""
# Config
TAGS = 'tags'
DEFAULT_CONFIG = ConfigFactory.from_dict({TAGS: None})

def init(self, conf):
conf = conf.with_fallback(TableTagTransformer.DEFAULT_CONFIG)
tags = conf.get_string(TableTagTransformer.TAGS)

self.tags = TableMetadata.format_tags(tags)

def transform(self, record):
if isinstance(record, TableMetadata):
if record.tags:
record.tags += self.tags
else:
record.tags = self.tags
return record

def get_scope(self):
# type: () -> str
return 'transformer.table_tag'
77 changes: 77 additions & 0 deletions databuilder/tests/unit/transformer/test_table_tag_transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import unittest

from pyhocon import ConfigFactory

from databuilder.transformer.table_tag_transformer import TableTagTransformer
from databuilder.models.table_metadata import TableMetadata


class TestTableTagTransformer(unittest.TestCase):
def test_single_tag(self):
transformer = TableTagTransformer()
config = ConfigFactory.from_dict({
TableTagTransformer.TAGS: 'foo',
})
transformer.init(conf=config)

result = transformer.transform(TableMetadata(
database='test_db',
cluster='test_cluster',
schema='test_schema',
name='test_table',
description='',
))

self.assertEqual(result.tags, ['foo'])

def test_multiple_tags_comma_delimited(self):
transformer = TableTagTransformer()
config = ConfigFactory.from_dict({
TableTagTransformer.TAGS: 'foo,bar',
})
transformer.init(conf=config)

result = transformer.transform(TableMetadata(
database='test_db',
cluster='test_cluster',
schema='test_schema',
name='test_table',
description='',
))

self.assertEqual(result.tags, ['foo', 'bar'])

def test_add_tag_to_existing_tags(self):
transformer = TableTagTransformer()
config = ConfigFactory.from_dict({
TableTagTransformer.TAGS: 'baz',
})
transformer.init(conf=config)

result = transformer.transform(TableMetadata(
database='test_db',
cluster='test_cluster',
schema='test_schema',
name='test_table',
description='',
tags='foo,bar',
))
self.assertEqual(result.tags, ['foo', 'bar', 'baz'])

def test_tags_not_added_to_other_objects(self):
transformer = TableTagTransformer()
config = ConfigFactory.from_dict({
TableTagTransformer.TAGS: 'new_tag',
})
transformer.init(conf=config)

class NotATable():
tags = 'existing_tag'

result = transformer.transform(NotATable())

self.assertEqual(result.tags, 'existing_tag')


if __name__ == '__main__':
unittest.main()

0 comments on commit c6a2637

Please sign in to comment.