Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3321/load attachment #3467

Merged
merged 38 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
29533b4
func to fetch opp att from db and s3 and update json obj with attachm…
babebe Jan 8, 2025
59b8fbd
optional arg pipeline when calling open search bulk method
babebe Jan 8, 2025
23adf26
add feature flag
babebe Jan 8, 2025
7296739
fmt lnt
babebe Jan 8, 2025
c8677d3
Merge branch 'main' of https://github.com/HHS/simpler-grants-gov into…
babebe Jan 8, 2025
e904b53
filter for att type
babebe Jan 9, 2025
786b1ee
Merge branch 'main' of https://github.com/HHS/simpler-grants-gov into…
babebe Jan 9, 2025
5ff2c46
fmt lint
babebe Jan 9, 2025
6f36593
move code
babebe Jan 9, 2025
9bc3731
move code
babebe Jan 9, 2025
c3703bb
cln up
babebe Jan 9, 2025
7be0453
Merge branch 'main' of https://github.com/HHS/simpler-grants-gov into…
babebe Jan 9, 2025
7db73ff
update upload test file to take in optional argument bucket_name and …
babebe Jan 13, 2025
06c303d
dont fileter
babebe Jan 13, 2025
954a6d9
rm pipeline arg
babebe Jan 13, 2025
4550a47
update call to fixiture
babebe Jan 13, 2025
6330352
modify test to create a test local-opportunities bucket
babebe Jan 13, 2025
a75d049
fmt
babebe Jan 13, 2025
fc82b68
Merge branch 'main' of https://github.com/HHS/simpler-grants-gov into…
babebe Jan 13, 2025
5d0575b
don't pass s3 client
babebe Jan 13, 2025
3ef1033
update ff
babebe Jan 13, 2025
06c544e
cleanup
babebe Jan 14, 2025
5a9bf2e
Merge branch 'main' of https://github.com/HHS/simpler-grants-gov into…
babebe Jan 14, 2025
758d52c
rm api ff add ff to LoadOpportunitiesToIndexConfig
babebe Jan 14, 2025
6f10baf
revert changes to existing feature
babebe Jan 14, 2025
0f3f99e
updates test with no attachment
babebe Jan 14, 2025
59aec6a
cleanup
babebe Jan 14, 2025
4bfefca
Merge branch 'main' of https://github.com/HHS/simpler-grants-gov into…
babebe Jan 14, 2025
afb4db3
Merge branch 'main' of https://github.com/HHS/simpler-grants-gov into…
babebe Jan 15, 2025
7fd9cb0
Create ERD diagram and Update OpenAPI spec
nava-platform-bot Jan 15, 2025
9dedd6e
specify pipeline on bulk req
babebe Jan 15, 2025
427e01d
Merge branch '3321/load-attachment' of https://github.com/HHS/simpler…
babebe Jan 15, 2025
05f28a2
add optional pipeline arg to bulk_upsert
babebe Jan 15, 2025
12b2689
merge
babebe Jan 15, 2025
1f0ade6
cleanup
babebe Jan 15, 2025
7732f0b
Merge branch '3321/load-attachment' of https://github.com/HHS/simpler…
babebe Jan 15, 2025
0ca5eef
f8x
babebe Jan 16, 2025
45b364f
Create ERD diagram and Update OpenAPI spec
nava-platform-bot Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/local.env
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ IS_LOCAL_AWS=1
# Feature Flags
############################
ENABLE_OPPORTUNITY_LOG_MSG=false
ENABLE_OPPORTUNITY_ATTACHMENT_PIPELINE=true

############################
# Endpoint Configuration
Expand Down
2 changes: 1 addition & 1 deletion api/openapi.generated.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2092,7 +2092,7 @@ components:
format: uuid
description: The ID of the saved search
example: !!python/object:uuid.UUID
int: 82637552140693101888240202082641616217
int: 223701124228644688372276000513924728179
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have a fix for this shortly so it stops popping up on PRs

name:
type: string
description: Name of the saved search
Expand Down
43 changes: 40 additions & 3 deletions api/src/search/backend/load_opportunities_to_index.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import base64
import logging
from enum import StrEnum
from typing import Iterator, Sequence
from typing import Iterator, List, Sequence

import smart_open
from opensearchpy.exceptions import ConnectionTimeout, TransportError
from pydantic import Field
from pydantic_settings import SettingsConfigDict
Expand All @@ -16,6 +18,7 @@
from src.db.models.opportunity_models import (
CurrentOpportunitySummary,
Opportunity,
OpportunityAttachment,
OpportunitySearchIndexQueue,
)
from src.task.task import Task
Expand All @@ -36,6 +39,10 @@ class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
alias_name: str = Field(default="opportunity-index-alias") # LOAD_OPP_SEARCH_ALIAS_NAME
index_prefix: str = Field(default="opportunity-index") # LOAD_OPP_INDEX_PREFIX

enable_opportunity_attachment_pipeline: bool = Field(
default=False, alias="ENABLE_OPPORTUNITY_ATTACHMENT_PIPELINE"
)


