Skip to content

Commit

Permalink
Rename "score" to "_distance" (#1055)
Browse files Browse the repository at this point in the history
* Renamed score column and hardcoded instances

* Updated variable names and fixed python tests

---------

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
trueutkarsh and wjones127 authored Jul 28, 2023
1 parent 5331506 commit 6144dac
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 85 deletions.
12 changes: 7 additions & 5 deletions python/python/tests/test_lance.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ def test_nearest(tmp_path):
top10 = dataset.to_table(
nearest={"column": "emb", "q": arr[0].values, "k": 10, "nprobes": 10}
)
scores = l2sq(arr[0].values, npvals.reshape((100, 32)))
indices = np.argsort(scores)
distances = l2sq(arr[0].values, npvals.reshape((100, 32)))
indices = np.argsort(distances)
assert tbl.take(indices[:10]).to_pandas().equals(top10.to_pandas()[["emb"]])
assert np.allclose(scores[indices[:10]], top10.to_pandas().score.values)
assert np.allclose(distances[indices[:10]], top10.to_pandas()["_distance"].values)


def l2sq(vec, mat):
Expand All @@ -114,8 +114,10 @@ def test_nearest_cosine(tmp_path):
nearest={"column": "vector", "q": q, "k": 10, "metric": "cosine"}
).to_pandas()
for i in range(len(rs)):
assert rs.score[i] == pytest.approx(cosine_distance(rs.vector[i], q), abs=1e-6)
assert 0 <= rs.score[i] <= 1
assert rs["_distance"][i] == pytest.approx(
cosine_distance(rs.vector[i], q), abs=1e-6
)
assert 0 <= rs["_distance"][i] <= 1


def cosine_distance(vec1, vec2):
Expand Down
6 changes: 3 additions & 3 deletions python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def run(ds, q=None, assert_func=None):
expected_columns.extend(ds.schema.names)
else:
expected_columns.extend(columns)
for c in ["vector", "score"]:
for c in ["vector", "_distance"]:
if c not in expected_columns:
expected_columns.append(c)

Expand All @@ -96,8 +96,8 @@ def run(ds, q=None, assert_func=None):
assert len(inmem.to_table(filter=filter_)) == len(rs)
else:
assert len(rs) == 15
scores = rs["score"].to_numpy()
assert (scores.max() - scores.min()) > 1e-6
distances = rs["_distance"].to_numpy()
assert (distances.max() - distances.min()) > 1e-6
if assert_func is not None:
assert_func(rs)
return times
Expand Down
52 changes: 29 additions & 23 deletions rust/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl Scanner {
self
}

/// The Arrow schema of the output, including projections and vector / score
/// The Arrow schema of the output, including projections and vector / _distance
pub fn schema(&self) -> Result<SchemaRef> {
let schema = self
.output_schema()
Expand All @@ -336,7 +336,7 @@ impl Scanner {
message: format!("Failed to convert vector field: {}", e),
})?;
extra_columns.push(vector_field);
extra_columns.push(ArrowField::new("score", DataType::Float32, false));
extra_columns.push(ArrowField::new("_distance", DataType::Float32, false));
};
if self.with_row_id {
extra_columns.push(ArrowField::new(ROW_ID, DataType::UInt64, false));
Expand Down Expand Up @@ -488,14 +488,14 @@ impl Scanner {
}
}

let knn_node = self.ann(q, index)?; // score, _rowid
let knn_node = self.ann(q, index)?; // _distance, _rowid
let with_vector = self.dataset.schema().project(&[&q.column])?;
let knn_node_with_vector = self.take(knn_node, &with_vector)?;
let mut knn_node = if q.refine_factor.is_some() {
self.flat_knn(knn_node_with_vector, q)?
} else {
knn_node_with_vector
}; // vector, score, _rowid
}; // vector, _distance, _rowid

knn_node = self.knn_combined(&q, index, knn_node).await?;

Expand Down Expand Up @@ -545,8 +545,8 @@ impl Scanner {
let topk_appended = self.flat_knn(scan_node, q)?;

// To do a union, we need to make the schemas match. Right now
// knn_node: score, _rowid, vector
// topk_appended: vector, _rowid, score
// knn_node: _distance, _rowid, vector
// topk_appended: vector, _rowid, _distance
let new_schema = Schema::try_from(&topk_appended.schema().project(&[2, 1, 0])?)?;
let topk_appended = ProjectionExec::try_new(topk_appended, Arc::new(new_schema))?;
assert_eq!(topk_appended.schema(), knn_node.schema());
Expand Down Expand Up @@ -948,7 +948,7 @@ mod test {
),
true,
),
ArrowField::new("score", DataType::Float32, false),
ArrowField::new("_distance", DataType::Float32, false),
])
);

Expand Down Expand Up @@ -1107,7 +1107,7 @@ mod test {
),
true,
),
ArrowField::new("score", DataType::Float32, false),
ArrowField::new("_distance", DataType::Float32, false),
])
);

Expand Down Expand Up @@ -1157,7 +1157,7 @@ mod test {
),
true,
),
ArrowField::new("score", DataType::Float32, false),
ArrowField::new("_distance", DataType::Float32, false),
])
);

