fix: update after autocomplete PR
raphael0202 committed Nov 9, 2023
1 parent 0359c0b commit 1d4a9e8
Showing 12 changed files with 264 additions and 275 deletions.
35 changes: 17 additions & 18 deletions app/api.py
@@ -8,15 +8,15 @@
from fastapi.templating import Jinja2Templates

from app import config
from app._types import SearchResponse
from app.config import check_config_is_defined, settings
from app.postprocessing import load_result_processor
from app.query import (
build_completion_query,
build_elasticsearch_query_builder,
build_search_query,
execute_query,
build_completion_query,
)
from app._types import SearchResponse
from app.utils import connection, get_logger, init_sentry

logger = get_logger()
@@ -29,11 +29,14 @@
logger.warning("Main configuration is not set, use CONFIG_PATH envvar")
FILTER_QUERY_BUILDER = None
RESULT_PROCESSOR = None
TAXONOMY_RESULT_PROCESSOR = None
else:
# we cache query builder and result processor here for faster processing
FILTER_QUERY_BUILDER = build_elasticsearch_query_builder(config.CONFIG)
RESULT_PROCESSOR = load_result_processor(config.CONFIG.result_processor)
TAXONOMY_RESULT_PROCESSOR = load_result_processor(config.CONFIG.taxonomy.autocomplete.result_processor)
TAXONOMY_RESULT_PROCESSOR = load_result_processor(
config.CONFIG.taxonomy.autocomplete.result_processor
)


app = FastAPI(
@@ -167,25 +170,21 @@ def search(

@app.get("/taxonomy")
def taxonomy_autocomplete(
q: Annotated[
str, Query(description="Give the user input string to autocomplete.)")
],
taxonomy_name: Annotated[
str, Query(description="Give the taxonomy name to search in.")
],
lang: Annotated[
str, Query(description="Give the language to search in, defaults to 'en'.)")
] = "en",
size: Annotated[
int, Query(description="Number of results to return.")
] = 10
q: Annotated[str, Query(description="User autocomplete query.")],
taxonomy_name: Annotated[
str, Query(description="Name of the taxonomy to search in.")
],
lang: Annotated[
str, Query(description="Language to search in, defaults to 'en'.")
] = "en",
size: Annotated[int, Query(description="Number of results to return.")] = 10,
):
query = build_completion_query(
q=q, taxonomy_name=taxonomy_name, lang=lang, size=size, config=config.CONFIG
q=q, taxonomy_name=taxonomy_name, lang=lang, size=size, config=config.CONFIG
)
results = query.execute()

response = TAXONOMY_RESULT_PROCESSOR.process(results, None)
response = TAXONOMY_RESULT_PROCESSOR.process(results)

return {
**response,
@@ -241,4 +240,4 @@ def html_search(

@app.get("/robots.txt", response_class=PlainTextResponse)
def robots_txt():
return """User-agent: *\nDisallow: /"""
return """User-agent: *\nDisallow: /"""
40 changes: 26 additions & 14 deletions app/cli/main.py
@@ -3,9 +3,6 @@

import typer

from app.cli.perform_import import perform_taxonomies_import
from app.config import check_config_is_defined, set_global_config

app = typer.Typer()


@@ -37,7 +34,7 @@ def import_data(

from app import config
from app.cli.perform_import import perform_import
from app.config import set_global_config
from app.config import check_config_is_defined, set_global_config
from app.utils import get_logger

logger = get_logger()
@@ -52,24 +49,39 @@ def import_data(
num_items,
num_processes,
start_time,
config.CONFIG,
config.CONFIG, # type: ignore
)
end_time = time.perf_counter()
logger.info("Import time: %s seconds", end_time - start_time)


@app.command(name="import-taxonomies")
def import_taxonomies():
@app.command()
def import_taxonomies(
config_path: Optional[Path] = typer.Option(
default=None,
help="path of the yaml configuration file, it overrides CONFIG_PATH envvar",
dir_okay=False,
file_okay=True,
exists=True,
),
):
"""Import taxonomies into Elasticsearch."""
import time
from app.utils import get_logger

from app import config
from app.cli.perform_import import perform_taxonomy_import
from app.config import check_config_is_defined, set_global_config
from app.utils import get_logger

logger = get_logger()

if config_path:
set_global_config(config_path)

check_config_is_defined()

start_time = time.perf_counter()
perform_taxonomies_import(
start_time,
config.CONFIG,
)
perform_taxonomy_import(config.CONFIG)
end_time = time.perf_counter()
logger.info("Import time: %s seconds", end_time - start_time)

@@ -101,7 +113,7 @@ def import_from_queue(
),
):
from app import config
from app.config import check_config_is_defined, settings
from app.config import check_config_is_defined, set_global_config, settings
from app.queue_helpers import run_queue_safe
from app.utils import connection, get_logger, init_sentry

Expand All @@ -119,7 +131,7 @@ def import_from_queue(
check_config_is_defined()

# run queue
run_queue_safe(config.CONFIG)
run_queue_safe(config.CONFIG) # type: ignore


def main() -> None:
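As a usage sketch, the import-taxonomies command rewritten above can be driven programmatically through Typer's test runner. The config file path below is purely illustrative; the command and option names follow from the import_taxonomies() signature, since Typer derives "import-taxonomies" and "--config-path" from the function and parameter names.

from typer.testing import CliRunner

from app.cli.main import app  # the Typer application defined in this file

runner = CliRunner()
# "data/config.yml" is a hypothetical path; --config-path overrides the
# CONFIG_PATH environment variable, as described in the option help text.
result = runner.invoke(app, ["import-taxonomies", "--config-path", "data/config.yml"])
assert result.exit_code == 0, result.stdout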
140 changes: 73 additions & 67 deletions app/cli/perform_import.py
@@ -7,18 +7,24 @@
from elasticsearch.helpers import bulk, parallel_bulk
from elasticsearch_dsl import Index, Search

from app._types import JSONType
from app.config import Config, TaxonomyConfig
from app.indexing import DocumentProcessor, generate_index_object
from app.indexing import (
DocumentProcessor,
generate_index_object,
generate_taxonomy_index_object,
)
from app.queue_helpers import RedisClient
from app._types import JSONType
from app.taxonomy import get_taxonomy
from app.utils import connection, get_logger
from app.utils.io import jsonl_iter

logger = get_logger(__name__)


def get_document_dict(processor: DocumentProcessor, row, next_index: str) -> JSONType:
def get_document_dict(
processor: DocumentProcessor, row, next_index: str
) -> JSONType | None:
"""Return the document dict suitable for a bulk insert operation."""
document = processor.from_dict(row)
if not document:
@@ -29,13 +35,13 @@ def get_document_dict(processor: DocumentProcessor, row, next_index: str) -> JSO


def gen_documents(
processor: DocumentProcessor,
file_path: Path,
next_index: str,
start_time,
num_items: int | None,
num_processes: int,
process_id: int,
processor: DocumentProcessor,
file_path: Path,
next_index: str,
start_time,
num_items: int | None,
num_processes: int,
process_id: int,
):
"""Generate documents to index for process number process_id
@@ -62,30 +68,34 @@ def gen_documents(
yield document_dict


def gen_taxonomies(
processor: DocumentProcessor,
next_index: str,
taxonomies: TaxonomyConfig
def gen_taxonomy_documents(
taxonomy_config: TaxonomyConfig, next_index: str, supported_langs: set[str]
):
"""Generate documents to index for process number process_id
"""Generator for taxonomy documents in Elasticsearch.
We chunk documents based on document num % process_id
:param taxonomy_config: the taxonomy configuration
:param next_index: the index to write to
:param supported_langs: a set of supported languages
:yield: a dict with the document to index, compatible with ES bulk API
"""
for i, taxonomy_source_config in enumerate(tqdm.tqdm(taxonomies.sources)):
for taxonomy_source_config in tqdm.tqdm(taxonomy_config.sources):
taxonomy = get_taxonomy(
taxonomy_source_config.name, str(taxonomy_source_config.url)
)
for x, node in taxonomy.nodes.items():
taxonomy_dict = {
"code": node.id,
"taxonomy_name": taxonomy_source_config.name.replace(" ", "_"),
"name": node.synonyms
for node in taxonomy.iter_nodes():
names = {
lang: lang_names
for lang, lang_names in node.synonyms.items()
if lang in supported_langs
}
yield {
"_index": next_index,
"_source": {
"id": node.id,
"taxonomy_name": taxonomy_source_config.name,
"names": names,
},
}
document_dict = get_document_dict(processor, taxonomy_dict, next_index)
if not document_dict:
continue

yield document_dict
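For reference, one item yielded by the new gen_taxonomy_documents() has the bulk-API shape sketched below. All values are invented; only the field layout comes from the yield statement above.

# Hypothetical example of a single action yielded by gen_taxonomy_documents();
# the values are made up, the structure mirrors the yield above.
example_action = {
    "_index": "taxonomy-2023-11-09-12-00-00-000000",  # the next_index argument
    "_source": {
        "id": "en:organic",            # node.id
        "taxonomy_name": "labels",     # taxonomy_source_config.name
        "names": {                     # node.synonyms filtered to supported_langs
            "en": ["Organic"],
            "fr": ["Bio"],
        },
    },
}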


def update_alias(es, next_index: str, index_alias: str):
@@ -106,13 +116,13 @@ def update_alias(es, next_index: str, index_alias: str):


def import_parallel(
config: Config,
file_path: Path,
next_index: str,
start_time: float,
num_items: int | None,
num_processes: int,
process_id: int,
config: Config,
file_path: Path,
next_index: str,
start_time: float,
num_items: int | None,
num_processes: int,
process_id: int,
):
"""One task of import.
Expand Down Expand Up @@ -148,27 +158,25 @@ def import_parallel(
logger.error("Encountered errors: %s", errors)


def import_taxonomies(
config: Config,
next_index: str,
start_time: float
):
"""One task of import.
:param str next_index: the index to write to
:param float start_time: the start time
def import_taxonomies(config: Config, next_index: str):
"""Import taxonomies into Elasticsearch.
A single taxonomy index is used to store all taxonomy items.
:param config: the configuration to use
:param next_index: the index to write to
"""
processor = DocumentProcessor(config)
# open a connection for this process
es = connection.get_es_client(timeout=120, retry_on_timeout=True)
# Note that bulk works better than parallel bulk for our usecase.
# The preprocessing in this file is non-trivial, so it's better to parallelize that. If we then do parallel_bulk
# here, this causes queueing and a lot of memory usage in the importer process.
# The preprocessing in this file is non-trivial, so it's better to
# parallelize that. If we then do parallel_bulk
# here, this causes queueing and a lot of memory usage in the importer
# process.
success, errors = bulk(
es,
gen_taxonomies(
processor,
next_index,
config.taxonomy
gen_taxonomy_documents(
config.taxonomy, next_index, supported_langs=set(config.supported_langs)
),
raise_on_error=False,
)
@@ -177,7 +185,7 @@


def get_redis_products(
processor: DocumentProcessor, next_index: str, last_updated_timestamp: int
processor: DocumentProcessor, next_index: str, last_updated_timestamp: int
):
"""Fetch IDs of documents to update from Redis."""
redis_client = RedisClient()
@@ -208,18 +216,18 @@ def get_redis_updates(next_index: str, config: Config):

# Since this is only done by a single process, we can use parallel_bulk
for success, info in parallel_bulk(
es, get_redis_products(processor, next_index, last_updated_timestamp)
es, get_redis_products(processor, next_index, last_updated_timestamp)
):
if not success:
logger.warning("A document failed: %s", info)


def perform_import(
file_path: Path,
num_items: int | None,
num_processes: int,
start_time: float,
config: Config,
file_path: Path,
num_items: int | None,
num_processes: int,
start_time: float,
config: Config,
):
"""Main function running the import sequence"""
es = connection.get_es_client()
@@ -255,24 +263,22 @@ def perform_import(
update_alias(es, next_index, config.index.name)


def perform_taxonomies_import(
start_time: float,
config: Config,
):
"""Main function running the import sequence"""
def perform_taxonomy_import(config: Config):
"""Create a new index for taxonomies and import them.
:param config: the configuration to use
"""
es = connection.get_es_client()
merged_dict = {**config.dict(), **config.taxonomy.autocomplete.dict()}
taxonomy_config: Config = Config(**merged_dict)
# we create a temporary index to import to
# at the end we will change alias to point to it
index_date = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
next_index = f"{taxonomy_config.index.name}-{index_date}"
next_index = f"{config.taxonomy.autocomplete.index.name}-{index_date}"

index = generate_index_object(next_index, taxonomy_config)
index = generate_taxonomy_index_object(next_index, config)
# create the index
index.save()

import_taxonomies(taxonomy_config, next_index, start_time)
import_taxonomies(config, next_index)

# make alias point to new index
update_alias(es, next_index, taxonomy_config.index.name)
update_alias(es, next_index, config.taxonomy.autocomplete.index.name)
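As a sanity check after an import, the freshly aliased taxonomy index can be queried directly with elasticsearch_dsl. This is a sketch under assumptions: the "taxonomy" alias and the "labels" value are placeholders, while the queried fields come from gen_taxonomy_documents() above.

from elasticsearch_dsl import Search

from app.utils import connection

es = connection.get_es_client()
# "taxonomy" is an assumed alias name; the real alias is
# config.taxonomy.autocomplete.index.name, as used in perform_taxonomy_import().
search = Search(using=es, index="taxonomy").query("match", taxonomy_name="labels")
for hit in search.execute():
    print(hit.id, hit.taxonomy_name)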
