Skip to content

Commit

Permalink
Provide configuration to control steps for ES publisher (amundsen-io#256
Browse files Browse the repository at this point in the history
)

* Provide configuration on control steps for ES publisher

* change name

* address comment
  • Loading branch information
feng-tao authored and Hans Adriaans committed Jun 30, 2022
1 parent 9430208 commit 6a0e29f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
26 changes: 19 additions & 7 deletions databuilder/databuilder/publisher/elasticsearch_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class ElasticsearchPublisher(Publisher):
ELASTICSEARCH_ALIAS_CONFIG_KEY = 'alias'
ELASTICSEARCH_MAPPING_CONFIG_KEY = 'mapping'

# config to control how many max documents to publish at a time
ELASTICSEARCH_PUBLISHER_BATCH_SIZE = 'batch_size'

# Specifying default mapping for elasticsearch index
# Documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html
# Setting type to "text" for all fields that would be used in search
Expand Down Expand Up @@ -137,7 +140,8 @@ def init(self, conf):

self.elasticsearch_mapping = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY,
ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING)

self.elasticsearch_batch_size = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_PUBLISHER_BATCH_SIZE,
10000)
self.file_handler = open(self.file_path, self.file_mode)

def _fetch_old_index(self):
Expand Down Expand Up @@ -172,17 +176,25 @@ def publish_impl(self):
# Bulk load JSON format is defined here:
# https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
bulk_actions = []
cnt = 0

# create new index with mapping
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping)
for action in actions:
index_row = dict(index=dict(_index=self.elasticsearch_new_index,
_type=self.elasticsearch_type))
bulk_actions.append(index_row)
bulk_actions.append(action)

# create new index with mapping
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping)

# bulk upload data
self.elasticsearch_client.bulk(bulk_actions)
cnt += 1
if cnt == self.elasticsearch_batch_size:
self.elasticsearch_client.bulk(bulk_actions)
LOGGER.info('Publish {} of records to ES'.format(str(cnt)))
cnt = 0
bulk_actions = []

# Do the final bulk actions
if bulk_actions:
self.elasticsearch_client.bulk(bulk_actions)

# fetch indices that have {elasticsearch_alias} as alias
elasticsearch_old_indices = self._fetch_old_index()
Expand Down
2 changes: 1 addition & 1 deletion databuilder/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools import setup, find_packages


__version__ = '2.5.9'
__version__ = '2.5.10'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file:
Expand Down

0 comments on commit 6a0e29f

Please sign in to comment.