diff --git a/databuilder/databuilder/publisher/elasticsearch_publisher.py b/databuilder/databuilder/publisher/elasticsearch_publisher.py index f5996a9c6e..4c49d98608 100644 --- a/databuilder/databuilder/publisher/elasticsearch_publisher.py +++ b/databuilder/databuilder/publisher/elasticsearch_publisher.py @@ -29,6 +29,9 @@ class ElasticsearchPublisher(Publisher): ELASTICSEARCH_ALIAS_CONFIG_KEY = 'alias' ELASTICSEARCH_MAPPING_CONFIG_KEY = 'mapping' + # config to control how many max documents to publish at a time + ELASTICSEARCH_PUBLISHER_BATCH_SIZE = 'batch_size' + # Specifying default mapping for elasticsearch index # Documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html # Setting type to "text" for all fields that would be used in search @@ -137,7 +140,8 @@ def init(self, conf): self.elasticsearch_mapping = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY, ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING) - + self.elasticsearch_batch_size = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_PUBLISHER_BATCH_SIZE, + 10000) self.file_handler = open(self.file_path, self.file_mode) def _fetch_old_index(self): @@ -172,17 +176,25 @@ def publish_impl(self): # Bulk load JSON format is defined here: # https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html bulk_actions = [] + cnt = 0 + + # create new index with mapping + self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping) for action in actions: index_row = dict(index=dict(_index=self.elasticsearch_new_index, _type=self.elasticsearch_type)) bulk_actions.append(index_row) bulk_actions.append(action) - - # create new index with mapping - self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping) - - # bulk upload data - self.elasticsearch_client.bulk(bulk_actions) + cnt += 1 + if cnt == self.elasticsearch_batch_size: + self.elasticsearch_client.bulk(bulk_actions) + LOGGER.info('Publish {} of records to ES'.format(str(cnt))) + cnt = 0 + bulk_actions = [] + + # Do the final bulk actions + if bulk_actions: + self.elasticsearch_client.bulk(bulk_actions) # fetch indices that have {elasticsearch_alias} as alias elasticsearch_old_indices = self._fetch_old_index() diff --git a/databuilder/setup.py b/databuilder/setup.py index d3959c7025..004052e112 100644 --- a/databuilder/setup.py +++ b/databuilder/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages -__version__ = '2.5.9' +__version__ = '2.5.10' requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') with open(requirements_path) as requirements_file: