Rename "score" to "_distance" #1055

Merged: 3 commits, Jul 28, 2023
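In user-facing terms, this PR renames the nearest-neighbor result column from `score` to `_distance`, which matches what the value actually is (an L2 or cosine distance, where smaller means closer). A minimal sketch of the effect, assuming a hypothetical dataset path and a 32-dimensional `emb` vector column (mirroring the updated tests below):

```python
import lance
import numpy as np

dataset = lance.dataset("/tmp/vectors.lance")  # hypothetical dataset path
q = np.random.rand(32).astype(np.float32)

top10 = dataset.to_table(
    nearest={"column": "emb", "q": q, "k": 10, "nprobes": 10}
).to_pandas()

# Prior to this change the column below was named "score".
print(top10["_distance"])
```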
Changes from all commits
12 changes: 7 additions & 5 deletions python/python/tests/test_lance.py
@@ -94,10 +94,10 @@ def test_nearest(tmp_path):
    top10 = dataset.to_table(
        nearest={"column": "emb", "q": arr[0].values, "k": 10, "nprobes": 10}
    )
-    scores = l2sq(arr[0].values, npvals.reshape((100, 32)))
-    indices = np.argsort(scores)
+    distances = l2sq(arr[0].values, npvals.reshape((100, 32)))
+    indices = np.argsort(distances)
    assert tbl.take(indices[:10]).to_pandas().equals(top10.to_pandas()[["emb"]])
-    assert np.allclose(scores[indices[:10]], top10.to_pandas().score.values)
+    assert np.allclose(distances[indices[:10]], top10.to_pandas()["_distance"].values)


def l2sq(vec, mat):
@@ -114,8 +114,10 @@ def test_nearest_cosine(tmp_path):
        nearest={"column": "vector", "q": q, "k": 10, "metric": "cosine"}
    ).to_pandas()
    for i in range(len(rs)):
-        assert rs.score[i] == pytest.approx(cosine_distance(rs.vector[i], q), abs=1e-6)
-        assert 0 <= rs.score[i] <= 1
+        assert rs["_distance"][i] == pytest.approx(
+            cosine_distance(rs.vector[i], q), abs=1e-6
+        )
+        assert 0 <= rs["_distance"][i] <= 1


def cosine_distance(vec1, vec2):
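The tests above call `l2sq` and `cosine_distance` helpers whose bodies are collapsed in this diff view; a minimal NumPy sketch of what such helpers compute (assumed implementations, not necessarily the repository's exact code):

```python
import numpy as np

def l2sq(vec, mat):
    # Squared L2 distance from a query vector to every row of a matrix.
    diff = mat - vec
    return np.sum(diff * diff, axis=1)

def cosine_distance(vec1, vec2):
    # Cosine distance = 1 - cosine similarity, so 0 means identical direction.
    return 1 - np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
```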
6 changes: 3 additions & 3 deletions python/python/tests/test_vector_index.py
@@ -70,7 +70,7 @@ def run(ds, q=None, assert_func=None):
        expected_columns.extend(ds.schema.names)
    else:
        expected_columns.extend(columns)
-    for c in ["vector", "score"]:
+    for c in ["vector", "_distance"]:
        if c not in expected_columns:
            expected_columns.append(c)

@@ -96,8 +96,8 @@ def run(ds, q=None, assert_func=None):
            assert len(inmem.to_table(filter=filter_)) == len(rs)
        else:
            assert len(rs) == 15
-            scores = rs["score"].to_numpy()
-            assert (scores.max() - scores.min()) > 1e-6
+            distances = rs["_distance"].to_numpy()
+            assert (distances.max() - distances.min()) > 1e-6
    if assert_func is not None:
        assert_func(rs)
    return times
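The `run` helper's assertions boil down to two checks: the result schema gains the `vector` and `_distance` columns, and the returned distances actually vary. A hedged distillation, where `rs` is a hypothetical pyarrow result table:

```python
import numpy as np

def check_result(rs, expected_columns):
    # An ANN result must expose the vector column and the renamed _distance column.
    for c in ["vector", "_distance"]:
        if c not in expected_columns:
            expected_columns.append(c)
    assert set(rs.column_names) == set(expected_columns)
    # Distances should not all collapse to a single value.
    distances = rs["_distance"].to_numpy()
    assert distances.max() - distances.min() > 1e-6
```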
52 changes: 29 additions & 23 deletions rust/src/dataset/scanner.rs
@@ -316,7 +316,7 @@ impl Scanner {
        self
    }

-    /// The Arrow schema of the output, including projections and vector / score
+    /// The Arrow schema of the output, including projections and vector / _distance
    pub fn schema(&self) -> Result<SchemaRef> {
        let schema = self
            .output_schema()
@@ -336,7 +336,7 @@
                message: format!("Failed to convert vector field: {}", e),
            })?;
            extra_columns.push(vector_field);
-            extra_columns.push(ArrowField::new("score", DataType::Float32, false));
+            extra_columns.push(ArrowField::new("_distance", DataType::Float32, false));
        };
        if self.with_row_id {
            extra_columns.push(ArrowField::new(ROW_ID, DataType::UInt64, false));
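The snippet above appends extra output columns to the projected schema when a nearest query is present; sketched in pyarrow terms (the projected columns are illustrative, the extra fields follow the Rust code):

```python
import pyarrow as pa

# Projected dataset columns come first; a KNN scan then appends the queried
# vector column, a non-nullable _distance column, and optionally _rowid.
knn_schema = pa.schema([
    pa.field("s", pa.string()),                       # example projected column
    pa.field("vec", pa.list_(pa.float32(), 32)),      # queried vector column
    pa.field("_distance", pa.float32(), nullable=False),
    pa.field("_rowid", pa.uint64(), nullable=False),  # only when row ids are requested
])
```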
@@ -488,14 +488,14 @@ impl Scanner {
            }
        }

-        let knn_node = self.ann(q, index)?; // score, _rowid
+        let knn_node = self.ann(q, index)?; // _distance, _rowid
        let with_vector = self.dataset.schema().project(&[&q.column])?;
        let knn_node_with_vector = self.take(knn_node, &with_vector)?;
        let mut knn_node = if q.refine_factor.is_some() {
            self.flat_knn(knn_node_with_vector, q)?
        } else {
            knn_node_with_vector
-        }; // vector, score, _rowid
+        }; // vector, _distance, _rowid

        knn_node = self.knn_combined(&q, index, knn_node).await?;
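The comments above track the pipeline's column layout: the ANN node yields `(_distance, _rowid)`, a take adds the raw vectors, and an optional flat re-rank tightens the result when a refine factor is set. A NumPy sketch of that flow under assumed `index.search` / `index.take_vectors` helpers (both hypothetical):

```python
import numpy as np

def ann_with_refine(index, q, k, refine_factor=None):
    # The ANN index returns (_rowid, _distance) for an over-fetched candidate set.
    row_ids, coarse = index.search(q, k * (refine_factor or 1))
    if refine_factor is None:
        return row_ids[:k], coarse[:k]
    # Take the candidates' raw vectors and re-rank them exactly.
    vectors = index.take_vectors(row_ids)
    exact = np.sum((vectors - q) ** 2, axis=1)
    order = np.argsort(exact)[:k]
    return row_ids[order], exact[order]
```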

@@ -545,8 +545,8 @@
        let topk_appended = self.flat_knn(scan_node, q)?;

        // To do a union, we need to make the schemas match. Right now
-        // knn_node: score, _rowid, vector
-        // topk_appended: vector, _rowid, score
+        // knn_node: _distance, _rowid, vector
+        // topk_appended: vector, _rowid, _distance
        let new_schema = Schema::try_from(&topk_appended.schema().project(&[2, 1, 0])?)?;
        let topk_appended = ProjectionExec::try_new(topk_appended, Arc::new(new_schema))?;
        assert_eq!(topk_appended.schema(), knn_node.schema());
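The projection above only permutes columns (indices `[2, 1, 0]`) so the freshly scanned branch matches the index branch's schema before the union; the same reordering in pyarrow terms (toy values, assumed 2-dimensional vectors):

```python
import pyarrow as pa

topk_appended = pa.table({
    "vector": pa.array([[0.1, 0.2]], pa.list_(pa.float32(), 2)),
    "_rowid": pa.array([7], pa.uint64()),
    "_distance": pa.array([0.42], pa.float32()),
})
# Select columns [2, 1, 0] -> (_distance, _rowid, vector), matching knn_node.
aligned = topk_appended.select([2, 1, 0])
assert aligned.column_names == ["_distance", "_rowid", "vector"]
```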
@@ -948,7 +948,7 @@ mod test {
                    ),
                    true,
                ),
-                ArrowField::new("score", DataType::Float32, false),
+                ArrowField::new("_distance", DataType::Float32, false),
            ])
        );

@@ -1107,7 +1107,7 @@
                    ),
                    true,
                ),
-                ArrowField::new("score", DataType::Float32, false),
+                ArrowField::new("_distance", DataType::Float32, false),
            ])
        );

@@ -1157,7 +1157,7 @@
                    ),
                    true,
                ),
-                ArrowField::new("score", DataType::Float32, false),
+                ArrowField::new("_distance", DataType::Float32, false),
            ])
        );

@@ -1376,7 +1376,7 @@
    /// Query: nearest(vec, [...], 10) + filter(i > 10 and i < 20)
    ///
    /// Expected plan:
-    /// KNNIndex(vec) -> Take(i) -> filter(i) -> take(s, vec) -> projection(s, vec, score)
+    /// KNNIndex(vec) -> Take(i) -> filter(i) -> take(s, vec) -> projection(s, vec, _distance)
    #[tokio::test]
    async fn test_ann_with_index() {
        let test_dir = tempdir().unwrap();
@@ -1398,14 +1398,14 @@
                .iter()
                .map(|f| f.name())
                .collect::<Vec<_>>(),
-            vec!["s", "vec", "score"]
+            vec!["s", "vec", "_distance"]
        );

        let take = &plan.children()[0];
        let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
        assert_eq!(
            take.schema().field_names(),
-            ["score", "_rowid", "vec", "i", "s"]
+            ["_distance", "_rowid", "vec", "i", "s"]
        );
        assert_eq!(
            take.extra_schema
@@ -1420,12 +1420,15 @@
        assert!(filter.as_any().is::<FilterExec>());
        assert_eq!(
            filter.schema().field_names(),
-            ["score", "_rowid", "vec", "i"]
+            ["_distance", "_rowid", "vec", "i"]
        );

        let take = &filter.children()[0];
        let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
-        assert_eq!(take.schema().field_names(), ["score", "_rowid", "vec", "i"]);
+        assert_eq!(
+            take.schema().field_names(),
+            ["_distance", "_rowid", "vec", "i"]
+        );
        assert_eq!(
            take.extra_schema
                .fields
@@ -1438,7 +1441,7 @@
        // TODO: Two continuous take execs, we can merge them into one.
        let take = &take.children()[0];
        let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
-        assert_eq!(take.schema().field_names(), ["score", "_rowid", "vec"]);
+        assert_eq!(take.schema().field_names(), ["_distance", "_rowid", "vec"]);
        assert_eq!(
            take.extra_schema
                .fields
@@ -1450,7 +1453,7 @@

        let knn = &take.children()[0];
        assert!(knn.as_any().is::<KNNIndexExec>());
-        assert_eq!(knn.schema().field_names(), ["score", "_rowid"]);
+        assert_eq!(knn.schema().field_names(), ["_distance", "_rowid"]);
    }

    /// Test KNN index with refine factor
@@ -1459,7 +1462,7 @@
    ///
    /// Expected plan:
    /// KNNIndex(vec) -> Take(vec) -> KNNFlat(vec, 10) -> Take(i) -> Filter(i)
-    ///   -> take(s, vec) -> projection(s, vec, score)
+    ///   -> take(s, vec) -> projection(s, vec, _distance)
    #[tokio::test]
    async fn test_knn_with_refine() {
        let test_dir = tempdir().unwrap();
@@ -1482,14 +1485,14 @@
                .iter()
                .map(|f| f.name())
                .collect::<Vec<_>>(),
-            vec!["s", "vec", "score"]
+            vec!["s", "vec", "_distance"]
        );

        let take = &plan.children()[0];
        let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
        assert_eq!(
            take.schema().field_names(),
-            ["score", "_rowid", "vec", "i", "s"]
+            ["_distance", "_rowid", "vec", "i", "s"]
        );
        assert_eq!(
            take.extra_schema
@@ -1504,12 +1507,15 @@
        assert!(filter.as_any().is::<FilterExec>());
        assert_eq!(
            filter.schema().field_names(),
-            ["score", "_rowid", "vec", "i"]
+            ["_distance", "_rowid", "vec", "i"]
        );

        let take = &filter.children()[0];
        let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
-        assert_eq!(take.schema().field_names(), ["score", "_rowid", "vec", "i"]);
+        assert_eq!(
+            take.schema().field_names(),
+            ["_distance", "_rowid", "vec", "i"]
+        );
        assert_eq!(
            take.extra_schema
                .fields
@@ -1525,7 +1531,7 @@

        let take = &flat.children()[0];
        let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
-        assert_eq!(take.schema().field_names(), ["score", "_rowid", "vec"]);
+        assert_eq!(take.schema().field_names(), ["_distance", "_rowid", "vec"]);
        assert_eq!(
            take.extra_schema
                .fields
@@ -1537,7 +1543,7 @@

        let knn = &take.children()[0];
        assert!(knn.as_any().is::<KNNIndexExec>());
-        assert_eq!(knn.schema().field_names(), ["score", "_rowid"]);
+        assert_eq!(knn.schema().field_names(), ["_distance", "_rowid"]);
    }

    #[tokio::test]
2 changes: 1 addition & 1 deletion rust/src/index/vector.rs
@@ -64,7 +64,7 @@ use crate::{
};
pub use traits::*;

-pub(crate) const SCORE_COL: &str = "score";
+pub(crate) const DIST_COL: &str = "_distance";
const INDEX_FILE_NAME: &str = "index.idx";

/// Query parameters for the vector indices
15 changes: 9 additions & 6 deletions rust/src/index/vector/diskann/search.rs
@@ -31,7 +31,7 @@ use crate::{
    index::{
        vector::{
            graph::{GraphReadParams, PersistedGraph},
-            SCORE_COL,
+            DIST_COL,
        },
        Index,
    },
@@ -218,16 +218,16 @@ impl VectorIndex for DiskANNIndex {
        let state = greedy_search(&self.graph, 0, query.key.values(), query.k, query.k * 2).await?;
        let schema = Arc::new(Schema::new(vec![
            Field::new(ROW_ID, DataType::UInt64, false),
-            Field::new(SCORE_COL, DataType::Float32, false),
+            Field::new(DIST_COL, DataType::Float32, false),
        ]));

        let mut candidates = Vec::with_capacity(query.k);
-        for (score, row) in state.candidates {
+        for (distance, row) in state.candidates {
            if candidates.len() == query.k {
                break;
            }
            if !self.deletion_cache.as_ref().is_deleted(row as u64).await? {
-                candidates.push((score, row));
+                candidates.push((distance, row));
            }
        }

@@ -236,11 +236,14 @@
            .take(query.k)
            .map(|(_, id)| *id as u64)
            .collect();
-        let scores: Float32Array = candidates.iter().take(query.k).map(|(d, _)| **d).collect();
+        let distances: Float32Array = candidates.iter().take(query.k).map(|(d, _)| **d).collect();

        let batch = RecordBatch::try_new(
            schema,
-            vec![Arc::new(row_ids) as ArrayRef, Arc::new(scores) as ArrayRef],
+            vec![
+                Arc::new(row_ids) as ArrayRef,
+                Arc::new(distances) as ArrayRef,
+            ],
        )?;
        Ok(batch)
    }
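The DiskANN search assembles a two-column result batch; a pyarrow sketch of its shape (row ids and distances are made-up values):

```python
import pyarrow as pa

schema = pa.schema([
    pa.field("_rowid", pa.uint64(), nullable=False),
    pa.field("_distance", pa.float32(), nullable=False),
])
batch = pa.record_batch(
    [pa.array([42, 7], pa.uint64()), pa.array([0.12, 0.34], pa.float32())],
    schema=schema,
)
```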
24 changes: 13 additions & 11 deletions rust/src/index/vector/flat.rs
@@ -24,7 +24,7 @@ use arrow_select::{concat::concat_batches, take::take};
use futures::future;
use futures::stream::{repeat_with, Stream, StreamExt, TryStreamExt};

-use super::{Query, SCORE_COL};
+use super::{Query, DIST_COL};
use crate::arrow::*;
use crate::{Error, Result};

@@ -41,9 +41,9 @@ pub async fn flat_search(
        .map(|(batch, mt)| async move {
            let k = query.key.clone();
            let mut batch = batch?;
-            if batch.column_by_name(SCORE_COL).is_some() {
-                // Ignore the score calculated from inner vector index.
-                batch = batch.drop_column(SCORE_COL)?;
+            if batch.column_by_name(DIST_COL).is_some() {
+                // Ignore the distance calculated from inner vector index.
+                batch = batch.drop_column(DIST_COL)?;
            }
            let vectors = batch
                .column_by_name(&query.column)
@@ -52,7 +52,7 @@
                })?
                .clone();
            let flatten_vectors = as_fixed_size_list_array(vectors.as_ref()).values().clone();
-            let scores = tokio::task::spawn_blocking(move || {
+            let distances = tokio::task::spawn_blocking(move || {
                mt.batch_func()(
                    k.values(),
                    as_primitive_array::<Float32Type>(flatten_vectors.as_ref()).values(),
@@ -62,19 +62,21 @@
            .await? as ArrayRef;

            // TODO: use heap
-            let indices = sort_to_indices(&scores, None, Some(query.k))?;
-            let batch_with_score = batch
-                .try_with_column(ArrowField::new(SCORE_COL, DataType::Float32, false), scores)?;
-            let struct_arr = StructArray::from(batch_with_score);
+            let indices = sort_to_indices(&distances, None, Some(query.k))?;
+            let batch_with_distance = batch.try_with_column(
+                ArrowField::new(DIST_COL, DataType::Float32, false),
+                distances,
+            )?;
+            let struct_arr = StructArray::from(batch_with_distance);
            let selected_arr = take(&struct_arr, &indices, None)?;
            Ok::<RecordBatch, Error>(as_struct_array(&selected_arr).into())
        })
        .buffer_unordered(16)
        .try_collect::<Vec<_>>()
        .await?;
    let batch = concat_batches(&batches[0].schema(), &batches)?;
-    let scores = batch.column_by_name(SCORE_COL).unwrap();
-    let indices = sort_to_indices(scores, None, Some(query.k))?;
+    let distances = batch.column_by_name(DIST_COL).unwrap();
+    let indices = sort_to_indices(distances, None, Some(query.k))?;

    let struct_arr = StructArray::from(batch);
    let selected_arr = take(&struct_arr, &indices, None)?;
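`flat_search` computes distances batch by batch, keeps a per-batch top-k, then re-sorts the concatenated candidates for the global top-k. A NumPy distillation of that flow (illustrative only, using squared L2 as the metric):

```python
import numpy as np

def flat_search(batches, q, k):
    # Per batch: compute distances and keep the k nearest candidates.
    per_batch = []
    for vectors, row_ids in batches:
        dist = np.sum((vectors - q) ** 2, axis=1)
        top = np.argsort(dist)[:k]
        per_batch.append((dist[top], row_ids[top]))
    # Globally: merge the candidates and keep the k nearest overall.
    dists = np.concatenate([d for d, _ in per_batch])
    rids = np.concatenate([r for _, r in per_batch])
    order = np.argsort(dists)[:k]
    return dists[order], rids[order]
```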
13 changes: 8 additions & 5 deletions rust/src/index/vector/ivf.rs
@@ -149,16 +149,19 @@ impl VectorIndex for IVFIndex {
            .await?;
        let batch = concat_batches(&batches[0].schema(), &batches)?;

-        let score_col = batch.column_by_name("score").ok_or_else(|| Error::IO {
-            message: format!("score column does not exist in batch: {}", batch.schema()),
+        let dist_col = batch.column_by_name("_distance").ok_or_else(|| Error::IO {
+            message: format!(
+                "_distance column does not exist in batch: {}",
+                batch.schema()
+            ),
        })?;

        // TODO: Use a heap sort to get the top-k.
        let limit = query.k * query.refine_factor.unwrap_or(1) as usize;
-        let selection = sort_to_indices(score_col, None, Some(limit))?;
+        let selection = sort_to_indices(dist_col, None, Some(limit))?;
        let struct_arr = StructArray::from(batch);
-        let taken_scores = take(&struct_arr, &selection, None)?;
-        Ok(as_struct_array(&taken_scores).into())
+        let taken_distances = take(&struct_arr, &selection, None)?;
+        Ok(as_struct_array(&taken_distances).into())
    }

    fn is_loadable(&self) -> bool {
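The IVF search above over-fetches `k * refine_factor` rows ordered by `_distance` so a later exact re-rank can trim them down to `k`; a one-function NumPy sketch of that selection step:

```python
import numpy as np

def select_candidates(distances, k, refine_factor=None):
    # Keep the k * refine_factor smallest distances; the refine stage
    # (exact flat re-ranking) later narrows these to the final k.
    limit = k * (refine_factor or 1)
    return np.argsort(distances)[:limit]
```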