[core] Index tags #10441
base: main
Changes from 15 commits
@@ -0,0 +1,231 @@
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use clap::Parser;
use dust::{
    search_stores::search_store::ElasticsearchSearchStore,
    stores::{postgres::PostgresStore, store::Store},
};
use elasticsearch::{http::request::JsonBody, indices::IndicesExistsParts, BulkParts};
use http::StatusCode;
use serde_json::json;
use tokio_postgres::NoTls;

#[derive(Clone, Copy, Debug, clap::ValueEnum)]
enum NodeType {
    Document,
    Table,
}

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    #[arg(long, help = "The version of the index")]
    index_version: u32,

    #[arg(long, help = "Skip confirmation")]
    skip_confirmation: bool,

    #[arg(long, help = "The cursor to start from", default_value = "0")]
    start_cursor: i64,

    #[arg(long, help = "The batch size", default_value = "100")]
    batch_size: usize,

    #[arg(long, help = "The type of query to run", default_value = "document")]
    query_type: NodeType,
}

/*
 * Backfills tags for documents and tables in Elasticsearch, using the postgres tables `data_sources_documents` and `tables`.
 *
 * Usage:
 * cargo run --bin elasticsearch_backfill_document_tags_index -- --index-version <version> [--skip-confirmation] [--start-cursor <cursor>] [--batch-size <batch_size>]
 *
 */
#[tokio::main]
async fn main() {
    if let Err(e) = run().await {
        eprintln!("Error: {}", e);
        std::process::exit(1);
    }
}

async fn list_data_source_documents(
    pool: &Pool<PostgresConnectionManager<NoTls>>,
    id_cursor: i64,
    batch_size: i64,
    query_type: NodeType,
) -> Result<Vec<(i64, String, Vec<String>, String, String)>, Box<dyn std::error::Error>> {
    let c = pool.get().await?;

    let q = match query_type {
        NodeType::Document => {
            "SELECT dsd.id, dsd.document_id, dsd.tags_array, ds.data_source_id, ds.internal_id \
             FROM data_sources_documents dsd JOIN data_sources ds ON dsd.data_source = ds.id \
             WHERE dsd.id > $1 ORDER BY dsd.id ASC LIMIT $2"
        }
        NodeType::Table => {
            "SELECT t.id, t.table_id, t.tags_array, ds.data_source_id, ds.internal_id \
             FROM tables t JOIN data_sources ds ON t.data_source = ds.id \
             WHERE t.id > $1 ORDER BY t.id ASC LIMIT $2"
        }
    };

    let stmt = c.prepare(q).await?;
    let rows = c.query(&stmt, &[&id_cursor, &batch_size]).await?;

    let nodes: Vec<(i64, String, Vec<String>, String, String)> = rows
        .iter()
        .map(|row| {
            let id: i64 = row.get::<_, i64>(0);
            let document_id: String = row.get::<_, String>(1);
            let tags: Vec<String> = row.get::<_, Vec<String>>(2);
            let ds_id: String = row.get::<_, String>(3);
            let ds_internal_id: String = row.get::<_, String>(4);
            (id, document_id, tags, ds_id, ds_internal_id)
        })
        .collect::<Vec<_>>();
    Ok(nodes)
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // parse args and env vars
    let args = Args::parse();
    let index_name = "data_sources_nodes";
    let index_version = args.index_version;
    let batch_size = args.batch_size;
    let start_cursor = args.start_cursor;
    let query_type = args.query_type;

    let url = std::env::var("ELASTICSEARCH_URL").expect("ELASTICSEARCH_URL must be set");
    let username =
        std::env::var("ELASTICSEARCH_USERNAME").expect("ELASTICSEARCH_USERNAME must be set");
    let password =
        std::env::var("ELASTICSEARCH_PASSWORD").expect("ELASTICSEARCH_PASSWORD must be set");

    let region = std::env::var("DUST_REGION").expect("DUST_REGION must be set");

    // create ES client
    let search_store = ElasticsearchSearchStore::new(&url, &username, &password).await?;

    let index_fullname = format!("core.{}_{}", index_name, index_version);

    // err if index does not exist
    let response = search_store
        .client
        .indices()
        .exists(IndicesExistsParts::Index(&[index_fullname.as_str()]))
        .send()
        .await?;

    if response.status_code() != StatusCode::OK {
        return Err(anyhow::anyhow!("Index does not exist").into());
    }

    if !args.skip_confirmation {
        println!(
            "Are you sure you want to backfill the index {} in region {}? (y/N)",
            index_fullname, region
        );
        let mut input = String::new();
        std::io::stdin().read_line(&mut input).unwrap();
        if input.trim() != "y" {
            return Err(anyhow::anyhow!("Aborted").into());
        }
    }

    let db_uri = std::env::var("CORE_DATABASE_READ_REPLICA_URI")
        .expect("CORE_DATABASE_READ_REPLICA_URI must be set");
    let store = PostgresStore::new(&db_uri).await?;
    // loop on all nodes in postgres using id as cursor, stopping when id is
    // greater than the last id in data_sources_nodes at start of backfill
    let mut next_cursor = start_cursor;

    // grab last id in data_sources_nodes
    let pool = store.raw_pool();
    let c = pool.get().await?;
    let last_id = c
        .query_one("SELECT MAX(id) FROM data_sources_documents", &[])
        .await?;
    let last_id: i64 = last_id.get(0);
    println!("Last id in data_sources_nodes: {}", last_id);
    while next_cursor <= last_id {
        println!(
            "Processing {} nodes, starting at id {}. ",
            batch_size, next_cursor
        );
        let (nodes, next_id_cursor) =
            get_node_batch(pool, next_cursor, batch_size, query_type).await?;

        next_cursor = match next_id_cursor {
            Some(cursor) => cursor,
            None => {
                println!(
                    "No more nodes to process (last id: {}). \nBackfill complete.",
                    last_id
                );
                break;
            }
        };

        let nodes_values: Vec<_> = nodes
            .into_iter()
            .filter(|node| !node.2.is_empty())
            .flat_map(|node| {
                [
                    json!({"update": {"_id": format!("{}__{}", node.4, node.1) }}),
                    json!({"doc": {"tags": node.2}}),
                ]
            })
            .collect();

        let nodes_body: Vec<JsonBody<_>> = nodes_values.into_iter().map(|v| v.into()).collect();

        // send the bulk update and keep the response so its status can be checked below
        let response = search_store
            .client
            .bulk(BulkParts::Index(index_fullname.as_str()))
            .body(nodes_body)
            .send()
            .await?;
        match response.status_code() {
            StatusCode::OK => println!("Succeeded."),
            _ => {
                let body = response.json::<serde_json::Value>().await?;
                eprintln!("\n{:?}", body);
                return Err(anyhow::anyhow!("Failed to insert nodes").into());
            }
        }
    }

    Ok(())
}

async fn get_node_batch(
    pool: &Pool<PostgresConnectionManager<NoTls>>,
    next_cursor: i64,
    batch_size: usize,
    query_type: NodeType,
) -> Result<
    (Vec<(i64, String, Vec<String>, String, String)>, Option<i64>),
    Box<dyn std::error::Error>,
> {
    let nodes = list_data_source_documents(
        &pool,
        next_cursor,
        batch_size.try_into().unwrap(),
        query_type,
    )
    .await?;
    let last_node = nodes.last().cloned();
    let nodes_length = nodes.len();
    match last_node {
        Some((last_row_id, _, _, _, _)) => Ok((
            nodes,
            match nodes_length == batch_size {
                true => Some(last_row_id),
                false => None,
            },
        )),
        None => Ok((vec![], None)),
    }
}
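
For reference, each node with a non-empty tags array contributes one action/document pair to the bulk body built above. Serialized as NDJSON, a single pair would look roughly like the sketch below; the internal id, document id, and tag values are made up for illustration:

{"update": {"_id": "4f2a9c1e__doc-42"}}
{"doc": {"tags": ["finance", "q3-report"]}}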
@@ -89,6 +89,7 @@ impl From<Folder> for Node {
            folder.parent_id,
            folder.parents,
            folder.source_url,
            vec![],
        )
    }
}
@@ -45,6 +45,19 @@
      "provider_visibility": {
        "type": "keyword",
        "index": false
      },
      "tags": {
        "type": "text",
[review comments on this line]
- Here we'd need to understand if that's the right way of indexing.
- I see there's exact match (so "keyword") and prefix; for prefix I think keyword is fine too, although not sure.
- According to @flvndvd, keywords cannot handle prefix directly; we need ngram (so edge) for that. The question about fuzzy search is not clear to me: I don't see the point of fuzzy searching on tags, but I've added the mapping in case we need it.
"analyzer": "standard", | ||
[review comments on this line]
- I'd encourage playing a bit with the […]. Just a reminder that, to unlock the real power of Elasticsearch, time should be taken at indexing time to index in a predictable way, ensuring that searching is very efficient!
- Thanks, I'll check the behaviour of the queries!
"fields": { | ||
"edge": { | ||
"type": "text", | ||
"analyzer": "edge_analyzer" | ||
}, | ||
"keyword": { | ||
"type": "keyword" | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} |
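
As a sketch of how these sub-fields might be used at query time (assuming `edge_analyzer` is an edge-ngram analyzer defined in the index settings, which is not shown in this diff, and with made-up tag values), an exact tag match could go through the keyword sub-field:

{ "query": { "term": { "tags.keyword": "finance" } } }

while a prefix-style match could rely on the edge-ngram tokens produced at index time:

{ "query": { "match": { "tags.edge": "fin" } } }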
[review comment]
Scripts in bin/ are meant to be frequently reused; temporary backfill scripts should go in migrations.
Does this one really belong here, in your opinion?
If it does, could it become part of backfill_nodes? It would be better if there are not too many different scripts there.

[reply]
That's temporary, I'll move it to migrations.