diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml new file mode 100644 index 00000000..b1a37163 --- /dev/null +++ b/.github/workflows/python-app.yml @@ -0,0 +1,39 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python + +name: Python application + +on: + push: + branches: [ "develop" ] + pull_request: + branches: [ "develop" ] + +permissions: + contents: read + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.10 + uses: actions/setup-python@v3 + with: + python-version: "3.10.9" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 pytest poetry + poetry install + #- name: Lint with flake8 + # run: | + # # stop the build if there are Python syntax errors or undefined names + # flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + # flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Test with pytest + run: | + poetry run pytest \ No newline at end of file diff --git a/Dockerfile-classic-api b/Dockerfile-classic-api index d58a6fe9..a0b3ce27 100644 --- a/Dockerfile-classic-api +++ b/Dockerfile-classic-api @@ -3,40 +3,50 @@ # Defines the runtime for the arXiv classic API, which provides a metadata # query API backed by Elasticsearch. 
-FROM arxiv/base:0.16.6 - -WORKDIR /opt/arxiv - - -ENV LC_ALL en_US.utf8 -ENV LANG en_US.utf8 -ENV LOGLEVEL 40 -ENV PIPENV_VENV_IN_PROJECT 1 -ENV FLASK_DEBUG 1 -ENV FLASK_APP /opt/arxiv/classic-api.py -ENV PATH "/opt/arxiv:${PATH}" -ENV ELASTICSEARCH_SERVICE_HOST 127.0.0.1 -ENV ELASTICSEARCH_SERVICE_PORT 9200 -ENV ELASTICSEARCH_SERVICE_PORT_9200_PROTO http -ENV ELASTICSEARCH_INDEX arxiv -ENV ELASTICSEARCH_USER elastic -ENV ELASTICSEARCH_PASSWORD changeme -ENV METADATA_ENDPOINT https://arxiv.org/docmeta_bulk/ - - -# Add Python application and configuration. -ADD Pipfile /opt/arxiv/ -ADD Pipfile.lock /opt/arxiv/ -RUN pip install -U pip pipenv -RUN pipenv sync --dev -ADD classic-api.py /opt/arxiv/ -ADD schema /opt/arxiv/schema -ADD mappings /opt/arxiv/mappings -ADD search /opt/arxiv/search -ADD wsgi-classic-api.py config/uwsgi-classic-api.ini /opt/arxiv/ - - -EXPOSE 8000 - -ENTRYPOINT ["pipenv", "run"] -CMD ["uwsgi", "--ini", "/opt/arxiv/uwsgi-classic-api.ini"] +# File: Dockerfile-classic-api +# Desc: arxiv search classic api +# Use: +# docker build --build-arg GIT_COMMIT=$(git rev-parse HEAD) \ +# -t "arxiv/arxiv-search-classic-api" -f ./Dockerfile-classic-api . 
+# docker run -it --env-file=env -p 8080:8080 arxiv/arxiv-search-classic-api + +FROM python:3.10.9-buster + +ARG GIT_COMMIT + +ENV \ + APP_HOME=/app \ + ELASTICSEARCH_PASSWORD=changeme \ + ELASTICSEARCH_SERVICE_HOST=127.0.0.1 \ + ELASTICSEARCH_SERVICE_PORT=9200 \ + ELASTICSEARCH_SERVICE_PORT_9200_PROTO=http \ + GIT_COMMIT=$GIT_COMMIT \ + METADATA_ENDPOINT=https://arxiv.org/docmeta_bulk/ \ + PIP_DEFAULT_TIMEOUT=100 \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_NO_CACHE_DIR=1 \ + POETRY_VERSION=1.2.2 \ + PYTHONFAULTHANDLER=1 \ + PYTHONHASHSEED=random \ + PYTHONUNBUFFERED=1 \ + TRACE=1 + +WORKDIR $APP_HOME +COPY poetry.lock pyproject.toml ./ +COPY app.py wsgi.py uwsgi.ini ./ +COPY schema ./schema +COPY mappings ./mappings +COPY search ./search +RUN echo "$GIT_COMMIT" > ./git-commit.txt + +RUN pip install "gunicorn==20.1.0" "poetry==$POETRY_VERSION" +RUN poetry config virtualenvs.create false && \ + poetry install --no-interaction --no-ansi + +EXPOSE 8080 + +# See cicd/cloudbuild-master-pr.yaml for use in integration tests. +ENV GUNICORN="gunicorn --bind :8080 \ + --workers 1 --threads 8 --timeout 0 search.factory:create_classic_api_web_app()" + +CMD ["/bin/sh", "-c", "exec $GUNICORN"] \ No newline at end of file diff --git a/README.md b/README.md index f918e407..30717fc4 100644 --- a/README.md +++ b/README.md @@ -234,6 +234,11 @@ E.g. WITH_INTEGRATION=1 pipenv run nose2 --with-coverage ``` +You can also run the following to avoid the intentional error messages and Kinesis print statements created during the tests: +```bash +pytest --disable-warnings search +``` + ### Static checking Goal: zero errors/warnings. diff --git a/search/agent/__init__.py b/search/agent/__init__.py deleted file mode 100644 index 7b2a331d..00000000 --- a/search/agent/__init__.py +++ /dev/null @@ -1,34 +0,0 @@ -""" -The search agent is responsible for updating the index as papers are published. - -The agent consumes notifications on the ``MetadataIsAvailable`` stream. 
For -each notification, the agent retrieves metadata for the most recent version of -the indicated paper from the :mod:`search.services.metadata` service. The agent -also retrieves metadata for earlier versions, if there are multiple versions -available. Each version is passed to the :mod:`search.services.index` service, -and becomes available for discovery via :mod:`search.routes.ui`. -""" -from typing import Optional - -from flask import current_app as app - -from arxiv.base import agent -from search.agent.consumer import MetadataRecordProcessor - - -def process_stream(duration: Optional[int] = None) -> None: - """ - Configure and run the record processor. - - Parameters - ---------- - duration : int - Time (in seconds) to run record processing. If None (default), will - run "forever". - - """ - # We use the Flask application instance for configuration, and to manage - # integrations with metadata service, search index. - agent.process_stream( - MetadataRecordProcessor, app.config, duration=duration - ) diff --git a/search/agent/consumer.py b/search/agent/consumer.py deleted file mode 100644 index 19f2b181..00000000 --- a/search/agent/consumer.py +++ /dev/null @@ -1,325 +0,0 @@ -"""Provides a record processor for MetadataIsAvailable notifications.""" - -import json -import time -from typing import List, Dict, Any - -from retry.api import retry_call - -import logging - -from arxiv.base.agent import BaseConsumer -from search.services import metadata, index -from search.process import transform -from search.domain import DocMeta, Document - - -logger = logging.getLogger(__name__) -logger.propagate = False - - -class DocumentFailed(RuntimeError): - """Raised when an arXiv paper could not be added to the search index.""" - - -class IndexingFailed(RuntimeError): - """Raised when indexing failed such that future success is unlikely.""" - - -class MetadataRecordProcessor(BaseConsumer): - """Consumes ``MetadataIsAvailable`` notifications, updates the index.""" - - 
MAX_ERRORS = 5 - """Max number of individual document failures before aborting entirely.""" - - def __init__(self, *args: Any, **kwargs: Any) -> None: - """Initialize exception counter.""" - self.sleep: float = kwargs.pop("sleep", 0.1) - super(MetadataRecordProcessor, self).__init__( - *args, **kwargs - ) # type: ignore - self._error_count = 0 - - # TODO: bring McCabe index down. - def _get_metadata(self, arxiv_id: str) -> DocMeta: - """ - Retrieve metadata from the :mod:`.metadata` service. - - Parameters - ---------- - arxiv_id : str - Am arXiv identifier, with or without a version affix. - - Returns - ------- - :class:`.DocMeta` - Metadata for the arXiv paper. - - Raises - ------ - DocumentFailed - Indexing of the document failed. This may have no bearing on the - success of subsequent papers. - IndexingFailed - Indexing of the document failed in a way that indicates recovery - is unlikely for subsequent papers. - - """ - logger.debug("%s: get metadata", arxiv_id) - - try: - docmeta: DocMeta = retry_call( - metadata.retrieve, - (arxiv_id,), - exceptions=metadata.ConnectionFailed, - tries=2, - ) - except metadata.ConnectionFailed as ex: - # Things really are looking bad. There is no need to keep - # trying with subsequent records, so let's abort entirely. - logger.error("%s: second attempt failed, giving up", arxiv_id) - raise IndexingFailed( - "Indexing failed; metadata endpoint could not be reached." 
- ) from ex - except metadata.RequestFailed as ex: - logger.error(f"{arxiv_id}: request failed") - raise DocumentFailed("Request to metadata service failed") from ex - except metadata.BadResponse as ex: - logger.error(f"{arxiv_id}: bad response from metadata service") - raise DocumentFailed("Bad response from metadata service") from ex - except Exception as ex: - logger.error( - f"{arxiv_id}: unhandled error, metadata service: {ex}" - ) - raise IndexingFailed("Unhandled exception") from ex - return docmeta - - def _get_bulk_metadata(self, arxiv_ids: List[str]) -> List[DocMeta]: - """ - Retrieve metadata from :mod:`.metadata` service for multiple documents. - - Parameters - ---------- - arxiv_id : str - Am arXiv identifier, with or without a version affix. - - Returns - ------- - Dict[str, :class:`.DocMeta`] - A dictionary containing arxiv_ids as keys and DocMeta objects as - values. - - Raises - ------ - DocumentFailed - Indexing of the document failed. This may have no bearing on the - success of subsequent papers. - IndexingFailed - Indexing of the document failed in a way that indicates recovery - is unlikely for subsequent papers. - - """ - logger.debug("%s: get bulk metadata", arxiv_ids) - meta: List[DocMeta] - try: - meta = retry_call( - metadata.bulk_retrieve, - (arxiv_ids,), - exceptions=metadata.ConnectionFailed, - tries=2, - ) - except metadata.ConnectionFailed as ex: - # Things really are looking bad. There is no need to keep - # trying with subsequent records, so let's abort entirely. 
- logger.error("%s: second attempt failed, giving up", arxiv_ids) - raise IndexingFailed("Metadata endpoint not available") from ex - except metadata.RequestFailed as ex: - logger.error("%s: request failed", arxiv_ids) - raise DocumentFailed("Request to metadata service failed") from ex - except metadata.BadResponse as ex: - logger.error("%s: bad response from metadata service", arxiv_ids) - raise DocumentFailed("Bad response from metadata service") from ex - except Exception as ex: - logger.error( - "%s: unhandled error, metadata svc: %s", arxiv_ids, ex - ) - raise IndexingFailed("Unhandled exception") from ex - return meta - - @staticmethod - def _transform_to_document(docmeta: DocMeta) -> Document: - """ - Transform paper :class:`.DocMeta` to a search :class:`.Document`. - - Parameters - ---------- - docmeta : :class:`DocMeta` - Metadata for an arXiv paper. - - Returns - ------- - :class:`.Document` - A search document ready for indexing. - - Raises - ------ - DocumentFailed - Indexing of the document failed. This may have no bearing on the - success of subsequent papers. - - """ - try: - document = transform.to_search_document(docmeta) - except Exception as ex: - # At the moment we don't have any special exceptions. - logger.error("unhandled exception during transform: %s", ex) - raise DocumentFailed("Could not transform document") from ex - - return document - - @staticmethod - def _add_to_index(document: Document) -> None: - """ - Add a :class:`.Document` to the search index. - - Parameters - ---------- - document : :class:`.Document` - - Raises - ------ - IndexingFailed - Indexing of the document failed in a way that indicates recovery - is unlikely for subsequent papers. 
- - """ - try: - retry_call( - index.SearchSession.add_document, - (document,), - exceptions=index.IndexConnectionError, - tries=2, - ) - except index.IndexConnectionError as ex: - raise IndexingFailed("Could not index document") from ex - except Exception as ex: - logger.error(f"Unhandled exception from index service: {ex}") - raise IndexingFailed("Unhandled exception") from ex - - @staticmethod - def _bulk_add_to_index(documents: List[Document]) -> None: - """ - Add :class:`.Document` to the search index. - - Parameters - ---------- - documents : :class:`.Document` - - Raises - ------ - IndexingFailed - Indexing of the document failed in a way that indicates recovery - is unlikely for subsequent papers. - - """ - try: - retry_call( - index.SearchSession.bulk_add_documents, - (documents,), - exceptions=index.IndexConnectionError, - tries=2, - ) - except index.IndexConnectionError as ex: - raise IndexingFailed("Could not bulk index documents") from ex - except Exception as ex: - logger.error(f"Unhandled exception from index service: {ex}") - raise IndexingFailed("Unhandled exception") from ex - - def index_paper(self, arxiv_id: str) -> None: - """ - Index a single paper, including its previous versions. - - Parameters - ---------- - arxiv_id : str - A **versionless** arXiv e-print identifier. - - """ - self.index_papers([arxiv_id]) - - def index_papers(self, arxiv_ids: List[str]) -> None: - """ - Index multiple papers, including their previous versions. - - Parameters - ---------- - arxiv_ids : List[str] - A list of **versionless** arXiv e-print identifiers. - - Raises - ------ - DocumentFailed - Indexing of the documents failed. This may have no bearing on the - success of subsequent papers. - IndexingFailed - Indexing of the documents failed in a way that indicates recovery - is unlikely for subsequent papers. 
- - """ - try: - documents = [] - for docmeta in self._get_bulk_metadata(arxiv_ids): - logger.debug("%s: transform to Document", docmeta.paper_id) - document = MetadataRecordProcessor._transform_to_document( - docmeta - ) - documents.append(document) - logger.debug("add to index in bulk") - MetadataRecordProcessor._bulk_add_to_index(documents) - except (DocumentFailed, IndexingFailed) as ex: - # We just pass these along so that process_record() can keep track. - logger.debug(f"{arxiv_ids}: Document failed: {ex}") - raise ex - - # FIXME: Argument type. - def process_record(self, record: Dict[Any, Any]) -> None: - """ - Call for each record that is passed to process_records. - - Parameters - ---------- - data : bytes - partition_key : bytes - sequence_number : int - sub_sequence_number : int - - Raises - ------ - IndexingFailed - Indexing of the document failed in a way that indicates recovery - is unlikely for subsequent papers, or too many individual - documents failed. - - """ - time.sleep(self.sleep) - logger.info(f'Processing record {record["SequenceNumber"]}') - if self._error_count > self.MAX_ERRORS: - raise IndexingFailed("Too many errors") - - try: - deserialized = json.loads(record["Data"].decode("utf-8")) - except json.decoder.JSONDecodeError as ex: - logger.error("Error while deserializing data %s", ex) - logger.error("Data payload: %s", record["Data"]) - raise DocumentFailed("Could not deserialize record data") - # return # Don't bring down the whole batch. 
- - try: - arxiv_id: str = deserialized.get("document_id") - self.index_paper(arxiv_id) - except DocumentFailed as ex: - logger.debug("%s: failed to index document: %s", arxiv_id, ex) - self._error_count += 1 - except IndexingFailed as ex: - logger.error("Indexing failed: %s", ex) - raise diff --git a/search/agent/tests/__init__.py b/search/agent/tests/__init__.py deleted file mode 100644 index a8ead5d1..00000000 --- a/search/agent/tests/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Tests for :mod:`search.agent`.""" diff --git a/search/agent/tests/test_integration.py b/search/agent/tests/test_integration.py deleted file mode 100644 index 39a82a85..00000000 --- a/search/agent/tests/test_integration.py +++ /dev/null @@ -1,138 +0,0 @@ -"""Integration tests for :mod:`search.agent` with Kinesis.""" - -from unittest import TestCase, mock -import os -import json -import time -import boto3 -import tempfile -import subprocess - -from arxiv.base.agent import StopProcessing -from search.domain import DocMeta -from search.services import metadata -from search.agent import process_stream -from search.factory import create_ui_web_app - - -BASE_PATH = os.path.join( - os.path.split(os.path.abspath(__file__))[0], "../../../tests/data/examples" -) - - -class TestKinesisIntegration(TestCase): - """Test :class:`.MetadataRecordProcessor` with a live Kinesis stream.""" - - __test__ = int(bool(os.environ.get("WITH_INTEGRATION", False))) - - @classmethod - def setUpClass(cls): - """Spin up ES and index documents.""" - os.environ["ELASTICSEARCH_SERVICE_HOST"] = "localhost" - os.environ["ELASTICSEARCH_SERVICE_PORT"] = "9201" - os.environ["ELASTICSEARCH_SERVICE_PORT_9201_PROTO"] = "http" - os.environ["ELASTICSEARCH_VERIFY"] = "false" - - os.environ["KINESIS_STREAM"] = "MetadataIsAvailable" - os.environ["KINESIS_SHARD_ID"] = "0" - os.environ["KINESIS_CHECKPOINT_VOLUME"] = tempfile.mkdtemp() - os.environ["KINESIS_ENDPOINT"] = "http://127.0.0.1:6568" - os.environ["KINESIS_VERIFY"] = "false" - 
os.environ["KINESIS_START_TYPE"] = "TRIM_HORIZON" - - print("pulling localstack image") - _ = subprocess.run( - "docker pull atlassianlabs/localstack", - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - ) - - print("starting localstack") - start_localstack = subprocess.run( - "docker run -d -p 6568:4568 --name ltest atlassianlabs/localstack", - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - ) - - if start_localstack.returncode != 0: - raise RuntimeError( - f"Could not start localstack: {start_localstack.stdout}." - f" Is one already running? Is port 6568 available?" - ) - cls.ls_container = start_localstack.stdout.decode("ascii").strip() - print(f"localstack started as {cls.ls_container}") - - cls.client = boto3.client( - "kinesis", - region_name="us-east-1", - endpoint_url="http://localhost:6568", - aws_access_key_id="foo", - aws_secret_access_key="bar", - verify=False, - ) - print("creating stream ahead of time, to populate with records") - cls.client.create_stream( - StreamName="MetadataIsAvailable", ShardCount=1 - ) - time.sleep(5) - print("created stream, ready to test") - cls.app = create_ui_web_app() - - @classmethod - def tearDownClass(cls): - """Tear down Elasticsearch once all tests have run.""" - _ = subprocess.run( - f"docker rm -f {cls.ls_container}", - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - ) - - @mock.patch("search.agent.consumer.index") - @mock.patch("search.agent.consumer.metadata") - def test_process(self, mock_metadata, mock_index): - """Add some records to the stream, and run processing loop for 5s.""" - to_index = [ - "1712.04442", # flux capacitor - "1511.07473", # flux capacitor - "1604.04228", # flux capacitor - "1403.6219", # λ - "1404.3450", # $z_1$ - "1703.09067", # $\lambda$ - "1408.6682", # $\lambda$ - "1607.05107", # Schröder - "1509.08727", # Schroder - "1710.01597", # Schroeder - "1708.07156", # w w - "1401.1012", # Wonmin Son - ] - for document_id in to_index: - data = 
bytes( - json.dumps({"document_id": document_id}), encoding="utf-8" - ) - - self.client.put_record( - StreamName="MetadataIsAvailable", Data=data, PartitionKey="0" - ) - - def retrieve(document_id): - with open(os.path.join(BASE_PATH, f"{document_id}.json")) as f: - return DocMeta(**json.load(f)) - - mock_metadata.retrieve.side_effect = retrieve - - # Preserve exceptions - mock_metadata.RequestFailed = metadata.RequestFailed - mock_metadata.SecurityException = metadata.SecurityException - mock_metadata.ConnectionFailed = metadata.ConnectionFailed - mock_metadata.BadResponse = metadata.BadResponse - - with self.app.app_context(): - try: - process_stream(duration=30) - except StopProcessing: - pass - - self.assertGreater(mock_metadata.bulk_retrieve.call_count, 0) diff --git a/search/agent/tests/test_record_processor.py b/search/agent/tests/test_record_processor.py deleted file mode 100644 index 56adf97e..00000000 --- a/search/agent/tests/test_record_processor.py +++ /dev/null @@ -1,407 +0,0 @@ -"""Unit tests for :mod:`search.agent`.""" - -from unittest import TestCase, mock - -from search.domain import DocMeta, Document -from search.services import metadata, index -from search.agent import consumer - -# type: ignore - - -class TestIndexPaper(TestCase): - """Re-index all versions of an arXiv paper.""" - - def setUp(self): - """Initialize a :class:`.MetadataRecordProcessor`.""" - self.checkpointer = mock.MagicMock() - self.args = ( - "foo", - "1", - "a1b2c3d4", - "qwertyuiop", - "us-east-1", - self.checkpointer, - ) - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.index.SearchSession") - @mock.patch("search.agent.consumer.transform") - @mock.patch("search.agent.consumer.metadata") - def test_paper_has_one_version( - self, mock_meta, mock_tx, mock_idx, mock_client_factory - ): - """The arXiv paper has only one version.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - 
mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - mock_docmeta = DocMeta( - version=1, - paper_id="1234.56789", - title="foo", - submitted_date="2001-03-02T03:04:05-400", - ) - mock_meta.retrieve.return_value = mock_docmeta - mock_meta.bulk_retrieve.return_value = [mock_docmeta] - - mock_doc = Document( - version=1, - paper_id="1234.56789", - title="foo", - submitted_date=["2001-03-02T03:04:05-400"], - ) - mock_tx.to_search_document.return_value = mock_doc - - processor.index_paper("1234.56789") - - mock_idx.bulk_add_documents.assert_called_once_with([mock_doc]) - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.index.SearchSession") - @mock.patch("search.agent.consumer.transform") - @mock.patch("search.agent.consumer.metadata") - def test_paper_has_three_versions( - self, mock_meta, mock_tx, mock_idx, mock_client_factory - ): - """The arXiv paper has three versions.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - mock_dm_1 = DocMeta( - version=1, - paper_id="1234.56789", - title="foo", - submitted_date="2001-03-02T03:04:05-400", - ) - mock_dm_2 = DocMeta( - version=2, - paper_id="1234.56789", - title="foo", - submitted_date="2001-03-03T03:04:05-400", - ) - mock_dm_3 = DocMeta( - version=3, - paper_id="1234.56789", - title="foo", - submitted_date="2001-03-04T03:04:05-400", - ) - mock_meta.retrieve.side_effect = [mock_dm_3, mock_dm_1, mock_dm_2] - - mock_meta.bulk_retrieve.return_value = [ - mock_dm_3, - mock_dm_1, - mock_dm_2, - mock_dm_3, - ] - - mock_doc_1 = Document( - version=1, - paper_id="1234.56789", - title="foo", - submitted_date=["2001-03-02T03:04:05-400"], - submitted_date_all=["2001-03-02T03:04:05-400"], - ) - mock_doc_2 = Document( - version=2, - paper_id="1234.56789", - title="foo", - 
submitted_date=["2001-03-03T03:04:05-400"], - submitted_date_all=[ - "2001-03-02T03:04:05-400", - "2001-03-03T03:04:05-400", - ], - ) - mock_doc_3 = Document( - version=3, - paper_id="1234.56789", - title="foo", - submitted_date=["2001-03-04T03:04:05-400"], - submitted_date_all=[ - "2001-03-02T03:04:05-400", - "2001-03-03T03:04:05-400", - "2001-03-04T03:04:05-400", - ], - ) - mock_tx.to_search_document.side_effect = [ - mock_doc_3, - mock_doc_1, - mock_doc_2, - mock_doc_3, - ] - processor.index_paper("1234.56789") - self.assertEqual( - mock_meta.bulk_retrieve.call_count, - 1, - "Metadata should be retrieved for current version" - " with bulk_retrieve", - ) - self.assertEqual( - mock_meta.retrieve.call_count, - 0, - "Metadata should be retrieved for each non-current" " version", - ) - - mock_idx.bulk_add_documents.assert_called_once_with( - [mock_doc_3, mock_doc_1, mock_doc_2, mock_doc_3] - ) - - -class TestAddToIndex(TestCase): - """Add a search document to the index.""" - - def setUp(self): - """Initialize a :class:`.MetadataRecordProcessor`.""" - self.checkpointer = mock.MagicMock() - self.args = ( - "foo", - "1", - "a1b2c3d4", - "qwertyuiop", - "us-east-1", - self.checkpointer, - ) - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.index.SearchSession") - def test_add_document_succeeds(self, mock_index, mock_client_factory): - """The search document is added successfully.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - try: - processor._add_to_index(Document()) - except Exception as ex: - self.fail(ex) - mock_index.add_document.assert_called_once() - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.index.SearchSession") - def test_index_raises_index_connection_error( - self, mock_index, mock_client_factory - ): - """The index raises 
:class:`.index.IndexConnectionError`.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - mock_index.add_document.side_effect = index.IndexConnectionError - with self.assertRaises(consumer.IndexingFailed): - processor._add_to_index(Document()) - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.index.SearchSession") - def test_index_raises_unhandled_error( - self, mock_index, mock_client_factory - ): - """The index raises an unhandled exception.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - mock_index.add_document.side_effect = RuntimeError - with self.assertRaises(consumer.IndexingFailed): - processor._add_to_index(Document()) - - -class TestBulkAddToIndex(TestCase): - """Add multiple search documents to the index in bulk.""" - - def setUp(self): - """Initialize a :class:`.MetadataRecordProcessor`.""" - self.checkpointer = mock.MagicMock() - self.args = ( - "foo", - "1", - "a1b2c3d4", - "qwertyuiop", - "us-east-1", - self.checkpointer, - ) - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.index.SearchSession") - def test_bulk_add_documents_succeeds( - self, mock_index, mock_client_factory - ): - """The search document is added successfully.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - try: - processor._bulk_add_to_index([Document()]) - except Exception as ex: - self.fail(ex) - mock_index.bulk_add_documents.assert_called_once() - - @mock.patch("boto3.client") - 
@mock.patch("search.agent.consumer.index.SearchSession") - def test_index_raises_index_connection_error( - self, mock_index, mock_client_factory - ): - """The index raises :class:`.index.IndexConnectionError`.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - mock_index.bulk_add_documents.side_effect = index.IndexConnectionError - with self.assertRaises(consumer.IndexingFailed): - processor._bulk_add_to_index([Document()]) - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.index.SearchSession") - def test_index_raises_unhandled_error( - self, mock_index, mock_client_factory - ): - """The index raises an unhandled exception.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - mock_index.bulk_add_documents.side_effect = RuntimeError - with self.assertRaises(consumer.IndexingFailed): - processor._bulk_add_to_index([Document()]) - - -class TestTransformToDocument(TestCase): - """Transform metadata into a search document.""" - - def setUp(self): - """Initialize a :class:`.MetadataRecordProcessor`.""" - self.checkpointer = mock.MagicMock() - self.args = ( - "foo", - "1", - "a1b2c3d4", - "qwertyuiop", - "us-east-1", - self.checkpointer, - ) - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.transform") - def test_transform_raises_exception( - self, mock_transform, mock_client_factory - ): - """The transform module raises an exception.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - 
mock_transform.to_search_document.side_effect = RuntimeError - with self.assertRaises(consumer.DocumentFailed): - processor._transform_to_document(DocMeta()) - - -class TestGetMetadata(TestCase): - """Retrieve metadata for an arXiv e-print.""" - - def setUp(self): - """Initialize a :class:`.MetadataRecordProcessor`.""" - self.checkpointer = mock.MagicMock() - self.args = ( - "foo", - "1", - "a1b2c3d4", - "qwertyuiop", - "us-east-1", - self.checkpointer, - ) - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.metadata") - def test_metadata_service_returns_metadata( - self, mock_metadata, mock_client_factory - ): - """The metadata service returns valid metadata.""" - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - docmeta = DocMeta() - mock_metadata.retrieve.return_value = docmeta - self.assertEqual( - docmeta, - processor._get_metadata("1234.5678"), - "The metadata is returned.", - ) - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.metadata") - def test_metadata_service_raises_connection_error( - self, mock_metadata, mock_client_factory - ): - """The metadata service raises :class:`.metadata.ConnectionFailed`.""" - mock_metadata.RequestFailed = metadata.RequestFailed - mock_metadata.ConnectionFailed = metadata.ConnectionFailed - - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - mock_metadata.retrieve.side_effect = metadata.ConnectionFailed - with self.assertRaises(consumer.IndexingFailed): - processor._get_metadata("1234.5678") - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.metadata") - def test_metadata_service_raises_request_error( - self, 
mock_metadata, mock_client_factory - ): - """The metadata service raises :class:`.metadata.RequestFailed`.""" - mock_metadata.RequestFailed = metadata.RequestFailed - mock_metadata.ConnectionFailed = metadata.ConnectionFailed - - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - - processor = consumer.MetadataRecordProcessor(*self.args) - - mock_metadata.retrieve.side_effect = metadata.RequestFailed - with self.assertRaises(consumer.DocumentFailed): - processor._get_metadata("1234.5678") - - @mock.patch("boto3.client") - @mock.patch("search.agent.consumer.metadata") - def test_metadata_service_raises_bad_response( - self, mock_metadata, mock_client_factory - ): - """The metadata service raises :class:`.metadata.BadResponse`.""" - mock_metadata.RequestFailed = metadata.RequestFailed - mock_metadata.ConnectionFailed = metadata.ConnectionFailed - mock_metadata.BadResponse = metadata.BadResponse - - mock_client = mock.MagicMock() - mock_waiter = mock.MagicMock() - mock_client.get_waiter.return_value = mock_waiter - mock_client_factory.return_value = mock_client - processor = consumer.MetadataRecordProcessor(*self.args) - - mock_metadata.retrieve.side_effect = metadata.BadResponse - with self.assertRaises(consumer.DocumentFailed): - processor._get_metadata("1234.5678") diff --git a/search/controllers/__init__.py b/search/controllers/__init__.py index 206d9a1a..b7ea7d62 100644 --- a/search/controllers/__init__.py +++ b/search/controllers/__init__.py @@ -30,7 +30,7 @@ def health_check() -> Tuple[str, int, Dict[str, Any]]: """ # We don't handle any exceptions here because we want the framework # exception handling to take care of it and log them. 
- document_set = index.SearchSession.search( # type: ignore + document_set = index.SearchSession.current_session().search( # type: ignore SimpleQuery(search_field="all", value="theory") ) if document_set["results"]: diff --git a/search/controllers/advanced/__init__.py b/search/controllers/advanced/__init__.py index 1026ea68..1a4c92ab 100644 --- a/search/controllers/advanced/__init__.py +++ b/search/controllers/advanced/__init__.py @@ -124,7 +124,7 @@ def search(request_params: MultiDict) -> Response: # Execute the search. We'll use the results directly in # template rendering, so they get added directly to the # response content. asdict( - response_data.update(SearchSession.search(q)) # type: ignore + response_data.update(SearchSession.current_session().search(q)) except index.IndexConnectionError as ex: raise BadGateway( "There was a problem connecting to the search index. This " @@ -171,7 +171,9 @@ def search(request_params: MultiDict) -> Response: # example, we can generate new form-friendly requests to update sort # order and page size by embedding the form (hidden). 
response_data["form"] = form - return response_data, HTTPStatus.OK, {} + headers={} + headers["Surrogate-Control"]="max-age=600" + return response_data, HTTPStatus.OK, headers def _query_from_form(form: forms.AdvancedSearchForm) -> AdvancedQuery: diff --git a/search/controllers/api/__init__.py b/search/controllers/api/__init__.py index e53faaba..6aaeb47e 100644 --- a/search/controllers/api/__init__.py +++ b/search/controllers/api/__init__.py @@ -106,9 +106,7 @@ def search(params: MultiDict) -> Tuple[Dict[str, Any], int, Dict[str, Any]]: q.include_fields += include_fields q = paginate(q, params) # type: ignore - document_set = index.SearchSession.search( # type: ignore - q, highlight=False - ) + document_set = index.SearchSession.current_session().search(q, highlight=False) document_set["metadata"]["query"] = query_terms logger.debug( "Got document set with %i results", len(document_set["results"]) @@ -140,10 +138,9 @@ def paper(paper_id: str) -> Tuple[Dict[str, Any], int, Dict[str, Any]]: Raised when there is no document with the provided paper ID. 
""" + document = index.SearchSession.current_session().get_document(paper_id) try: - document = index.SearchSession.current_session().get_document( - paper_id - ) # type: ignore + pass except index.DocumentNotFound as ex: logger.error("Document not found") raise NotFound("No such document") from ex diff --git a/search/controllers/classic_api/__init__.py b/search/controllers/classic_api/__init__.py index 9e18ab2d..58d6876f 100644 --- a/search/controllers/classic_api/__init__.py +++ b/search/controllers/classic_api/__init__.py @@ -193,7 +193,7 @@ def paper( try: document = index.SearchSession.current_session().get_document( paper_id - ) # type: ignore + ) except index.DocumentNotFound as ex: logger.error("Document not found") raise NotFound("No such document") from ex @@ -201,4 +201,4 @@ def paper( ClassicSearchResponseData(results=document), # type: ignore HTTPStatus.OK, {}, - ) # type: ignore + ) diff --git a/search/controllers/simple/__init__.py b/search/controllers/simple/__init__.py index aa658cff..83fd3d54 100644 --- a/search/controllers/simple/__init__.py +++ b/search/controllers/simple/__init__.py @@ -146,7 +146,7 @@ def search( # Execute the search. We'll use the results directly in # template rendering, so they get added directly to the # response content.asdict - response_data.update(SearchSession.search(q)) # type: ignore + response_data.update(SearchSession.current_session().search(q)) except index.IndexConnectionError as ex: raise BadGateway( "There was a problem connecting to the search index. 
This is " @@ -185,7 +185,9 @@ def search( q = None response_data["query"] = q response_data["form"] = form - return response_data, HTTPStatus.OK, {} + headers={} + headers["Surrogate-Control"]="max-age=600" + return response_data, HTTPStatus.OK, headers def retrieve_document(document_id: str) -> Response: @@ -215,7 +217,7 @@ def retrieve_document(document_id: str) -> Response: """ try: - result = SearchSession.get_document(document_id) # type: ignore + result = SearchSession.current_session().get_document(document_id) except index.IndexConnectionError as ex: raise BadGateway( "There was a problem connecting to the search index. This is " diff --git a/search/controllers/tests.py b/search/controllers/tests.py index cb9caaf1..1630ca6a 100644 --- a/search/controllers/tests.py +++ b/search/controllers/tests.py @@ -14,20 +14,16 @@ class TestHealthCheck(TestCase): def test_index_is_down(self, mock_index): """Test returns 'DOWN' + status 500 when index raises an exception.""" mock_index.search.side_effect = RuntimeError - response, status_code, _ = health_check() - self.assertEqual(response, "DOWN", "Response content should be DOWN") - self.assertEqual( - status_code, - HTTPStatus.INTERNAL_SERVER_ERROR, - "Should return 500 status code.", - ) + + with self.assertRaises(RuntimeError): + response, status_code, _ = health_check() @mock.patch("search.controllers.index.SearchSession") def test_index_returns_no_result(self, mock_index): """Test returns 'DOWN' + status 500 when index returns no results.""" mock_index.search.return_value = {"metadata": {}, "results": []} response, status_code, _ = health_check() - self.assertEqual(response, "DOWN", "Response content should be DOWN") + self.assertEqual(response, "DOWN: document_set lacked results", "Response content should be DOWN: document_set lacked results") self.assertEqual( status_code, HTTPStatus.INTERNAL_SERVER_ERROR, diff --git a/search/factory.py b/search/factory.py index a639a793..c656e348 100644 --- a/search/factory.py 
+++ b/search/factory.py @@ -49,7 +49,8 @@ def create_ui_web_app() -> Flask: for filter_name, template_filter in filters.filters: app.template_filter(filter_name)(template_filter) - index_startup_check(app) + if app.config["TESTING"]: + index_startup_check(app) return app @@ -131,7 +132,7 @@ def index_startup_check(app): raise ex try: - document_set = index.SearchSession.search( # type: ignore + document_set = index.SearchSession.current_session().search( # type: ignore SimpleQuery(search_field="all", value="theory") ) if document_set["results"]: diff --git a/search/services/index/__init__.py b/search/services/index/__init__.py index 21a1b4bc..16c99b57 100644 --- a/search/services/index/__init__.py +++ b/search/services/index/__init__.py @@ -54,7 +54,6 @@ def search2(self: Any, index: Any=None, doc_type: Any=None, body: Any=None, para import logging -from arxiv.integration.meta import MetaIntegration from search.context import get_application_config, get_application_global from search.domain import ( Document, @@ -146,7 +145,7 @@ def handle_es_exceptions() -> Generator: raise -class SearchSession(metaclass=MetaIntegration): +class SearchSession(): """Encapsulates session with Elasticsearch host.""" def __init__( diff --git a/search/services/index/tests/test_reindex.py b/search/services/index/tests/test_reindex.py index 54c67646..cc6cdd49 100644 --- a/search/services/index/tests/test_reindex.py +++ b/search/services/index/tests/test_reindex.py @@ -18,7 +18,7 @@ def test_reindex_from_scratch(self, mock_Elasticsearch): """Reindex to an index that does not exist.""" mock_es = mock.MagicMock() mock_Elasticsearch.return_value = mock_es - index.SearchSession.reindex("barindex", "bazindex") + index.SearchSession.current_session().reindex("barindex", "bazindex") self.assertEqual( mock_es.indices.create.call_count, 1, @@ -48,7 +48,7 @@ def test_reindex_already_exists(self, mock_Elasticsearch): mock_es = mock.MagicMock() mock_Elasticsearch.return_value = mock_es 
mock_es.indices.create.side_effect = raise_index_exists - index.SearchSession.reindex("barindex", "bazindex") + index.SearchSession.current_session().reindex("barindex", "bazindex") self.assertEqual( mock_es.indices.create.call_count, 1, @@ -83,7 +83,7 @@ def test_get_task_status(self, mock_Elasticsearch): mock_Elasticsearch.return_value = mock_es task_id = "foonode:bartask" - index.SearchSession.get_task_status(task_id) + index.SearchSession.current_session().get_task_status(task_id) self.assertEqual( mock_es.tasks.get.call_count, 1, diff --git a/search/services/index/tests/tests.py b/search/services/index/tests/tests.py index 22f0bf28..bbc86b7d 100644 --- a/search/services/index/tests/tests.py +++ b/search/services/index/tests/tests.py @@ -201,7 +201,7 @@ def test_advanced_query(self, mock_Elasticsearch, mock_Search): ), ) - document_set = index.SearchSession.search(query, highlight=True) + document_set = index.SearchSession.current_session().search(query, highlight=True) self.assertEqual(document_set["metadata"]["start"], 0) self.assertEqual(int(document_set["metadata"]["total_results"]), 53) self.assertEqual(int(document_set["metadata"]["current_page"]), 1) @@ -233,7 +233,7 @@ def test_simple_query(self, mock_Elasticsearch, mock_Search): query = SimpleQuery( order="relevance", size=10, search_field="title", value="foo title" ) - document_set = index.SearchSession.search(query, highlight=True) + document_set = index.SearchSession.current_session().search(query, highlight=True) # self.assertIsInstance(document_set, DocumentSet) self.assertEqual(document_set["metadata"]["start"], 0) self.assertEqual(document_set["metadata"]["total_results"], 53) @@ -269,7 +269,7 @@ def test_classic_query(self, mock_Elasticsearch, mock_Search): size=10, ) - document_set = index.SearchSession.search(query, highlight=True) + document_set = index.SearchSession.current_session().search(query, highlight=True) # self.assertIsInstance(document_set, DocumentSet) 
self.assertEqual(document_set["metadata"]["start"], 0) self.assertEqual(document_set["metadata"]["total_results"], 53) @@ -309,7 +309,7 @@ def test_classic_query_complex(self, mock_Elasticsearch, mock_Search): size=10, ) - document_set = index.SearchSession.search(query, highlight=True) + document_set = index.SearchSession.current_session().search(query, highlight=True) # self.assertIsInstance(document_set, DocumentSet) self.assertEqual(document_set["metadata"]["start"], 0) self.assertEqual(document_set["metadata"]["total_results"], 53) @@ -345,7 +345,7 @@ def test_classic_query_id_list(self, mock_Elasticsearch, mock_Search): size=10, ) - document_set = index.SearchSession.search(query, highlight=True) + document_set = index.SearchSession.current_session().search(query, highlight=True) # self.assertIsInstance(document_set, DocumentSet) self.assertEqual(document_set["metadata"]["start"], 0) self.assertEqual(document_set["metadata"]["total_results"], 53) @@ -385,7 +385,7 @@ def test_classic_query_phrases(self, mock_Elasticsearch, mock_Search): size=10, ) - document_set = index.SearchSession.search(query, highlight=True) + document_set = index.SearchSession.current_session().search(query, highlight=True) # self.assertIsInstance(document_set, DocumentSet) self.assertEqual(document_set["metadata"]["start"], 0) self.assertEqual(document_set["metadata"]["total_results"], 53) diff --git a/search/templates/search/base.html b/search/templates/search/base.html index dd3cbf9c..01df75d8 100644 --- a/search/templates/search/base.html +++ b/search/templates/search/base.html @@ -24,7 +24,6 @@