
Commit

fix: bug fixes in product.py
raphael0202 committed Sep 19, 2023
1 parent d84fa1f commit 0d6aec2
Showing 8 changed files with 116 additions and 102 deletions.
14 changes: 6 additions & 8 deletions README.md
@@ -5,12 +5,10 @@

This API is currently in development. Read [Search-a-licious roadmap architecture notes](https://docs.google.com/document/d/1mibE8nACcmen6paSrqT9JQk5VbuvlFUXI1S93yHCK2I/edit) to understand where we are headed.

The file product.schema.json contains a partial schema of the returned products.

### Organization
The main file is `api.py`, and the Product schema is in `models/product.py`.
The main file is `api.py`, and the schema is in `models/product.py`.

The `scripts/` directory contains various scripts for manual validation, constructing the product schema, importing, etc.
A CLI is available to perform common tasks.

### Running locally

@@ -34,7 +32,7 @@ Docker spins up:
- The search service on port 8000
- Redis on port 6379

You will then need to import from MongoDB (see instructions below).
You will then need to import from a JSONL dump (see instructions below).

### Development
For development, you have two options for running the service:
@@ -64,18 +62,18 @@ pre-commit run
```

### Running the import:
To import data from the [MongoDB export](https://world.openfoodfacts.org/data):
To import data from the [JSONL export](https://world.openfoodfacts.org/data):

1. First, ensure that your Docker environment has at least 150 GB of disk space and 6 GB of RAM. These can be configured under Settings → Resources.

2. Run the following command:
```console
python scripts/perform_import_parallel.py --filename=/path/to/products.jsonl --num_processes=2
python3 -m app import /path/to/products.jsonl.gz --num_processes=2
```

Or using docker:
```console
docker-compose run --rm -v $(pwd)/path/to/products.jsonl:/mnt/products.jsonl:ro searchservice python3 app/scripts/perform_import_parallel.py --filename=/mnt/products.jsonl --num_processes=2
docker-compose run --rm -v $(pwd)/path/to/products.jsonl.gz:/mnt/products.jsonl.gz:ro searchservice python3 -m app import /mnt/products.jsonl.gz --num_processes=2
```

If you get errors, try adding more RAM (12GB works well if you have that spare), or slow down the indexing process by setting `num_processes` to 1 in the command above.
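
For example, the same import command as above run with a single worker process:
```console
python3 -m app import /path/to/products.jsonl.gz --num_processes=1
```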
4 changes: 4 additions & 0 deletions app/__main__.py
@@ -0,0 +1,4 @@
from app.cli.main import main

if __name__ == "__main__":
main()
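
This four-line `__main__.py` is what makes the `python3 -m app ...` invocation used in the README work. A quick way to check the wiring, assuming the dependencies from `requirements.txt` are installed (Typer generates the help text automatically):
```console
python3 -m app --help
```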
64 changes: 64 additions & 0 deletions app/cli/main.py
@@ -0,0 +1,64 @@
from pathlib import Path
from typing import Optional

import typer

app = typer.Typer()


@app.command()
def import_data(
input_path: Path = typer.Argument(
...,
exists=True,
file_okay=True,
dir_okay=False,
help="Path of the JSONL data file",
),
num_processes: int = typer.Option(
default=2,
help="How many import processes to run in parallel"
),
num_items: Optional[int] = typer.Option(default=None, help="How many items to import"),
):
"""Import data into Elasticsearch."""
import time
from app.utils import get_logger
from app.cli.perform_import import perform_import

logger = get_logger()

start_time = time.perf_counter()
perform_import(
input_path,
num_items,
num_processes,
start_time,
)
end_time = time.perf_counter()
logger.info("Import time: %s seconds", end_time - start_time)



@app.command()
def write_to_redis():
import time

from redis import Redis

redis = Redis(
host="searchredis",
port=6379,
decode_responses=True,
)
key_name = "search_import_queue"
while True:
code = input("Please enter a product code:\n")
start_time = time.perf_counter()

redis.rpush(key_name, code)
end_time = time.perf_counter()
print(f"Time: {end_time - start_time} seconds")

def main() -> None:
app()
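
For reference, `write_to_redis` only RPUSHes product codes onto the `search_import_queue` list; the indexing side reads updates back via `RedisClient.get_processed_since` (see `perform_import.py` below). One possible way to peek at the queued codes from another shell, assuming the Redis service is named `searchredis` as the host name above suggests:
```console
docker-compose exec searchredis redis-cli LRANGE search_import_queue 0 -1
```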
84 changes: 24 additions & 60 deletions app/cli/perform_import_parallel.py → app/cli/perform_import.py
@@ -1,12 +1,4 @@
"""
Performs an import from a MongoDB products.jsonl file.
Pass in the path of the file with the filename argument
Example:
python scripts/perform_import_parallel.py --filename=X
"""
import argparse
from pathlib import Path
import time
from datetime import datetime
from multiprocessing import Pool
@@ -18,10 +10,13 @@
from app.config import CONFIG, Config
from app.import_queue.redis_client import RedisClient
from app.models.product import Product, ProductProcessor
from app.utils import connection, constants
from app.utils import connection, constants, get_logger
from app.utils.io import jsonl_iter


logger = get_logger(__name__)


def get_product_dict(processor: ProductProcessor, row, next_index: str):
"""Return the product dict suitable for a bulk insert operation"""
product = processor.from_dict(row)
@@ -36,19 +31,19 @@ def get_product_dict(processor: ProductProcessor, row, next_index: str):

def gen_documents(
processor: ProductProcessor,
filename: str,
file_path: Path,
next_index: str,
start_time,
num_items: int,
num_items: int | None,
num_processes: int,
process_id: int,
):
"""Generate documents to index for process number process_id
We chunk documents based on document num % process_id
"""
for i, row in enumerate(tqdm.tqdm(jsonl_iter(filename))):
if i > num_items:
for i, row in enumerate(tqdm.tqdm(jsonl_iter(file_path))):
if num_items is not None and i > num_items:
break
        # Only keep the rows assigned to this process
if i % num_processes != process_id:
@@ -57,8 +52,10 @@ def gen_documents(
if i % 100000 == 0 and i:
# Roughly 2.5M lines as of August 2022
current_time = time.perf_counter()
print(
f"Processed: {i} lines in {round(current_time - start_time)} seconds",
logger.info(
"Processed: %d lines in %s seconds",
i,
round(current_time - start_time)
)

# At least one document doesn't have a code set, don't import it
@@ -94,19 +91,19 @@ def update_alias(es, next_index):

def import_parallel(
config: Config,
filename: str,
file_path: Path,
next_index: str,
start_time: float,
num_items: int,
num_items: int | None,
num_processes: int,
process_id: int,
):
"""One task of import.
:param str filename: the JSONL file to read
:param Path file_path: the JSONL file to read
:param str next_index: the index to write to
:param float start_time: the start time
:param int num_items: max number of items to import
    :param int num_items: max number of items to import, defaults to no limit
:param int num_processes: total number of processes
:param int process_id: the index of the process (from 0 to num_processes - 1)
"""
@@ -120,7 +117,7 @@ def import_parallel(
es,
gen_documents(
processor,
filename,
file_path,
next_index,
start_time,
num_items,
@@ -130,8 +127,7 @@
raise_on_error=False,
)
if not success:
print("Encountered errors:")
print(errors)
logger.error("Encountered errors: %s", errors)


def get_redis_products(
@@ -142,15 +138,15 @@
Those ids are set by productopener on products updates
"""
redis_client = RedisClient()
print(f"Processing redis updates since {last_updated_timestamp}")
logger.info("Processing redis updates since %s", last_updated_timestamp)
timestamp_processed_values = redis_client.get_processed_since(
last_updated_timestamp,
)
for _, row in timestamp_processed_values:
product_dict = get_product_dict(processor, row, next_index)
yield product_dict

print(f"Processed {len(timestamp_processed_values)} updates from Redis")
logger.info("Processed %d updates from Redis", len(timestamp_processed_values))


def get_redis_updates(next_index: str):
@@ -169,11 +165,11 @@ def get_redis_updates(next_index: str):
es, get_redis_products(next_index, last_updated_timestamp)
):
if not success:
print("A document failed: ", info)
logger.warning("A document failed: %s", info)


def perform_import(
filename: str, num_items: int, num_processes: int, start_time: float
file_path: Path, num_items: int | None, num_processes: int, start_time: float
):
"""Main function running the import sequence"""
es = connection.get_connection()
@@ -192,7 +188,7 @@ def perform_import(
args.append(
(
CONFIG,
filename,
file_path,
next_index,
start_time,
num_items,
@@ -207,35 +203,3 @@
get_redis_updates(next_index)
# make alias point to new index
update_alias(es, next_index)


if __name__ == "__main__":
parser = argparse.ArgumentParser("perform_import")
parser.add_argument(
"--filename",
help="Filename where the JSONL file is located",
type=str,
)
parser.add_argument(
"--num_items",
help="How many items to import",
type=int,
default=100000000,
)
parser.add_argument(
"--num_processes",
help="How many import processes to run in parallel",
type=int,
default=2,
)
args = parser.parse_args()

start_time = time.perf_counter()
perform_import(
args.filename,
args.num_items,
args.num_processes,
start_time,
)
end_time = time.perf_counter()
print(f"Import time: {end_time - start_time} seconds")
25 changes: 0 additions & 25 deletions app/cli/write_to_redis.py

This file was deleted.

12 changes: 10 additions & 2 deletions app/models/product.py
@@ -29,6 +29,11 @@ def __init__(self, config: Config) -> None:

@abc.abstractmethod
def preprocess(self, document: JSONType) -> JSONType:
"""Preprocess the document before data ingestion in Elasticsearch.
This can be used to make document schema compatible with the project
schema or to add custom fields.
"""
pass


Expand All @@ -40,7 +45,9 @@ def process_text_lang_field(
for target_key in (
k
for k in data
if re.fullmatch(f"{re.escape(input_field)}{config.lang_separator}\w\w([-_]\w\w)?", k)
if re.fullmatch(
f"{re.escape(input_field)}{config.lang_separator}\w\w([-_]\w\w)?", k
)
):
input_value = preprocess_field(
data,
@@ -95,6 +102,7 @@ def process_taxonomy_field(
class ProductProcessor:
def __init__(self, config: Config) -> None:
self.config = config
self.preprocessor: DocumentPreprocessor | None

if config.preprocessor is not None:
preprocessor_cls = load_class_object_from_string(config.preprocessor)
@@ -104,7 +112,7 @@ def __init__(self, config: Config) -> None:

def from_dict(self, d: JSONType):
inputs = {}
d = self.preprocessor(d) if self.preprocessor is not None else d
d = self.preprocessor.preprocess(d) if self.preprocessor is not None else d

for field in self.config.fields:
input_field = field.get_input_field()
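
For context, the `from_dict` fix above matters because preprocessors are plain subclasses of the abstract `DocumentPreprocessor` rather than callables, so the configured instance has to be invoked through its `preprocess()` method. A minimal, hypothetical subclass (illustration only; everything except `DocumentPreprocessor` and `JSONType` is invented):

```python
# Hypothetical preprocessor; DocumentPreprocessor and JSONType are taken from
# app/models/product.py as shown in the diff above.
from app.models.product import DocumentPreprocessor, JSONType


class TagCleaner(DocumentPreprocessor):
    """Illustration only: strip empty values before the document is indexed."""

    def preprocess(self, document: JSONType) -> JSONType:
        # ProductProcessor.from_dict() now calls `.preprocess()` on the configured
        # instance instead of calling the instance itself.
        return {k: v for k, v in document.items() if v not in (None, [], "")}
```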
14 changes: 7 additions & 7 deletions app/openfoodfacts.py
@@ -18,16 +18,16 @@ def preprocess(self, document: JSONType) -> JSONType:
# - `languages_code`
# - `countries_tags`: we add every official language of the countries where the product
# can be found.
supported_langs = set(document["languages_codes"])
countries_tags = document["countries_tags"]
supported_langs = set(document.get("languages_codes", []))
countries_tags = document.get("countries_tags", [])
country_taxonomy = get_taxonomy("country", COUNTRIES_TAXONOMY_URL)

for country_tag in countries_tags:
supported_langs |= set(
country_taxonomy[country_tag]
.properties["language_codes"]["en"]
.split(",")
)
# Check that `country_tag` is in taxonomy
if (country_node := country_taxonomy[country_tag]) is not None:
# Get all official languages of the country, and add them to `supported_langs`
            if (lang_codes := country_node.properties.get("language_codes", {}).get("en")) is not None:
supported_langs |= set(lang_code for lang_code in lang_codes.split(",") if lang_code)

document["supported_langs"] = list(supported_langs)
return document
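
The net effect of the guards above is that a product with no `languages_codes` key, or with a country tag that is missing from the taxonomy or has no `language_codes` property, no longer breaks the import. A rough standalone sketch of the pattern, with a plain dict standing in for the taxonomy (illustration only, not the project's API):

```python
# Illustration only: a stub whose lookup returns None for unknown tags, mirroring
# the guarded access pattern used in preprocess() above.
taxonomy = {"en:france": {"language_codes": "fr"}}
document = {"countries_tags": ["en:france", "en:unknown"]}  # no "languages_codes" key

supported_langs = set(document.get("languages_codes", []))  # set() instead of a KeyError
for country_tag in document.get("countries_tags", []):
    if (node := taxonomy.get(country_tag)) is not None:
        if (lang_codes := node.get("language_codes")) is not None:
            supported_langs |= {code for code in lang_codes.split(",") if code}

assert supported_langs == {"fr"}
```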
1 change: 1 addition & 0 deletions requirements.txt
@@ -5,3 +5,4 @@ redis==5.0.0
uvicorn==0.23.2
tqdm==4.66.1
cachetools==5.3.1
typer==0.9.0