class LoadOpportunitiesToIndex(Task):
class Metrics(StrEnum):
Expand All @@ -53,7 +60,6 @@ def __init__(

self.search_client = search_client
self.is_full_refresh = is_full_refresh

if config is None:
config = LoadOpportunitiesToIndexConfig()
self.config = config
Expand Down Expand Up @@ -269,6 +275,31 @@ def fetch_existing_opportunity_ids_in_index(self) -> set[int]:

return opportunity_ids

def filter_attachments(
    self, attachments: List[OpportunityAttachment], filters: List[str]
) -> List[OpportunityAttachment]:
    """Return only the attachments whose MIME type is in the allow-list.

    Args:
        attachments: Attachments belonging to an opportunity.
        filters: MIME type strings (e.g. "text/plain") to keep.

    Returns:
        The attachments whose ``mime_type`` matches one of ``filters``.
    """
    # Build a set once so each membership test is O(1) instead of O(len(filters)).
    allowed_types = set(filters)
    return [attachment for attachment in attachments if attachment.mime_type in allowed_types]
babebe marked this conversation as resolved.
Show resolved Hide resolved

def get_attachment_json_for_opportunity(
    self, opp_attachments: List[OpportunityAttachment]
) -> list[dict]:
    """Build the JSON-serializable attachment payload for an opportunity.

    Each attachment's file is read from its ``file_location`` (an s3:// or
    local path understood by smart_open) and base64-encoded so it can be
    embedded in the search-index document and decoded by the ingest pipeline.

    Args:
        opp_attachments: Attachments whose file contents should be indexed.

    Returns:
        A list of dicts, each with ``filename`` and base64-encoded ``data``.
    """
    attachments = []
    for att in opp_attachments:
        # NOTE(review): the whole file is read into memory; very large
        # attachments will be held in full while the bulk request is built.
        with smart_open.open(att.file_location, "rb") as file:
            file_content = file.read()
        # Append outside the `with` so the file handle is closed as early
        # as possible; behavior is otherwise unchanged.
        attachments.append(
            {
                "filename": att.file_name,
                "data": base64.b64encode(file_content).decode("utf-8"),
            }
        )

    return attachments

@retry(
stop=stop_after_attempt(3), # Retry up to 3 times
wait=wait_fixed(2), # Wait 2 seconds between retries
Expand Down Expand Up @@ -300,7 +331,13 @@ def load_records(self, records: Sequence[Opportunity]) -> set[int]:
self.increment(self.Metrics.TEST_RECORDS_SKIPPED)
continue

json_records.append(schema.dump(record))
json_record = schema.dump(record)
if self.config.enable_opportunity_attachment_pipeline:
json_record["attachments"] = self.get_attachment_json_for_opportunity(
record.opportunity_attachments
)

json_records.append(json_record)
self.increment(self.Metrics.RECORDS_LOADED)

loaded_opportunity_ids.add(record.opportunity_id)
Expand Down
4 changes: 3 additions & 1 deletion api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ def upload_opportunity_attachment_s3(reset_aws_env_vars, mock_s3_bucket):
for file in files:
file_path = os.path.join(root, file)
s3_client.upload_file(
file_path, Bucket=mock_s3_bucket, Key=os.path.relpath(file_path, test_folder_path)
file_path,
Bucket=mock_s3_bucket,
Key=os.path.relpath(file_path, test_folder_path),
)

# Check files were uploaded to mock s3
Expand Down
122 changes: 104 additions & 18 deletions api/tests/src/search/backend/test_load_opportunities_to_index.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import itertools

import pytest
Expand All @@ -7,10 +8,12 @@
LoadOpportunitiesToIndex,
LoadOpportunitiesToIndexConfig,
)
from src.util import file_util
from src.util.datetime_util import get_now_us_eastern_datetime
from tests.conftest import BaseTestClass
from tests.src.db.models.factories import (
AgencyFactory,
OpportunityAttachmentFactory,
OpportunityFactory,
OpportunitySearchIndexQueueFactory,
)
Expand Down Expand Up @@ -39,26 +42,44 @@ def test_load_opportunities_to_index(
opportunities = []
opportunities.extend(
OpportunityFactory.create_batch(
size=6, is_posted_summary=True, agency_code=agency.agency_code
size=6,
is_posted_summary=True,
agency_code=agency.agency_code,
opportunity_attachments=[],
)
)
opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True))
opportunities.extend(
OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True)
OpportunityFactory.create_batch(
size=3, is_forecasted_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=6, is_archived_forecast_summary=True, agency_code=agency.agency_code
size=2, is_closed_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=8, is_archived_non_forecast_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=6,
is_archived_forecast_summary=True,
agency_code=agency.agency_code,
opportunity_attachments=[],
)
)

# Create some opportunities that won't get fetched / loaded into search
OpportunityFactory.create_batch(size=3, is_draft=True)
OpportunityFactory.create_batch(size=4, no_current_summary=True)
OpportunityFactory.create_batch(size=3, is_draft=True, opportunity_attachments=[])
OpportunityFactory.create_batch(size=4, no_current_summary=True, opportunity_attachments=[])

