Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multi-part data corpora downloads #677

Merged
merged 1 commit into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions osbenchmark/resources/workload-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,34 @@
"format": "uri",
"description": "The root URL for these documents."
},
"source-url": {
"type": "string",
"format": "uri",
"description": "The full URL to the document file. This is intended for cases like when a signed URL needs to be used to access the file directly."
},
"source-file": {
"type": "string",
"description": "Name of file containing documents. This file has to be compressed either as bz2, zip or tar.gz and must contain exactly one JSON file with the same name (Examples: documents.json.bz2, documents.zip (which should contain one file called 'documents.json'))."
},
"source-file-parts": {
"type": "array",
"description": "A list of files that should be concatentated to create the source (document) file. This is intended for cases where the document file is large enough that separating it into smaller parts is appropriate (optional).",
"minItems": 1,
"uniqueItems": true,
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the file corresponding to this part."
},
"size": {
"type": "integer",
"description": "The size of the file corresponding to this part."
}
}
}
},
"source-format": {
"type": "string",
"description": "Defines in which format Benchmark should interpret the data file specified by 'source-file'. Currently, only 'bulk' is supported."
Expand Down
17 changes: 16 additions & 1 deletion osbenchmark/workload/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import random
import re
import sys
import shutil
import tempfile
import urllib.error

Expand Down Expand Up @@ -578,7 +579,17 @@ def prepare_document_set(self, document_set, data_root):
raise exceptions.BenchmarkAssertionError(f"Workload {self.workload_name} specifies documents but no corpus")

try:
self.downloader.download(document_set.base_url, document_set.source_url, target_path, expected_size)
if document_set.document_file_parts:
for part in document_set.document_file_parts:
self.downloader.download(document_set.base_url, None, os.path.join(data_root, part["name"]), part["size"])
with open(target_path, "wb") as outfile:
console.info(f"Concatenating file parts {', '.join([p['name'] for p in document_set.document_file_parts])}"
"into {os.path.basename(target_path)}", flush=True, logger=self.logger)
for part in document_set.document_file_parts:
with open(os.path.join(data_root, part["name"]), "rb") as infile:
shutil.copyfileobj(infile, outfile)
else:
self.downloader.download(document_set.base_url, document_set.source_url, target_path, expected_size)
except exceptions.DataError as e:
if e.message == "Cannot download data because no base URL is provided." and \
self.is_locally_available(target_path):
Expand Down Expand Up @@ -1505,6 +1516,9 @@ def _create_corpora(self, corpora_specs, indices, data_streams):
if source_format in workload.Documents.SUPPORTED_SOURCE_FORMAT:
source_url = self._r(doc_spec, "source-url", mandatory=False)
docs = self._r(doc_spec, "source-file")
document_file_parts = list()
for parts in self._r(doc_spec, "source-file-parts", mandatory=False, default_value=[]):
document_file_parts.append( { "name": self._r(parts, "name"), "size": self._r(parts, "size") } )
if io.is_archive(docs):
document_archive = docs
document_file = io.splitext(docs)[0]
Expand Down Expand Up @@ -1554,6 +1568,7 @@ def _create_corpora(self, corpora_specs, indices, data_streams):

docs = workload.Documents(source_format=source_format,
document_file=document_file,
document_file_parts=document_file_parts,
document_archive=document_archive,
base_url=base_url,
source_url=source_url,
Expand Down
4 changes: 3 additions & 1 deletion osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class Documents:
SOURCE_FORMAT_BIG_ANN = "big-ann"
SUPPORTED_SOURCE_FORMAT = [SOURCE_FORMAT_BULK, SOURCE_FORMAT_HDF5, SOURCE_FORMAT_BIG_ANN]

def __init__(self, source_format, document_file=None, document_archive=None, base_url=None, source_url=None,
def __init__(self, source_format, document_file=None, document_file_parts=None, document_archive=None, base_url=None, source_url=None,
includes_action_and_meta_data=False,
number_of_documents=0, compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, target_index=None,
target_data_stream=None, target_type=None, meta_data=None):
Expand All @@ -199,6 +199,7 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas
:param source_format: The format of these documents. Mandatory.
:param document_file: The file name of benchmark documents after decompression. Optional (e.g. for percolation we
just need a mapping but no documents)
:param document_file_parts: If the document file is provided as parts, a list of dicts, each holding the filename and file size.
:param document_archive: The file name of the compressed benchmark document name on the remote server. Optional (e.g. for
percolation we just need a mapping but no documents)
:param base_url: The URL from which to load data if they are not available locally. Excludes the file or object name. Optional.
Expand All @@ -223,6 +224,7 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas

self.source_format = source_format
self.document_file = document_file
self.document_file_parts = document_file_parts
self.document_archive = document_archive
self.base_url = base_url
self.source_url = source_url
Expand Down
44 changes: 44 additions & 0 deletions tests/workload/loader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,50 @@ def test_prepare_bundled_document_set_uncompressed_docs_wrong_size(self, is_file
self.assertEqual(0, prepare_file_offset_table.call_count)


class WorkloadPreparationTests_1(TestCase):
    """Tests for downloading a document corpus that is split into multiple part files.

    Covers the `document_file_parts` path of `DocumentSetPreparator.prepare_document_set`:
    each part is downloaded individually and the parts are then concatenated into the
    final uncompressed document file.
    """

    @mock.patch("osbenchmark.utils.io.prepare_file_offset_table")
    @mock.patch("osbenchmark.utils.net.download")
    @mock.patch("osbenchmark.utils.io.ensure_dir")
    @mock.patch("os.path.getsize")
    @mock.patch("os.path.isfile")
    def test_download_document_file_from_part_files(self, is_file, get_size, ensure_dir, download, prepare_file_offset_table):
        """Downloads three part files and verifies each part is fetched with its own URL and size."""
        # Scripted is_file answers, consumed in call order:
        # uncompressed file does not exist
        # after download uncompressed file exists
        # after download uncompressed file exists (main loop)
        is_file.side_effect = [False, True, True, True, True]
        # Scripted getsize answers: the three part sizes (1000 + 600 + 400),
        # then the concatenated uncompressed file size is 2000
        get_size.side_effect = [1000, 600, 400, 2000]

        prepare_file_offset_table.return_value = 5

        p = loader.DocumentSetPreparator(workload_name="unit-test",
                                         downloader=loader.Downloader(offline=False, test_mode=False),
                                         decompressor=loader.Decompressor())

        # Patch builtins.open so the concatenation step writes to a mock, not the real filesystem.
        mo = mock.mock_open()
        with mock.patch("builtins.open", mo):
            p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK,
                                                                   base_url="http://benchmarks.opensearch.org/corpora/unit-test",
                                                                   document_file="docs.json",
                                                                   document_file_parts=[ {"name": "xaa", "size": 1000 },
                                                                                         {"name": "xab", "size": 600 },
                                                                                         {"name": "xac", "size": 400 } ],
                                                                   # --> We don't provide a document archive here <--
                                                                   document_archive=None,
                                                                   number_of_documents=5,
                                                                   compressed_size_in_bytes=200,
                                                                   uncompressed_size_in_bytes=2000),
                                   data_root="/tmp")

        ensure_dir.assert_called_with("/tmp")
        # One download per part: URL is base_url + part name, target is data_root + part name,
        # and the expected size is the per-part size from document_file_parts.
        calls = [ mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xaa', '/tmp/xaa', 1000, progress_indicator=mock.ANY),
                  mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xab', '/tmp/xab', 600, progress_indicator=mock.ANY),
                  mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xac', '/tmp/xac', 400, progress_indicator=mock.ANY) ]

        download.assert_has_calls(calls)
        # The offset table is built over the concatenated document file, not the parts.
        prepare_file_offset_table.assert_called_with("/tmp/docs.json", 'http://benchmarks.opensearch.org/corpora/unit-test',
                                                     None, InstanceOf(loader.Downloader))

class TemplateSource(TestCase):
@mock.patch("osbenchmark.utils.io.dirname")
@mock.patch.object(loader.TemplateSource, "read_glob_files")
Expand Down
Loading