Expand Down Expand Up @@ -1376,7 +1376,7 @@ mod test {
/// Query: nearest(vec, [...], 10) + filter(i > 10 and i < 20)
///
/// Expected plan:
/// KNNIndex(vec) -> Take(i) -> filter(i) -> take(s, vec) -> projection(s, vec, score)
/// KNNIndex(vec) -> Take(i) -> filter(i) -> take(s, vec) -> projection(s, vec, _distance)
#[tokio::test]
async fn test_ann_with_index() {
let test_dir = tempdir().unwrap();
Expand All @@ -1398,14 +1398,14 @@ mod test {
.iter()
.map(|f| f.name())
.collect::<Vec<_>>(),
vec!["s", "vec", "score"]
vec!["s", "vec", "_distance"]
);

let take = &plan.children()[0];
let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
assert_eq!(
take.schema().field_names(),
["score", "_rowid", "vec", "i", "s"]
["_distance", "_rowid", "vec", "i", "s"]
);
assert_eq!(
take.extra_schema
Expand All @@ -1420,12 +1420,15 @@ mod test {
assert!(filter.as_any().is::<FilterExec>());
assert_eq!(
filter.schema().field_names(),
["score", "_rowid", "vec", "i"]
["_distance", "_rowid", "vec", "i"]
);

let take = &filter.children()[0];
let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
assert_eq!(take.schema().field_names(), ["score", "_rowid", "vec", "i"]);
assert_eq!(
take.schema().field_names(),
["_distance", "_rowid", "vec", "i"]
);
assert_eq!(
take.extra_schema
.fields
Expand All @@ -1438,7 +1441,7 @@ mod test {
// TODO: Two continuous take execs, we can merge them into one.
let take = &take.children()[0];
let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
assert_eq!(take.schema().field_names(), ["score", "_rowid", "vec"]);
assert_eq!(take.schema().field_names(), ["_distance", "_rowid", "vec"]);
assert_eq!(
take.extra_schema
.fields
Expand All @@ -1450,7 +1453,7 @@ mod test {

let knn = &take.children()[0];
assert!(knn.as_any().is::<KNNIndexExec>());
assert_eq!(knn.schema().field_names(), ["score", "_rowid"]);
assert_eq!(knn.schema().field_names(), ["_distance", "_rowid"]);
}

/// Test KNN index with refine factor
Expand All @@ -1459,7 +1462,7 @@ mod test {
///
/// Expected plan:
/// KNNIndex(vec) -> Take(vec) -> KNNFlat(vec, 10) -> Take(i) -> Filter(i)
/// -> take(s, vec) -> projection(s, vec, score)
/// -> take(s, vec) -> projection(s, vec, _distance)
#[tokio::test]
async fn test_knn_with_refine() {
let test_dir = tempdir().unwrap();
Expand All @@ -1482,14 +1485,14 @@ mod test {
.iter()
.map(|f| f.name())
.collect::<Vec<_>>(),
vec!["s", "vec", "score"]
vec!["s", "vec", "_distance"]
);

let take = &plan.children()[0];
let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
assert_eq!(
take.schema().field_names(),
["score", "_rowid", "vec", "i", "s"]
["_distance", "_rowid", "vec", "i", "s"]
);
assert_eq!(
take.extra_schema
Expand All @@ -1504,12 +1507,15 @@ mod test {
assert!(filter.as_any().is::<FilterExec>());
assert_eq!(
filter.schema().field_names(),
["score", "_rowid", "vec", "i"]
["_distance", "_rowid", "vec", "i"]
);

let take = &filter.children()[0];
let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
assert_eq!(take.schema().field_names(), ["score", "_rowid", "vec", "i"]);
assert_eq!(
take.schema().field_names(),
["_distance", "_rowid", "vec", "i"]
);
assert_eq!(
take.extra_schema
.fields
Expand All @@ -1525,7 +1531,7 @@ mod test {

let take = &flat.children()[0];
let take = take.as_any().downcast_ref::<TakeExec>().unwrap();
assert_eq!(take.schema().field_names(), ["score", "_rowid", "vec"]);
assert_eq!(take.schema().field_names(), ["_distance", "_rowid", "vec"]);
assert_eq!(
take.extra_schema
.fields
Expand All @@ -1537,7 +1543,7 @@ mod test {

let knn = &take.children()[0];
assert!(knn.as_any().is::<KNNIndexExec>());
assert_eq!(knn.schema().field_names(), ["score", "_rowid"]);
assert_eq!(knn.schema().field_names(), ["_distance", "_rowid"]);
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion rust/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use crate::{
};
pub use traits::*;

pub(crate) const SCORE_COL: &str = "score";
pub(crate) const DIST_COL: &str = "_distance";
const INDEX_FILE_NAME: &str = "index.idx";

/// Query parameters for the vector indices
Expand Down
15 changes: 9 additions & 6 deletions rust/src/index/vector/diskann/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
index::{
vector::{
graph::{GraphReadParams, PersistedGraph},
SCORE_COL,
DIST_COL,
},
Index,
},
Expand Down Expand Up @@ -218,16 +218,16 @@ impl VectorIndex for DiskANNIndex {
let state = greedy_search(&self.graph, 0, query.key.values(), query.k, query.k * 2).await?;
let schema = Arc::new(Schema::new(vec![
Field::new(ROW_ID, DataType::UInt64, false),
Field::new(SCORE_COL, DataType::Float32, false),
Field::new(DIST_COL, DataType::Float32, false),
]));

let mut candidates = Vec::with_capacity(query.k);
for (score, row) in state.candidates {
for (distance, row) in state.candidates {
if candidates.len() == query.k {
break;
}
if !self.deletion_cache.as_ref().is_deleted(row as u64).await? {
candidates.push((score, row));
candidates.push((distance, row));
}
}

Expand All @@ -236,11 +236,14 @@ impl VectorIndex for DiskANNIndex {
.take(query.k)
.map(|(_, id)| *id as u64)
.collect();
let scores: Float32Array = candidates.iter().take(query.k).map(|(d, _)| **d).collect();
let distances: Float32Array = candidates.iter().take(query.k).map(|(d, _)| **d).collect();

let batch = RecordBatch::try_new(
schema,
vec![Arc::new(row_ids) as ArrayRef, Arc::new(scores) as ArrayRef],
vec![
Arc::new(row_ids) as ArrayRef,
Arc::new(distances) as ArrayRef,
],
)?;
Ok(batch)
}
Expand Down
24 changes: 13 additions & 11 deletions rust/src/index/vector/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow_select::{concat::concat_batches, take::take};
use futures::future;
use futures::stream::{repeat_with, Stream, StreamExt, TryStreamExt};

use super::{Query, SCORE_COL};
use super::{Query, DIST_COL};
use crate::arrow::*;
use crate::{Error, Result};

Expand All @@ -41,9 +41,9 @@ pub async fn flat_search(
.map(|(batch, mt)| async move {
let k = query.key.clone();
let mut batch = batch?;
if batch.column_by_name(SCORE_COL).is_some() {
// Ignore the score calculated from inner vector index.
batch = batch.drop_column(SCORE_COL)?;
if batch.column_by_name(DIST_COL).is_some() {
// Ignore the distance calculated from inner vector index.
batch = batch.drop_column(DIST_COL)?;
}
let vectors = batch
.column_by_name(&query.column)
Expand All @@ -52,7 +52,7 @@ pub async fn flat_search(
})?
.clone();
let flatten_vectors = as_fixed_size_list_array(vectors.as_ref()).values().clone();
let scores = tokio::task::spawn_blocking(move || {
let distances = tokio::task::spawn_blocking(move || {
mt.batch_func()(
k.values(),
as_primitive_array::<Float32Type>(flatten_vectors.as_ref()).values(),
Expand All @@ -62,19 +62,21 @@ pub async fn flat_search(
.await? as ArrayRef;

// TODO: use heap
let indices = sort_to_indices(&scores, None, Some(query.k))?;
let batch_with_score = batch
.try_with_column(ArrowField::new(SCORE_COL, DataType::Float32, false), scores)?;
let struct_arr = StructArray::from(batch_with_score);
let indices = sort_to_indices(&distances, None, Some(query.k))?;
let batch_with_distance = batch.try_with_column(
ArrowField::new(DIST_COL, DataType::Float32, false),
distances,
)?;
let struct_arr = StructArray::from(batch_with_distance);
let selected_arr = take(&struct_arr, &indices, None)?;
Ok::<RecordBatch, Error>(as_struct_array(&selected_arr).into())
})
.buffer_unordered(16)
.try_collect::<Vec<_>>()
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let scores = batch.column_by_name(SCORE_COL).unwrap();
let indices = sort_to_indices(scores, None, Some(query.k))?;
let distances = batch.column_by_name(DIST_COL).unwrap();
let indices = sort_to_indices(distances, None, Some(query.k))?;

let struct_arr = StructArray::from(batch);
let selected_arr = take(&struct_arr, &indices, None)?;
Expand Down
13 changes: 8 additions & 5 deletions rust/src/index/vector/ivf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,19 @@ impl VectorIndex for IVFIndex {
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;

let score_col = batch.column_by_name("score").ok_or_else(|| Error::IO {
message: format!("score column does not exist in batch: {}", batch.schema()),
let dist_col = batch.column_by_name("_distance").ok_or_else(|| Error::IO {
message: format!(
"_distance column does not exist in batch: {}",
batch.schema()
),
})?;

// TODO: Use a heap sort to get the top-k.
let limit = query.k * query.refine_factor.unwrap_or(1) as usize;
let selection = sort_to_indices(score_col, None, Some(limit))?;
let selection = sort_to_indices(dist_col, None, Some(limit))?;
let struct_arr = StructArray::from(batch);
let taken_scores = take(&struct_arr, &selection, None)?;
Ok(as_struct_array(&taken_scores).into())
let taken_distances = take(&struct_arr, &selection, None)?;
Ok(as_struct_array(&taken_distances).into())
}

fn is_loadable(&self) -> bool {
Expand Down
Loading

0 comments on commit 6144dac

Please sign in to comment.