Skip to content

Commit

Permalink
Add perf test for nested field
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jan 18, 2024
1 parent 1ab9305 commit 2d975c9
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 17 deletions.
50 changes: 50 additions & 0 deletions benchmarks/perf-tool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,26 @@ Ingests a dataset of multiple context types into the cluster.
| ----------- | ----------- | ----------- |
| took | Total time to ingest the dataset into the index.| ms |

#### ingest_nested_field

Ingests a dataset with a nested field into the cluster.

##### Parameters

| Parameter Name | Description | Default |
| ----------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ----------- |
| index_name | Name of index to ingest into | No default |
| field_name | Name of field to ingest into | No default |
| dataset_path | Path to data-set | No default |
| attributes_dataset_name | Name of dataset with additional attributes inside the main dataset | No default |
| attribute_spec | Definition of attributes, format is: [{ name: [name_val], type: [type_val]}] Order is important and must match order of attributes column in dataset file. It should contain { name: 'parent_id', type: 'int'} | No default |

##### Metrics

| Metric Name | Description | Unit |
| ----------- | ----------- | ----------- |
| took | Total time to ingest the dataset into the index.| ms |

#### query

Runs a set of queries against an index.
Expand Down Expand Up @@ -330,6 +350,36 @@ Runs a set of queries with filter against an index.
| recall@R | ratio of top R results from the ground truth neighbors that are in the K results returned by the plugin | float 0.0-1.0 |
| recall@K | ratio of results returned that were ground truth nearest neighbors | float 0.0-1.0 |


#### query_nested_field

Runs a set of queries with a nested field against an index.

##### Parameters

| Parameter Name | Description | Default |
| ----------- |-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------|
| k | Number of neighbors to return on search | 100 |
| r | r value in Recall@R | 1 |
| index_name | Name of index to search | No default |
| field_name | Name of field to search | No default |
| calculate_recall | Whether to calculate recall values | False |
| dataset_format | Format the dataset is in. Currently hdf5 and bigann are supported. The hdf5 file must be organized in the same way that the ann-benchmarks organizes theirs. | 'hdf5' |
| dataset_path | Path to dataset | No default |
| neighbors_format | Format the neighbors dataset is in. Currently hdf5 and bigann are supported. The hdf5 file must be organized in the same way that the ann-benchmarks organizes theirs. | 'hdf5' |
| neighbors_path | Path to neighbors dataset | No default |
| neighbors_dataset | Name of filter dataset inside the neighbors dataset | No default |
| query_count | Number of queries to create from data-set | Size of the data-set |

##### Metrics

| Metric Name | Description | Unit |
| ----------- | ----------- | ----------- |
| took | Took times returned per query aggregated as total, p50, p90 and p99 (when applicable) | ms |
| memory_kb | Native memory k-NN is using at the end of the query workload | KB |
| recall@R | ratio of top R results from the ground truth neighbors that are in the K results returned by the plugin | float 0.0-1.0 |
| recall@K | ratio of results returned that were ground truth nearest neighbors | float 0.0-1.0 |

#### get_stats

Gets the index stats.
Expand Down
25 changes: 11 additions & 14 deletions benchmarks/perf-tool/add-parent-doc-id-to-dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,18 @@ def run(self, source_path, target_path) -> None:
cpus = multiprocessing.cpu_count()
total_clients = min(8, cpus) # 1 # 10
hdf5Data_train = HDF5DataSet(target_path, "train")
train_vectors = hdf5Data_train.read(0, 1000000)
train_vectors = hdf5Data_train.read(0, hdf5Data_train.size())
hdf5Data_train.close()
print(f'Train vector size: {len(train_vectors)}')

hdf5Data_test = HDF5DataSet(target_path, "test")
total_queries = 10000 # 10000
total_queries = hdf5Data_test.size() # 10000
dis = [] * total_queries

for i in range(total_queries):
dis.insert(i, [])

queries_per_client = int(total_queries / total_clients)
queries_per_client = int(total_queries / total_clients + 0.5)
if queries_per_client == 0:
queries_per_client = total_queries

