From 1329056584db91d90237fbf0727806e599620111 Mon Sep 17 00:00:00 2001 From: Govind Kamat Date: Thu, 3 Oct 2024 19:01:34 +0000 Subject: [PATCH] Add support for multi-part data corpora downloads. Signed-off-by: Govind Kamat --- osbenchmark/resources/workload-schema.json | 24 ++++++++++++ osbenchmark/workload/loader.py | 17 ++++++++- osbenchmark/workload/workload.py | 4 +- tests/workload/loader_test.py | 44 ++++++++++++++++++++++ 4 files changed, 87 insertions(+), 2 deletions(-) diff --git a/osbenchmark/resources/workload-schema.json b/osbenchmark/resources/workload-schema.json index 788fbf902..7d2102e5a 100644 --- a/osbenchmark/resources/workload-schema.json +++ b/osbenchmark/resources/workload-schema.json @@ -406,10 +406,34 @@ "format": "uri", "description": "The root URL for these documents." }, + "source-url": { "type": "string", "format": "uri", "description": "The full URL to the document file. This is intended for cases like when a signed URL needs to be used to access the file directly." }, "source-file": { "type": "string", "description": "Name of file containing documents. This file has to be compressed either as bz2, zip or tar.gz and must contain exactly one JSON file with the same name (Examples: documents.json.bz2, documents.zip (which should contain one file called 'documents.json'))." }, + "source-file-parts": { "type": "array", "description": "A list of files that should be concatenated to create the source (document) file. This is intended for cases where the document file is large enough that separating it into smaller parts is appropriate (optional).", "minItems": 1, "uniqueItems": true, "items": { "type": "object", "properties": { "name": { "type": "string", "description": "The name of the file corresponding to this part." }, "size": { "type": "integer", "description": "The size of the file corresponding to this part." 
+ } + } + } + }, "source-format": { "type": "string", "description": "Defines in which format Benchmark should interpret the data file specified by 'source-file'. Currently, only 'bulk' is supported." diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index 0d8114234..96298010f 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -28,6 +28,7 @@ import random import re import sys +import shutil import tempfile import urllib.error @@ -578,7 +579,17 @@ def prepare_document_set(self, document_set, data_root): raise exceptions.BenchmarkAssertionError(f"Workload {self.workload_name} specifies documents but no corpus") try: - self.downloader.download(document_set.base_url, document_set.source_url, target_path, expected_size) + if document_set.document_file_parts: + for part in document_set.document_file_parts: + self.downloader.download(document_set.base_url, None, os.path.join(data_root, part["name"]), part["size"]) + with open(target_path, "wb") as outfile: + console.info(f"Concatenating file parts {', '.join([p['name'] for p in document_set.document_file_parts])}" + "into {os.path.basename(target_path)}", flush=True, logger=self.logger) + for part in document_set.document_file_parts: + with open(os.path.join(data_root, part["name"]), "rb") as infile: + shutil.copyfileobj(infile, outfile) + else: + self.downloader.download(document_set.base_url, document_set.source_url, target_path, expected_size) except exceptions.DataError as e: if e.message == "Cannot download data because no base URL is provided." 
and \ self.is_locally_available(target_path): @@ -1505,6 +1516,9 @@ def _create_corpora(self, corpora_specs, indices, data_streams): if source_format in workload.Documents.SUPPORTED_SOURCE_FORMAT: source_url = self._r(doc_spec, "source-url", mandatory=False) docs = self._r(doc_spec, "source-file") + document_file_parts = list() + for parts in self._r(doc_spec, "source-file-parts", mandatory=False, default_value=[]): + document_file_parts.append( { "name": self._r(parts, "name"), "size": self._r(parts, "size") } ) if io.is_archive(docs): document_archive = docs document_file = io.splitext(docs)[0] @@ -1554,6 +1568,7 @@ def _create_corpora(self, corpora_specs, indices, data_streams): docs = workload.Documents(source_format=source_format, document_file=document_file, + document_file_parts=document_file_parts, document_archive=document_archive, base_url=base_url, source_url=source_url, diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 848cb5b3b..d221e22e5 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -190,7 +190,7 @@ class Documents: SOURCE_FORMAT_BIG_ANN = "big-ann" SUPPORTED_SOURCE_FORMAT = [SOURCE_FORMAT_BULK, SOURCE_FORMAT_HDF5, SOURCE_FORMAT_BIG_ANN] - def __init__(self, source_format, document_file=None, document_archive=None, base_url=None, source_url=None, + def __init__(self, source_format, document_file=None, document_file_parts=None, document_archive=None, base_url=None, source_url=None, includes_action_and_meta_data=False, number_of_documents=0, compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, target_index=None, target_data_stream=None, target_type=None, meta_data=None): @@ -199,6 +199,7 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas :param source_format: The format of these documents. Mandatory. :param document_file: The file name of benchmark documents after decompression. Optional (e.g. 
for percolation we just need a mapping but no documents) + :param document_file_parts: If the document file is provided as parts, a list of dicts, each holding the filename and file size. :param document_archive: The file name of the compressed benchmark document name on the remote server. Optional (e.g. for percolation we just need a mapping but no documents) :param base_url: The URL from which to load data if they are not available locally. Excludes the file or object name. Optional. @@ -223,6 +224,7 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas self.source_format = source_format self.document_file = document_file + self.document_file_parts = document_file_parts self.document_archive = document_archive self.base_url = base_url self.source_url = source_url diff --git a/tests/workload/loader_test.py b/tests/workload/loader_test.py index 285e52772..bff592bb8 100644 --- a/tests/workload/loader_test.py +++ b/tests/workload/loader_test.py @@ -863,6 +863,50 @@ def test_prepare_bundled_document_set_uncompressed_docs_wrong_size(self, is_file self.assertEqual(0, prepare_file_offset_table.call_count) +class WorkloadPreparationTests_1(TestCase): + @mock.patch("osbenchmark.utils.io.prepare_file_offset_table") + @mock.patch("osbenchmark.utils.net.download") + @mock.patch("osbenchmark.utils.io.ensure_dir") + @mock.patch("os.path.getsize") + @mock.patch("os.path.isfile") + def test_download_document_file_from_part_files(self, is_file, get_size, ensure_dir, download, prepare_file_offset_table): + # uncompressed file does not exist + # after download uncompressed file exists + # after download uncompressed file exists (main loop) + is_file.side_effect = [False, True, True, True, True] + # uncompressed file size is 2000 + get_size.side_effect = [1000, 600, 400, 2000] + + prepare_file_offset_table.return_value = 5 + + p = loader.DocumentSetPreparator(workload_name="unit-test", + downloader=loader.Downloader(offline=False, test_mode=False), + 
decompressor=loader.Decompressor()) + + mo = mock.mock_open() + with mock.patch("builtins.open", mo): + p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, + base_url="http://benchmarks.opensearch.org/corpora/unit-test", + document_file="docs.json", + document_file_parts=[ {"name": "xaa", "size": 1000 }, + {"name": "xab", "size": 600 }, + {"name": "xac", "size": 400 } ], + # --> We don't provide a document archive here <-- + document_archive=None, + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), + data_root="/tmp") + + ensure_dir.assert_called_with("/tmp") + calls = [ mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xaa', '/tmp/xaa', 1000, progress_indicator=mock.ANY), + mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xab', '/tmp/xab', 600, progress_indicator=mock.ANY), + mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xac', '/tmp/xac', 400, progress_indicator=mock.ANY) ] + + download.assert_has_calls(calls) + prepare_file_offset_table.assert_called_with("/tmp/docs.json", 'http://benchmarks.opensearch.org/corpora/unit-test', + None, InstanceOf(loader.Downloader)) + class TemplateSource(TestCase): @mock.patch("osbenchmark.utils.io.dirname") @mock.patch.object(loader.TemplateSource, "read_glob_files")