[core] Index tags #10441

Open · wants to merge 25 commits into main
Changes from 15 commits
4 changes: 4 additions & 0 deletions core/Cargo.toml
@@ -35,6 +35,10 @@ path = "bin/elasticsearch/backfill_index.rs"
name = "elasticsearch_backfill_folders_index"
path = "bin/elasticsearch/backfill_folders_index.rs"

[[bin]]
name = "elasticsearch_backfill_document_tags_index"
path = "bin/elasticsearch/backfill_document_tags_index.rs"

[[bin]]
name = "qdrant_create_collection"
path = "bin/qdrant/create_collection.rs"
56 changes: 55 additions & 1 deletion core/bin/core_api.rs
@@ -50,7 +50,7 @@ use dust::{
run,
search_filter::{Filterable, SearchFilter},
search_stores::search_store::{
ElasticsearchSearchStore, NodesSearchFilter, NodesSearchOptions, SearchStore,
ElasticsearchSearchStore, NodesSearchFilter, NodesSearchOptions, SearchStore, TagsQueryType,
},
sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS},
stores::{
@@ -3265,6 +3265,59 @@ async fn nodes_search(
)
}

#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
struct TagsSearchPayload {
query: Option<String>,
query_type: Option<TagsQueryType>,
data_sources: Option<Vec<String>>,
node_ids: Option<Vec<String>>,
limit: Option<u64>,
}

async fn tags_search(
State(state): State<Arc<APIState>>,
Json(payload): Json<TagsSearchPayload>,
) -> (StatusCode, Json<APIResponse>) {
match state
.search_store
.search_tags(
payload.query,
payload.query_type,
payload.data_sources,
payload.node_ids,
payload.limit,
)
.await
{
Ok((tags, total)) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"tags": tags
.into_iter()
.map(|(k, v, ds)| json!({
"tag": k,
"match_count": v,
"data_sources": ds.into_iter()
.map(|(k, _v)| k)
.collect::<Vec<_>>()
}))
.collect::<Vec<serde_json::Value>>(),
"total_nodes": total,
})),
}),
),
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to list tags",
Some(e),
),
}
}

#[derive(serde::Deserialize)]
struct DatabaseQueryRunPayload {
query: String,
@@ -3753,6 +3806,7 @@ fn main() {

//Search
.route("/nodes/search", post(nodes_search))
.route("/tags/search", post(tags_search))

// Misc
.route("/tokenize", post(tokenize))
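For reference, here is a hypothetical request/response pair for the new /tags/search route, based on the TagsSearchPayload struct and the handler above. The data source name, tag values and counts are made up, and query_type is omitted because the TagsQueryType variants are defined in the search store rather than in this diff:

POST /tags/search
{
  "query": "proj",
  "data_sources": ["managed-notion"],
  "limit": 10
}

Assuming APIResponse serializes its two fields as error and response, the handler would answer with something like:

{
  "error": null,
  "response": {
    "tags": [
      { "tag": "project-alpha", "match_count": 12, "data_sources": ["managed-notion"] }
    ],
    "total_nodes": 42
  }
}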
231 changes: 231 additions & 0 deletions core/bin/elasticsearch/backfill_document_tags_index.rs
@@ -0,0 +1,231 @@
use bb8::Pool;
Contributor:
Scripts in bin/ are meant to be frequently reused; temporary backfill scripts should go in migrations.

Does this one really belong here, in your opinion?

If it does, could it become part of backfill_nodes? It would be better not to have too many different scripts there.

Contributor (author):
That's temporary, I'll move it to migrations.

use bb8_postgres::PostgresConnectionManager;
use clap::Parser;
use dust::{
search_stores::search_store::ElasticsearchSearchStore,
stores::{postgres::PostgresStore, store::Store},
};
use elasticsearch::{http::request::JsonBody, indices::IndicesExistsParts, BulkParts};
use http::StatusCode;
use serde_json::json;
use tokio_postgres::NoTls;

#[derive(Clone, Copy, Debug, clap::ValueEnum)]
enum NodeType {
Document,
Table,
}

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(long, help = "The version of the index")]
index_version: u32,

#[arg(long, help = "Skip confirmation")]
skip_confirmation: bool,

#[arg(long, help = "The cursor to start from", default_value = "0")]
start_cursor: i64,

#[arg(long, help = "The batch size", default_value = "100")]
batch_size: usize,

#[arg(long, help = "The type of query to run", default_value = "document")]
query_type: NodeType,
}

/*
 * Backfills tags for documents and tables in Elasticsearch using the Postgres tables
 * `data_sources_documents` and `tables`.
 *
 * Usage:
 * cargo run --bin elasticsearch_backfill_document_tags_index -- --index-version <version> [--skip-confirmation] [--start-cursor <cursor>] [--batch-size <batch_size>] [--query-type <document|table>]
 *
 */
#[tokio::main]
async fn main() {
if let Err(e) = run().await {
eprintln!("Error: {}", e);
std::process::exit(1);
}
}

async fn list_data_source_documents(
pool: &Pool<PostgresConnectionManager<NoTls>>,
id_cursor: i64,
batch_size: i64,
query_type: NodeType,
) -> Result<Vec<(i64, String, Vec<String>, String, String)>, Box<dyn std::error::Error>> {
let c = pool.get().await?;

let q = match query_type {
NodeType::Document => {
"SELECT dsd.id,dsd.document_id, dsd.tags_array, ds.data_source_id, ds.internal_id \
FROM data_sources_documents dsd JOIN data_sources ds ON dsd.data_source = ds.id \
WHERE dsd.id > $1 ORDER BY dsd.id ASC LIMIT $2"
}
NodeType::Table => {
"SELECT t.id,t.table_id, t.tags_array, ds.data_source_id, ds.internal_id \
FROM tables t JOIN data_sources ds ON t.data_source = ds.id \
WHERE t.id > $1 ORDER BY t.id ASC LIMIT $2"
}
};

let stmt = c.prepare(q).await?;
let rows = c.query(&stmt, &[&id_cursor, &batch_size]).await?;

let nodes: Vec<(i64, String, Vec<String>, String, String)> = rows
.iter()
.map(|row| {
let id: i64 = row.get::<_, i64>(0);
let document_id: String = row.get::<_, String>(1);
let tags: Vec<String> = row.get::<_, Vec<String>>(2);
let ds_id: String = row.get::<_, String>(3);
let ds_internal_id: String = row.get::<_, String>(4);
(id, document_id, tags, ds_id, ds_internal_id)
})
.collect::<Vec<_>>();
Ok(nodes)
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
// parse args and env vars
let args = Args::parse();
let index_name = "data_sources_nodes";
let index_version = args.index_version;
let batch_size = args.batch_size;
let start_cursor = args.start_cursor;
let query_type = args.query_type;

let url = std::env::var("ELASTICSEARCH_URL").expect("ELASTICSEARCH_URL must be set");
let username =
std::env::var("ELASTICSEARCH_USERNAME").expect("ELASTICSEARCH_USERNAME must be set");
let password =
std::env::var("ELASTICSEARCH_PASSWORD").expect("ELASTICSEARCH_PASSWORD must be set");

let region = std::env::var("DUST_REGION").expect("DUST_REGION must be set");

// create ES client
let search_store = ElasticsearchSearchStore::new(&url, &username, &password).await?;

let index_fullname = format!("core.{}_{}", index_name, index_version);

// err if index does not exist
let response = search_store
.client
.indices()
.exists(IndicesExistsParts::Index(&[index_fullname.as_str()]))
.send()
.await?;

if response.status_code() != StatusCode::OK {
return Err(anyhow::anyhow!("Index does not exist").into());
}

if !args.skip_confirmation {
println!(
"Are you sure you want to backfill the index {} in region {}? (y/N)",
index_fullname, region
);
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
if input.trim() != "y" {
return Err(anyhow::anyhow!("Aborted").into());
}
}

let db_uri = std::env::var("CORE_DATABASE_READ_REPLICA_URI")
.expect("CORE_DATABASE_READ_REPLICA_URI must be set");
let store = PostgresStore::new(&db_uri).await?;
// Loop over all rows in Postgres using the id as cursor, stopping when the id is
// greater than the last id in the source table at the start of the backfill.
let mut next_cursor = start_cursor;

// grab the last id in the source table (documents or tables)
let pool = store.raw_pool();
let c = pool.get().await?;
let last_id_query = match query_type {
NodeType::Document => "SELECT MAX(id) FROM data_sources_documents",
NodeType::Table => "SELECT MAX(id) FROM tables",
};
let last_id = c.query_one(last_id_query, &[]).await?;
let last_id: i64 = last_id.get(0);
println!("Last id in source table: {}", last_id);
while next_cursor <= last_id {
println!(
"Processing {} nodes, starting at id {}. ",
batch_size, next_cursor
);
let (nodes, next_id_cursor) =
get_node_batch(pool, next_cursor, batch_size, query_type).await?;

next_cursor = match next_id_cursor {
Some(cursor) => cursor,
None => {
println!(
"No more nodes to process (last id: {}). \nBackfill complete.",
last_id
);
break;
}
};

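// Build the bulk request body: for each node that has at least one tag, emit an
// update action addressed by "{data_source_internal_id}__{node_id}", followed by a
// partial doc containing only the tags field.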
let nodes_values: Vec<_> = nodes
.into_iter()
.filter(|node| node.2.len() > 0)
.flat_map(|node| {
[
json!({"update": {"_id": format!("{}__{}", node.4, node.1) }}),
json!({"doc": {"tags": node.2}}),
]
})
.collect();

let nodes_body: Vec<JsonBody<_>> = nodes_values.into_iter().map(|v| v.into()).collect();

let response = search_store
.client
.bulk(BulkParts::Index(index_fullname.as_str()))
.body(nodes_body)
.send()
.await?;
match response.status_code() {
StatusCode::OK => println!("Succeeded."),
_ => {
let body = response.json::<serde_json::Value>().await?;
eprintln!("\n{:?}", body);
return Err(anyhow::anyhow!("Failed to insert nodes").into());
}
}
}

Ok(())
}

async fn get_node_batch(
pool: &Pool<PostgresConnectionManager<NoTls>>,
next_cursor: i64,
batch_size: usize,
query_type: NodeType,
) -> Result<
(Vec<(i64, String, Vec<String>, String, String)>, Option<i64>),
Box<dyn std::error::Error>,
> {
let nodes = list_data_source_documents(
&pool,
next_cursor,
batch_size.try_into().unwrap(),
query_type,
)
.await?;
let last_node = nodes.last().cloned();
let nodes_length = nodes.len();
match last_node {
Some((last_row_id, _, _, _, _)) => Ok((
nodes,
match nodes_length == batch_size {
true => Some(last_row_id),
false => None,
},
)),
None => Ok((vec![], None)),
}
}
1 change: 1 addition & 0 deletions core/bin/elasticsearch/backfill_folders_index.rs
@@ -94,6 +94,7 @@ async fn list_data_source_nodes(
parents.get(1).cloned(),
parents,
source_url,
vec![],
),
row_id,
element_row_id,
1 change: 1 addition & 0 deletions core/src/data_sources/data_source.rs
@@ -236,6 +236,7 @@ impl From<Document> for Node {
document.parent_id,
document.parents.clone(),
document.source_url,
document.tags,
)
}
}
1 change: 1 addition & 0 deletions core/src/data_sources/folder.rs
@@ -89,6 +89,7 @@ impl From<Folder> for Node {
folder.parent_id,
folder.parents,
folder.source_url,
vec![],
)
}
}
3 changes: 3 additions & 0 deletions core/src/data_sources/node.rs
@@ -89,6 +89,7 @@ pub struct Node {
pub parent_id: Option<String>,
pub parents: Vec<String>,
pub source_url: Option<String>,
pub tags: Vec<String>,
}

impl Node {
@@ -104,6 +105,7 @@
parent_id: Option<String>,
parents: Vec<String>,
source_url: Option<String>,
tags: Vec<String>,
) -> Self {
Node {
data_source_id: data_source_id.to_string(),
@@ -117,6 +119,7 @@
parent_id: parent_id.clone(),
parents,
source_url,
tags,
}
}

1 change: 1 addition & 0 deletions core/src/databases/table.rs
@@ -324,6 +324,7 @@ impl From<Table> for Node {
table.parents.get(1).cloned(),
table.parents,
table.source_url,
table.tags,
)
}
}
@@ -45,6 +45,19 @@
"provider_visibility": {
"type": "keyword",
"index": false
},
"tags": {
"type": "text",
Contributor:

Here we'd need to understand whether this is the right way of indexing.
Will you never sort by tags? Will you sometimes need an exact tag match, or filter rather than rank, etc.?
Is there a doc somewhere on the types of queries that should be supported long term?

Contributor:

I see there's exact match (so "keyword") and prefix; for prefix I think keyword is fine too, although I'm not sure.
I see in the design doc there are questions about fuzzy search. Was this sorted out?

Contributor (author):

According to @flvndvd, keyword fields cannot handle prefix matching directly; we need an ngram analyzer (edge) for that. The question about fuzzy search is not clear to me: I don't see the point of fuzzy searching on tags, but I've added the mapping in case we need it. (A sketch of the intended queries follows the mapping below.)

"analyzer": "standard",
Contributor:

I'd encourage you to play a bit with the /_analyze endpoint to understand how indexing influences the search (doc). You can experiment locally!

Just a reminder that to unlock the real power of Elasticsearch, time should be taken at indexing time to index in a predictable way, so that searching is very efficient!

Contributor (author):

Thanks, I'll check the behaviour of the queries!

"fields": {
"edge": {
"type": "text",
"analyzer": "edge_analyzer"
},
"keyword": {
"type": "keyword"
}
}
}
}
}
}
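To make the indexing discussion above concrete, here is a rough sketch of the queries the two sub-fields are meant to serve, assuming edge_analyzer is an edge-ngram analyzer defined in the index settings (not shown in this diff) and using a placeholder index name. Exact matches go against tags.keyword, prefix-style matches against tags.edge:

POST core.data_sources_nodes_<version>/_search
{
  "query": {
    "bool": {
      "should": [
        { "term": { "tags.keyword": "quarterly-report" } },
        { "match": { "tags.edge": "quart" } }
      ]
    }
  }
}

The /_analyze endpoint mentioned in the review can be used to check what tokens each sub-field actually produces for a given tag, e.g.:

POST core.data_sources_nodes_<version>/_analyze
{
  "field": "tags.edge",
  "text": "quarterly-report"
}

In practice the edge sub-field would usually pair the edge-ngram index analyzer with a plain search analyzer, so that the query text itself is not n-grammed at search time.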