Expand All @@ -176,7 +176,7 @@ def run(self, source_path, target_path) -> None:
if start_index + queries_per_client <= total_queries:
end_index = int(start_index + queries_per_client)
else:
end_index = total_queries - start_index
end_index = total_queries

print(f'Start Index: {start_index}, end Index: {end_index}')
print(f'client is : {client}')
Expand All @@ -199,7 +199,6 @@ def run(self, source_path, target_path) -> None:
i = 0
for d in calculatedDis:
if d:
print("Dis is not null")
dis[i] = d
j = j + 1
i = i + 1
Expand Down Expand Up @@ -238,14 +237,12 @@ def queryTask(train_vectors, test_vectors, startIndex, endIndex, process_number,
i = startIndex
for test in test_vectors:
distances = []
parent_ids = {}
values = {}
for value in train_vectors:
parent_ids[value.id] = value.parent_id
values[value.id] = value
distances.append({
"dis": calculateL2Distance(test.vector, value.vector),
"id": value.id
"id": value.parent_id
})

distances.sort(key=lambda vector: vector['dis'])
Expand All @@ -258,15 +255,15 @@ def queryTask(train_vectors, test_vectors, startIndex, endIndex, process_number,
for sub_i in range(len(distances)):
id = distances[sub_i]['id']
# Check if the number has been seen before
if len(nested) < 1000 and parent_ids[id] not in seen_set_nested:
if len(nested) < 1000 and id not in seen_set_nested:
# If not seen before, mark it as seen
seen_set_nested.add(parent_ids[id])
seen_set_nested.add(id)
nested.append(distances[sub_i])
if len(restricted) < 1000 and parent_ids[id] not in seen_set_restricted and values[id].apply_restricted_filter():
seen_set_restricted.add(parent_ids[id])
if len(restricted) < 1000 and id not in seen_set_restricted and values[id].apply_restricted_filter():
seen_set_restricted.add(id)
restricted.append(distances[sub_i])
if len(relaxed) < 1000 and parent_ids[id] not in seen_set_relaxed and values[id].apply_relaxed_filter():
seen_set_relaxed.add(parent_ids[id])
if len(relaxed) < 1000 and id not in seen_set_relaxed and values[id].apply_relaxed_filter():
seen_set_relaxed.add(id)
relaxed.append(distances[sub_i])

all_distances[i]['nested'] = nested
Expand Down
Binary file added benchmarks/perf-tool/dataset/data-nested.hdf5
Binary file not shown.
Binary file not shown.
6 changes: 5 additions & 1 deletion benchmarks/perf-tool/okpt/test/steps/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from okpt.test.steps.steps import CreateIndexStep, DisableRefreshStep, RefreshIndexStep, DeleteIndexStep, \
TrainModelStep, DeleteModelStep, ForceMergeStep, ClearCacheStep, IngestStep, IngestMultiFieldStep, \
QueryStep, QueryWithFilterStep, GetStatsStep, WarmupStep
IngestNestedFieldStep, QueryStep, QueryWithFilterStep, QueryNestedFieldStep, GetStatsStep, WarmupStep


def create_step(step_config: StepConfig) -> Step:
Expand All @@ -30,10 +30,14 @@ def create_step(step_config: StepConfig) -> Step:
return IngestStep(step_config)
elif step_config.step_name == IngestMultiFieldStep.label:
return IngestMultiFieldStep(step_config)
elif step_config.step_name == IngestNestedFieldStep.label:
return IngestNestedFieldStep(step_config)
elif step_config.step_name == QueryStep.label:
return QueryStep(step_config)
elif step_config.step_name == QueryWithFilterStep.label:
return QueryWithFilterStep(step_config)
elif step_config.step_name == QueryNestedFieldStep.label:
return QueryNestedFieldStep(step_config)
elif step_config.step_name == ForceMergeStep.label:
return ForceMergeStep(step_config)
elif step_config.step_name == ClearCacheStep.label:
Expand Down
155 changes: 153 additions & 2 deletions benchmarks/perf-tool/okpt/test/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ def action(doc_id):
for i in range(0, self.doc_count, self.bulk_size):
partition = self.dataset.read(self.bulk_size)
self._handle_data_bulk(partition, action, i)

self.dataset.reset()

return {}
Expand Down Expand Up @@ -379,6 +378,7 @@ def __init__(self, step_config: StepConfig):
step_config.config, {}, [])

self.partition_attr = self.attributes_dataset.read(self.doc_count)
self.action_buffer = None

def _handle_data_bulk(self, partition, action, i):
if partition is None:
Expand Down Expand Up @@ -429,6 +429,118 @@ def bulk_transform_with_attributes(self, partition: np.ndarray, partition_attr,
return actions


class IngestNestedFieldStep(BaseIngestStep):
    """Ingest step that groups dataset rows into nested-field documents.

    Rows that share the same 'parent_id' attribute are collapsed into a single
    parent document whose 'nested_field' array holds one nested object per row
    (the vector plus any extra attributes from the attribute spec). A parent
    may span multiple bulk partitions, so the partially-built parent document
    is buffered on the instance between calls.
    """

    label = 'ingest_nested_field'

    def __init__(self, step_config: StepConfig):
        super().__init__(step_config)

        dataset_path = parse_string_param('dataset_path', step_config.config,
                                          {}, None)

        self.attributes_dataset_name = parse_string_param('attributes_dataset_name',
                                                          step_config.config, {}, None)

        self.attributes_dataset = parse_dataset('hdf5', dataset_path,
                                                Context.CUSTOM, self.attributes_dataset_name)

        self.attribute_spec = parse_list_param('attribute_spec',
                                               step_config.config, {}, [])

        # Attribute rows are read once up front; bulk_transform_with_nested
        # indexes them by absolute row number.
        self.partition_attr = self.attributes_dataset.read(self.doc_count)

        # Grouping rows into parents requires consuming every row, so a
        # doc_count smaller than the dataset cannot be honored.
        if self.dataset.size() != self.doc_count:
            raise ValueError("custom doc_count is not supported for nested field")
        # Nested objects accumulated for the parent document currently being
        # built (may carry over across partitions); None until the first row.
        self.action_buffer = None
        # parent_id of the document currently buffered in action_buffer.
        self.action_parent_id = None
        # Total rows consumed so far; used to detect the final row so the
        # last parent document gets flushed.
        self.count = 0

    def _handle_data_bulk(self, partition, action, i):
        """Transforms one partition into bulk format and indexes it."""
        if partition is None:
            return
        body = self.bulk_transform_with_nested(partition, self.partition_attr, self.field_name,
                                               action, i, self.attribute_spec)
        # A partition may complete no parent document (all of its rows belong
        # to the still-open parent), in which case there is nothing to index.
        if len(body) > 0:
            bulk_index(self.opensearch, self.index_name, body)

    def bulk_transform_with_nested(self, partition: np.ndarray, partition_attr, field_name: str,
                                   action, offset: int, attributes_def) -> List[Dict[str, Any]]:
        """Partitions and transforms a list of vectors into OpenSearch's bulk
        injection format, grouping consecutive rows that share a parent_id
        into one parent document.
        Args:
            partition: An array of vectors to transform.
            partition_attr: attribute rows aligned with the full dataset
            field_name: name of the vector field inside each nested object
            action: callable producing the bulk action metadata for a doc id
            offset: absolute row index of the first vector in this partition
            attributes_def: definition of additional doc fields; must contain
                an int-typed 'parent_id' entry
        Returns:
            An array of transformed vectors in bulk format
            (action metadata followed by its document source, per parent).
        """
        # NOTE: the original pre-allocated [action(...), None] placeholder
        # pairs per parent plus a dead `idx` counter; the placeholders were
        # never filled, leaving None source bodies in the bulk payload. The
        # flush path below appends complete pairs instead, so the dead
        # pre-allocation has been removed.
        parent_id_idx = next((index for (index, d) in enumerate(attributes_def) if d.get('name') == 'parent_id'), None)
        if parent_id_idx is None:
            raise ValueError("parent_id should be provided as attribute spec")
        if attributes_def[parent_id_idx]['type'] != 'int':
            raise ValueError("parent_id should be int type")

        # First call: start buffering under the first row's parent id.
        if self.action_buffer is None:
            self.action_buffer = {"nested_field": []}
            self.action_parent_id = int(partition_attr[offset][parent_id_idx].decode())

        actions = []
        part_list = partition.tolist()
        for i in range(len(partition)):
            self.count += 1
            nested = {field_name: part_list[i]}
            attr_idx = i + offset
            attr_def_idx = 0
            current_parent_id = None
            for attribute in attributes_def:
                attr_def_name = attribute['name']
                attr_def_type = attribute['type']
                if attr_def_name == "parent_id":
                    current_parent_id = int(partition_attr[attr_idx][attr_def_idx].decode())
                    attr_def_idx += 1
                    continue

                if attr_def_type == 'str':
                    val = partition_attr[attr_idx][attr_def_idx].decode()
                    # The literal string 'None' marks a missing value in the
                    # attributes dataset.
                    if val != 'None':
                        nested[attr_def_name] = val
                elif attr_def_type == 'int':
                    val = int(partition_attr[attr_idx][attr_def_idx].decode())
                    nested[attr_def_name] = val
                attr_def_idx += 1

            if self.action_parent_id == current_parent_id:
                # Same parent: keep accumulating nested objects.
                self.action_buffer["nested_field"].append(nested)
            else:
                # Parent changed: flush the completed parent document and
                # start buffering for the new parent.
                actions.extend([action(self.action_parent_id), self.action_buffer])
                self.action_buffer = {"nested_field": []}
                self.action_buffer["nested_field"].append(nested)
                self.action_parent_id = current_parent_id

        # The very last parent never sees a parent-id change, so flush it once
        # the final row has been consumed.
        if self.count == self.doc_count:
            actions.extend([action(self.action_parent_id), self.action_buffer])

        return actions


class BaseQueryStep(OpenSearchStep):
"""See base class."""

Expand Down Expand Up @@ -469,7 +581,7 @@ def _action(self):
break
query_responses.append(
query_index(self.opensearch, self.index_name,
self.get_body(query[0]) , [self.field_name]))
self.get_body(query[0]) , self.get_exclude_fields()))

