
Commit

Merge branch 'main' into 4845-pray-and-pay-revisions
v-anne authored Feb 3, 2025
2 parents 8e8b959 + 69c78e9 commit ae9d640
Showing 2 changed files with 430 additions and 71 deletions.
208 changes: 139 additions & 69 deletions cl/recap/tasks.py
@@ -6,7 +6,7 @@
from datetime import datetime
from http import HTTPStatus
from multiprocessing import process
from typing import List, Optional, Tuple
from typing import Optional, Tuple
from zipfile import ZipFile

import requests
@@ -171,7 +171,10 @@ def do_pacer_fetch(fq: PacerFetchQueue):
mark_fq_successful.si(fq.pk),
).apply_async()
elif fq.request_type == REQUEST_TYPE.ATTACHMENT_PAGE:
result = fetch_attachment_page.apply_async(args=(fq.pk,))
result = chain(
fetch_attachment_page.si(fq.pk),
replicate_fq_att_page_to_subdocket_rds.s(),
).apply_async()
return result
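
The rewired dispatch above leans on Celery's chain primitive: .si() builds an immutable signature for fetch_attachment_page (it accepts no result from a predecessor), while .s() leaves the second signature open so replicate_fq_att_page_to_subdocket_rds receives whatever the first task returns, i.e. the list of PQ IDs. A minimal sketch of that pattern, using hypothetical task names and an in-memory broker rather than the project's real Celery configuration:

from celery import Celery, chain

app = Celery("sketch", broker="memory://", backend="cache+memory://")

@app.task
def fetch_step(fq_pk: int) -> list[int]:
    # Pretend the fetch produced follow-up work; return the IDs that need it.
    return [fq_pk * 10, fq_pk * 10 + 1]

@app.task
def replicate_step(ids: list[int]) -> list[str]:
    # Receives fetch_step's return value because it was chained with .s().
    return [f"replicated {pk}" for pk in ids]

# .si(42) ignores any prior result; .s() is fed the previous task's output.
result = chain(fetch_step.si(42), replicate_step.s()).apply_async()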


@@ -738,25 +741,27 @@ async def find_subdocket_att_page_rds(
original_file_content = text.encode("utf-8")
original_file_name = pq.filepath_local.name

@sync_to_async
def save_pq_instances():
with transaction.atomic():
for main_rd in main_rds:
main_pacer_case_id = main_rd.docket_entry.docket.pacer_case_id
# Create additional pqs for each subdocket case found.
pq_created = ProcessingQueue.objects.create(
uploader_id=pq.uploader_id,
pacer_doc_id=pacer_doc_id,
pacer_case_id=main_pacer_case_id,
court_id=pq.court_id,
upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
filepath_local=ContentFile(
original_file_content, name=original_file_name
),
)
pqs_to_process_pks.append(pq_created.pk)
pqs_to_create = []
async for main_rd in main_rds:
main_pacer_case_id = main_rd.docket_entry.docket.pacer_case_id
# Create additional pqs for each subdocket case found.
pqs_to_create.append(
ProcessingQueue(
uploader_id=pq.uploader_id,
pacer_doc_id=pacer_doc_id,
pacer_case_id=main_pacer_case_id,
court_id=pq.court_id,
upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
filepath_local=ContentFile(
original_file_content, name=original_file_name
),
)
)

if pqs_to_create:
pqs_created = await ProcessingQueue.objects.abulk_create(pqs_to_create)
pqs_to_process_pks.extend([pq.pk for pq in pqs_created])

await save_pq_instances()
return pqs_to_process_pks
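
The rewrite above swaps a sync_to_async-wrapped loop of individual ProcessingQueue.objects.create() calls for async iteration plus a single abulk_create(), so every sub-docket PQ is inserted in one query. A rough sketch of the same idiom against a hypothetical Item model, assuming a Django version with async ORM support (4.1+ for async-for over querysets and abulk_create):

from myapp.models import Item  # hypothetical model standing in for ProcessingQueue

async def clone_rows(queryset, new_owner_id: int) -> list[int]:
    """Collect copies while iterating asynchronously, then insert them in bulk."""
    to_create = []
    async for row in queryset:  # async iteration; no sync_to_async wrapper needed
        to_create.append(Item(owner_id=new_owner_id, payload=row.payload))
    if not to_create:
        return []
    created = await Item.objects.abulk_create(to_create)  # one bulk INSERT
    return [obj.pk for obj in created]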


@@ -789,37 +794,38 @@ async def find_subdocket_pdf_rds(

pdf_binary_content = pq.filepath_local.read()

@sync_to_async
def save_pq_instances():
with transaction.atomic():
for i, main_rd in enumerate(main_rds):
if i == 0 and not pq.pacer_case_id:
# If the original PQ does not have a pacer_case_id,
# assign it a pacer_case_id from one of the matched RDs
# to ensure the RD lookup in process_recap_pdf succeeds.
pq.pacer_case_id = (
main_rd.docket_entry.docket.pacer_case_id
)
pq.save()
continue
pqs_to_create = []
main_rds = [rd async for rd in main_rds]
for i, main_rd in enumerate(main_rds):
if i == 0 and not pq.pacer_case_id:
# If the original PQ does not have a pacer_case_id,
# assign it a pacer_case_id from one of the matched RDs
# to ensure the RD lookup in process_recap_pdf succeeds.
pq.pacer_case_id = main_rd.docket_entry.docket.pacer_case_id
await pq.asave()
continue

main_pacer_case_id = main_rd.docket_entry.docket.pacer_case_id
# Create additional pqs for each subdocket case found.
pq_created = ProcessingQueue.objects.create(
uploader_id=pq.uploader_id,
pacer_doc_id=pq.pacer_doc_id,
pacer_case_id=main_pacer_case_id,
document_number=pq.document_number,
attachment_number=pq.attachment_number,
court_id=pq.court_id,
upload_type=UPLOAD_TYPE.PDF,
filepath_local=ContentFile(
pdf_binary_content, name=pq.filepath_local.name
),
)
pqs_to_process_pks.append(pq_created.pk)
main_pacer_case_id = main_rd.docket_entry.docket.pacer_case_id
# Create additional pqs for each subdocket case found.
pqs_to_create.append(
ProcessingQueue(
uploader_id=pq.uploader_id,
pacer_doc_id=pq.pacer_doc_id,
pacer_case_id=main_pacer_case_id,
document_number=pq.document_number,
attachment_number=pq.attachment_number,
court_id=pq.court_id,
upload_type=UPLOAD_TYPE.PDF,
filepath_local=ContentFile(
pdf_binary_content, name=pq.filepath_local.name
),
)
)

if pqs_to_create:
pqs_created = await ProcessingQueue.objects.abulk_create(pqs_to_create)
pqs_to_process_pks.extend([pq.pk for pq in pqs_created])

await save_pq_instances()
return pqs_to_process_pks
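
The PDF variant first materializes the queryset with an async list comprehension, since enumerate() cannot consume an async iterator directly, and updates the original PQ in place with asave(). A small sketch of that shape, again with a hypothetical model and assuming a Django release that provides asave() (4.2+):

from myapp.models import Item  # hypothetical model

async def flag_first_row(queryset) -> int:
    rows = [row async for row in queryset]  # materialize so enumerate() works
    for i, row in enumerate(rows):
        if i == 0:
            row.flagged = True
            await row.asave(update_fields=["flagged"])  # async single-row save
    return len(rows)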


@@ -1980,48 +1986,54 @@ def fetch_pacer_doc_by_rd(
ignore_result=True,
)
@transaction.atomic
def fetch_attachment_page(self: Task, fq_pk: int) -> None:
def fetch_attachment_page(self: Task, fq_pk: int) -> list[int]:
"""Fetch a PACER attachment page by rd_pk
This is very similar to process_recap_attachment, except that it manages
status as it proceeds and it gets the cookie info from redis.
:param self: The celery task
:param fq_pk: The PK of the RECAP Fetch Queue to update.
:return: None
:return: A list of PQ IDs that require replication to sub-dockets.
"""

fq = PacerFetchQueue.objects.get(pk=fq_pk)
rd = fq.recap_document
court_id = rd.docket_entry.docket.court_id
pacer_case_id = rd.docket_entry.docket.pacer_case_id
pacer_doc_id = rd.pacer_doc_id
# Check court connectivity, if fails retry the task, hopefully, it'll be
# retried in a different not blocked node
if not is_pacer_court_accessible(rd.docket_entry.docket.court_id):
if not is_pacer_court_accessible(court_id):
if self.request.retries == self.max_retries:
msg = f"Blocked by court: {rd.docket_entry.docket.court_id}"
msg = f"Blocked by court: {court_id}"
mark_fq_status(fq, msg, PROCESSING_STATUS.FAILED)
self.request.chain = None
return None
return []
raise self.retry()

mark_fq_status(fq, "", PROCESSING_STATUS.IN_PROGRESS)

if not rd.pacer_doc_id:
if not pacer_doc_id:
msg = (
"Unable to get attachment page: Unknown pacer_doc_id for "
"RECAP Document object %s" % rd.pk
)
mark_fq_status(fq, msg, PROCESSING_STATUS.NEEDS_INFO)
return
self.request.chain = None
return []

if rd.is_acms_document():
msg = "ACMS attachment pages are not currently supported"
mark_fq_status(fq, msg, PROCESSING_STATUS.FAILED)
return
self.request.chain = None
return []

session_data = get_pacer_cookie_from_cache(fq.user_id)
if not session_data:
msg = "Unable to find cached cookies. Aborting request."
mark_fq_status(fq, msg, PROCESSING_STATUS.FAILED)
return
self.request.chain = None
return []

try:
r = get_att_report_by_rd(rd, session_data)
@@ -2033,19 +2045,22 @@ def fetch_attachment_page(self: Task, fq_pk: int) -> None:
]:
if self.request.retries == self.max_retries:
mark_fq_status(fq, msg, PROCESSING_STATUS.FAILED)
return
self.request.chain = None
return []
logger.info(
f"Ran into HTTPError: {exc.response.status_code}. Retrying."
)
raise self.retry(exc=exc)
else:
mark_fq_status(fq, msg, PROCESSING_STATUS.FAILED)
return
self.request.chain = None
return []
except requests.RequestException as exc:
if self.request.retries == self.max_retries:
msg = "Failed to get attachment page from network."
mark_fq_status(fq, msg, PROCESSING_STATUS.FAILED)
return
self.request.chain = None
return []
logger.info("Ran into a RequestException. Retrying.")
raise self.retry(exc=exc)
except PacerLoginException as exc:
@@ -2054,32 +2069,33 @@ def fetch_attachment_page(self: Task, fq_pk: int) -> None:
mark_fq_status(fq, msg, PROCESSING_STATUS.FAILED)
delete_pacer_cookie_from_cache(fq.user_id)
self.request.chain = None
return None
return []
mark_fq_status(
fq, f"{msg} Retrying.", PROCESSING_STATUS.QUEUED_FOR_RETRY
)
raise self.retry(exc=exc)

text = r.response.text
is_appellate = is_appellate_court(rd.docket_entry.docket.court_id)
is_appellate = is_appellate_court(court_id)
# Determine the appropriate parser function based on court jurisdiction
# (appellate or district)
att_data_parser = (
get_data_from_appellate_att_report
if is_appellate
else get_data_from_att_report
)
att_data = att_data_parser(text, rd.docket_entry.docket.court_id)
att_data = att_data_parser(text, court_id)

if att_data == {}:
msg = "Not a valid attachment page upload"
mark_fq_status(fq, msg, PROCESSING_STATUS.INVALID_CONTENT)
return
self.request.chain = None
return []

try:
async_to_sync(merge_attachment_page_data)(
rd.docket_entry.docket.court,
rd.docket_entry.docket.pacer_case_id,
pacer_case_id,
att_data["pacer_doc_id"],
# Appellate attachments don't contain a document_number
None if is_appellate else att_data["document_number"],
@@ -2092,17 +2108,71 @@ def fetch_attachment_page(self: Task, fq_pk: int) -> None:
"attachment data"
)
mark_fq_status(fq, msg, PROCESSING_STATUS.FAILED)
return
self.request.chain = None
return []
except RECAPDocument.DoesNotExist as exc:
msg = "Could not find docket to associate with attachment metadata"
if self.request.retries == self.max_retries:
mark_fq_status(fq, msg, PROCESSING_STATUS.FAILED)
return
self.request.chain = None
return []
mark_fq_status(fq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY)
raise self.retry(exc=exc)
msg = "Successfully completed fetch and save."
mark_fq_status(fq, msg, PROCESSING_STATUS.SUCCESSFUL)

# Logic to replicate the attachment page to sub-dockets matched by RECAPDocument
if is_appellate_court(court_id):
# Subdocket replication for appellate courts is currently not supported.
self.request.chain = None
return []

sub_docket_main_rds = list(
get_main_rds(court_id, pacer_doc_id).exclude(
docket_entry__docket__pacer_case_id=pacer_case_id
)
)
sub_docket_pqs = []
for main_rd in sub_docket_main_rds:
# Create PQs related to RD that require replication.
sub_docket_pqs.append(
ProcessingQueue(
uploader_id=fq.user_id,
pacer_doc_id=main_rd.pacer_doc_id,
pacer_case_id=main_rd.docket_entry.docket.pacer_case_id,
court_id=court_id,
upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
filepath_local=ContentFile(
text.encode(), name="attachment_page.html"
),
)
)

if not sub_docket_pqs:
self.request.chain = None
return []
# Return PQ IDs to process attachment page replication for sub-dockets.
pqs_created = ProcessingQueue.objects.bulk_create(sub_docket_pqs)
return [pq.pk for pq in pqs_created]
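
The revised fetch_attachment_page follows one consistent abort pattern: retryable failures call self.retry(), and once self.request.retries reaches max_retries the task marks the FQ as failed, sets self.request.chain = None so the chained replication step never runs, and returns an empty list. A compact sketch of that shape, assuming a generic HTTP fetch in place of the PACER report:

import requests
from celery import Celery

app = Celery("sketch", broker="memory://")

@app.task(bind=True, max_retries=3)
def fetch_with_abort(self, url: str) -> list[int]:
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
    except requests.RequestException as exc:
        if self.request.retries == self.max_retries:
            # Out of attempts: skip the rest of the chain and give the next
            # task nothing to replicate.
            self.request.chain = None
            return []
        raise self.retry(exc=exc)  # re-enqueue this task for another attempt
    return [len(resp.content)]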


@app.task(
bind=True,
ignore_result=True,
)
def replicate_fq_att_page_to_subdocket_rds(
self: Task, pq_ids_to_process: list[int]
) -> None:
"""Replicate Attachment page from a FQ to subdocket RECAPDocuments.
:param self: The celery task
:param pq_ids_to_process: A list of PQ IDs that require replication to sub-dockets.
:return: None
"""

for pq_pk in pq_ids_to_process:
async_to_sync(process_recap_attachment)(pq_pk)
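
The new replication task is an ordinary synchronous Celery task that drives the async process_recap_attachment coroutine through asgiref's async_to_sync bridge, one PQ at a time. A minimal sketch of that bridge, with a made-up coroutine in place of process_recap_attachment:

from asgiref.sync import async_to_sync

async def process_one(pq_pk: int) -> str:
    # Stand-in for an async, ORM-heavy routine like process_recap_attachment.
    return f"processed {pq_pk}"

def replicate_all(pq_pks: list[int]) -> None:
    for pq_pk in pq_pks:
        # async_to_sync runs the coroutine to completion on an event loop
        # owned by the calling synchronous thread.
        async_to_sync(process_one)(pq_pk)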


def get_fq_docket_kwargs(fq):
"""Gather the kwargs for the Juriscraper DocketReport from the fq object