AgencyFactory.create(agency_code="MY-TEST-AGENCY", is_test_agency=True)
OpportunityFactory.create_batch(size=3, agency_code="MY-TEST-AGENCY")
OpportunityFactory.create_batch(
size=3, agency_code="MY-TEST-AGENCY", opportunity_attachments=[]
)

for opportunity in opportunities:
OpportunitySearchIndexQueueFactory.create(
Expand Down Expand Up @@ -105,7 +126,7 @@ def test_load_opportunities_to_index(
)

# Rerunning but first add a few more opportunities to show up
opportunities.extend(OpportunityFactory.create_batch(size=3))
opportunities.extend(OpportunityFactory.create_batch(size=3, opportunity_attachments=[]))
load_opportunities_to_index.index_name = (
load_opportunities_to_index.index_name + "-new-data"
)
Expand All @@ -119,6 +140,49 @@ def test_load_opportunities_to_index(
[record["opportunity_id"] for record in resp.records]
)

def test_opportunity_attachment_pipeline(
    self,
    mock_s3_bucket,
    db_session,
    enable_factory_create,
    load_opportunities_to_index,
    monkeypatch: pytest.MonkeyPatch,
    opportunity_index_alias,
    search_client,
):
    """Attachment contents are base64-embedded in the indexed document."""
    # Upload a small plain-text file to the mocked S3 bucket; the key must
    # be the filename so it matches the attachment record created below.
    filename = "test_file_1.txt"
    file_path = f"s3://{mock_s3_bucket}/{filename}"
    content = "I am a file"
    with file_util.open_stream(file_path, "w") as outfile:
        outfile.write(content)

    # One opportunity with exactly one attachment pointing at that file.
    opportunity = OpportunityFactory.create(opportunity_attachments=[])
    OpportunityAttachmentFactory.create(
        mime_type="text/plain",
        opportunity=opportunity,
        file_location=file_path,
        file_name=filename,
    )

    # Use a distinct index name so this test does not collide with the
    # class-scoped index used by the other tests.
    load_opportunities_to_index.index_name = (
        load_opportunities_to_index.index_name + "-pipeline"
    )

    load_opportunities_to_index.run()

    resp = search_client.search(opportunity_index_alias, {"size": 100})

    record = [d for d in resp.records if d.get("opportunity_id") == opportunity.opportunity_id]
    attachment = record[0]["attachments"][0]

    # assert correct attachment was uploaded
    assert attachment["filename"] == filename

    # assert content of file was b64encoded
    decoded = base64.b64decode(attachment["data"])

    assert decoded.decode("utf-8") == content
babebe marked this conversation as resolved.
Show resolved Hide resolved


class TestLoadOpportunitiesToIndexPartialRefresh(BaseTestClass):
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -148,18 +212,36 @@ def test_load_opportunities_to_index(

# Load a bunch of records into the DB
opportunities = []
opportunities.extend(OpportunityFactory.create_batch(size=6, is_posted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True))
opportunities.extend(
OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True)
OpportunityFactory.create_batch(
size=6, is_posted_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=3, is_forecasted_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=2, is_closed_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
OpportunityFactory.create_batch(
size=8, is_archived_non_forecast_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=6, is_archived_forecast_summary=True, opportunity_attachments=[]
)
)

AgencyFactory.create(agency_code="MY-TEST-AGENCY-123", is_test_agency=True)
test_opps = OpportunityFactory.create_batch(size=2, agency_code="MY-TEST-AGENCY-123")
test_opps = OpportunityFactory.create_batch(
size=2, agency_code="MY-TEST-AGENCY-123", opportunity_attachments=[]
)

for opportunity in itertools.chain(opportunities, test_opps):
OpportunitySearchIndexQueueFactory.create(
Expand All @@ -179,7 +261,11 @@ def test_load_opportunities_to_index(
] == len(test_opps)

# Add a few more opportunities that will be created
opportunities.extend(OpportunityFactory.create_batch(size=3, is_posted_summary=True))
opportunities.extend(
OpportunityFactory.create_batch(
size=3, is_posted_summary=True, opportunity_attachments=[]
)
)

# Delete some opportunities
opportunities_to_delete = [opportunities.pop(), opportunities.pop(), opportunities.pop()]
Expand Down Expand Up @@ -221,7 +307,7 @@ def test_load_opportunities_to_index_index_does_not_exist(self, db_session, sear

def test_new_opportunity_gets_indexed(self, db_session, load_opportunities_to_index):
"""Test that a new opportunity in the queue gets indexed"""
test_opportunity = OpportunityFactory.create()
test_opportunity = OpportunityFactory.create(opportunity_attachments=[])

# Add to queue
OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
Expand All @@ -234,7 +320,7 @@ def test_new_opportunity_gets_indexed(self, db_session, load_opportunities_to_in

def test_draft_opportunity_not_indexed(self, db_session, load_opportunities_to_index):
"""Test that draft opportunities are not indexed"""
test_opportunity = OpportunityFactory.create(is_draft=True)
test_opportunity = OpportunityFactory.create(is_draft=True, opportunity_attachments=[])

# Add to queue
OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
Expand Down