results['took'] = [
float(query_response['took']) for query_response in query_responses
Expand Down Expand Up @@ -506,6 +618,8 @@ def _get_measures(self) -> List[str]:
def get_body(self, vec):
pass

    def get_exclude_fields(self):
        """Fields to exclude from query responses; defaults to the vector field."""
        return [self.field_name]

class QueryStep(BaseQueryStep):
"""See base class."""
Expand Down Expand Up @@ -611,6 +725,43 @@ def get_body(self, vec):
else:
raise ConfigurationError('Not supported filter type {}'.format(self.filter_type))

class QueryNestedFieldStep(BaseQueryStep):
    """Query step that searches a k-NN vector stored inside a nested field.

    Builds a nested query against the 'nested_field' path and excludes the
    stored nested vectors from the returned hits.
    """

    label = 'query_nested_field'

    def __init__(self, step_config: StepConfig):
        super().__init__(step_config)

        # Ground-truth neighbors live in a named dataset inside the
        # neighbors file.
        neighbors_dataset = parse_string_param('neighbors_dataset',
                                               step_config.config, {}, None)
        self.neighbors = parse_dataset(self.neighbors_format, self.neighbors_path,
                                       Context.CUSTOM, neighbors_dataset)
        self.implicit_config = step_config.implicit_config

    def get_body(self, vec):
        """Builds the nested k-NN query body for a single query vector."""
        knn_clause = {
            'nested_field.' + self.field_name: {
                'vector': vec,
                'k': self.k
            }
        }
        return {
            'size': self.k,
            'query': {
                'nested': {
                    'path': 'nested_field',
                    'query': {
                        'knn': knn_clause
                    }
                }
            }
        }

    def get_exclude_fields(self):
        """Excludes the nested vector field from the returned hits."""
        return ['nested_field.' + self.field_name]

class GetStatsStep(OpenSearchStep):
"""See base class."""

Expand Down
Loading

0 comments on commit 2d975c9

Please sign in to comment.