Skip to content

Commit

Permalink
adds ability to run new follow-up job;
Browse files Browse the repository at this point in the history
TODO: consider removing harvest.restart_job and interface.add_harvest_records
  • Loading branch information
btylerburton committed Nov 21, 2024
1 parent 5e0ff76 commit 3ffe7e7
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 32 deletions.
16 changes: 16 additions & 0 deletions database/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,22 @@ def get_latest_harvest_records_by_source(self, source_id):

return [dict(zip(fields, record)) for record in records]

def get_all_latest_harvest_records_by_source(self, source_id):
    """Return the newest harvest_record row for each distinct identifier
    belonging to the given harvest source.

    :param source_id: harvest_source_id to filter records by.
    :return: list of dicts, one per identifier (most recent by date_created).

    NOTE: datetimes are returned as datetime objs not strs.
    """
    # Bind source_id as a parameter instead of f-string interpolation to
    # prevent SQL injection and let the driver handle quoting.
    sql = text(
        """SELECT DISTINCT ON (identifier) *
        FROM harvest_record
        WHERE harvest_source_id = :source_id
        ORDER BY identifier, date_created DESC"""
    )

    res = self.db.execute(sql, {"source_id": source_id})

    fields = list(res.keys())
    records = res.fetchall()

    return [dict(zip(fields, record)) for record in records]

def close(self):
if hasattr(self.db, "remove"):
self.db.remove()
Expand Down
87 changes: 71 additions & 16 deletions harvester/harvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path

from typing import List
import requests
from boltons.setutils import IndexedSet
from ckanapi import RemoteCKAN
Expand Down Expand Up @@ -98,7 +98,6 @@ class HarvestSource:
def __post_init__(self) -> None:
    # Wire up shared services after dataclass field init:
    # db interface comes from the module-level `db_interface` singleton.
    self._db_interface: HarvesterDBInterface = db_interface
    # JSON-schema validator built from this source's dataset schema.
    self._validator = Draft202012Validator(self.dataset_schema)

    # Populate source attributes (url, name, etc.) from the harvest job row.
    self.get_source_info_from_job_id(self.job_id)

@property
Expand All @@ -122,7 +121,7 @@ def report(self):
return self._report

@property
def source_attrs(self) -> list:
def source_attrs(self) -> List:
return self._source_attrs

@property
Expand Down Expand Up @@ -151,7 +150,7 @@ def get_source_info_from_job_id(self, job_id: str) -> None:
self.job_id,
)

def internal_records_to_id_hash(self, records: list[dict]) -> None:
def internal_records_to_id_hash(self, records: List[dict]) -> None:
for record in records:
# TODO: don't pass None to original metadata
self.internal_records[record["identifier"]] = Record(
Expand All @@ -163,7 +162,7 @@ def internal_records_to_id_hash(self, records: list[dict]) -> None:
_ckan_name=record["ckan_name"],
)

def external_records_to_id_hash(self, records: list[dict]) -> None:
def external_records_to_id_hash(self, records: List[dict]) -> None:
# ruff: noqa: F841

logger.info("converting harvest records to id: hash")
Expand Down Expand Up @@ -326,7 +325,7 @@ def do_report(self) -> None:
"validity": {True: 0, False: 0},
}
for key, group in groupby(
self.records, lambda x: x.action if x.status != "error" else None
self.records, lambda x: x.action if x.status != "error" else False
):
results["action"][key] = sum(1 for _ in group)

Expand All @@ -336,16 +335,13 @@ def do_report(self) -> None:
for key, group in groupby(self.records, lambda x: x.valid):
results["validity"][key] = sum(1 for _ in group)

# what actually happened?
logger.info("actual actions completed")
logger.info("actions completed")
logger.info(results["action"])

# what actually happened?
logger.info("actual status completed")
logger.info("status completed")
logger.info(results["status"])

# what's our record validity count?
logger.info("validity of the records")
logger.info("validity of records")
logger.info(results["validity"])

job_status = {
Expand All @@ -360,8 +356,52 @@ def do_report(self) -> None:
self.db_interface.update_harvest_job(self.job_id, job_status)
self._report = job_status

def rehydrate_old_job(self):
logger.info("rehydrate old job to pick up and finish")
def restart_job(self):
    """Resume a previously failed harvest job.

    Marks the job back to in_progress and rehydrates in-memory Record
    objects from the job's persisted harvest_record rows so processing
    can continue where it left off.
    """
    logger.info(f"restarting failed job for {self.name}")
    job = self.db_interface.get_harvest_job(self.job_id)
    updated_job = self.db_interface.update_harvest_job(
        job.id, {"status": "in_progress"}
    )
    # use the module logger (not print) so the status change lands in logs
    logger.info(f"Updated job {updated_job.id} to in_progress")
    db_records = []
    for db_record in job.records:
        # source_raw is stored as a JSON string; decode back to a dict
        new_record = Record(
            self,
            db_record.identifier,
            json.loads(db_record.source_raw),
            db_record.source_hash,
            db_record.action,
            _status=db_record.status,
            _ckan_id=db_record.ckan_id,
            _ckan_name=db_record.ckan_name,
            _id=db_record.id,
        )
        db_records.append(new_record)
    self._records = db_records

def follow_up_job(self):
    """Start a follow-up (pickup) job for this source.

    Unlike restart_job, this pulls the latest persisted record per
    identifier across the whole source (not just this job's records),
    marks the job in_progress, and rehydrates Record objects from them.
    """
    logger.info(f"kicking off pickup job for {self.name}")
    db_records = self.db_interface.get_all_latest_harvest_records_by_source(self.id)
    job = self.db_interface.get_harvest_job(self.job_id)
    updated_job = self.db_interface.update_harvest_job(
        job.id, {"status": "in_progress"}
    )
    # use the module logger (not print) so the status change lands in logs
    logger.info(f"Updated job {updated_job.id} to in_progress")
    new_records = []
    for db_record in db_records:
        # rows come back as dicts here (vs ORM objects in restart_job);
        # source_raw is a JSON string, decode back to a dict
        new_record = Record(
            self,
            db_record["identifier"],
            json.loads(db_record["source_raw"]),
            db_record["source_hash"],
            db_record["action"],
            _status=db_record["status"],
            _ckan_id=db_record["ckan_id"],
            _ckan_name=db_record["ckan_name"],
            _id=db_record["id"],
        )
        new_records.append(new_record)
    self._records = new_records


@dataclass
Expand All @@ -378,6 +418,7 @@ class Record:
_status: str = None
_ckan_id: str = None
_ckan_name: str = None
_id: str = None

ckanified_metadata: dict = field(default_factory=lambda: {})

Expand All @@ -401,6 +442,14 @@ def ckan_id(self) -> str:
def ckan_id(self, value) -> None:
self._ckan_id = value

@property
def id(self) -> str:
    # Database primary key of the backing harvest_record row
    # (set when rehydrating from the DB; may be None for new records).
    return self._id

@id.setter
def id(self, value) -> None:
    # NOTE(review): unvalidated setter — assumes callers pass the DB id string.
    self._id = value

@property
def ckan_name(self) -> str:
return self._ckan_name
Expand Down Expand Up @@ -448,7 +497,7 @@ def validation_msg(self) -> str:
@validation_msg.setter
def validation_msg(self, value) -> None:
if not isinstance(value, str):
raise ValueError("status must be a string")
raise ValueError("validation_msg must be a string")
self._validation_msg = value

@property
Expand Down Expand Up @@ -535,8 +584,14 @@ def sync(self) -> None:
logger.warning(f"{self.identifier} is invalid. bypassing {self.action}")
return

start = datetime.now(timezone.utc)
if self.status == "success":
logger.info(
f"{self.identifier} has status 'success'. bypassing {self.action}"
)
return

start = datetime.now(timezone.utc)
# todo:
try:
if self.action == "delete":
self.delete_record()
Expand Down
20 changes: 10 additions & 10 deletions tests/fixtures.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-0\", \"identifier\": \"test-0\"}"
},
{
"id": "c218c965-3670-45c8-bfcd-f852d71ed917",
Expand All @@ -49,7 +49,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-1\", \"identifier\": \"test-1\"}"
},
{
"id": "e1f603cc-8b6b-483f-beb4-86bda5462b79",
Expand All @@ -58,7 +58,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-2\", \"identifier\": \"test-2\"}"
},
{
"id": "1c004473-0802-4f22-a16d-7a2d7559719e",
Expand All @@ -67,7 +67,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-3\", \"identifier\": \"test-3\"}"
},
{
"id": "deb12fa0-d812-4d6e-98f4-d4f7d776c6b3",
Expand All @@ -76,7 +76,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-4\", \"identifier\": \"test-4\"}"
},
{
"id": "27b5d5d6-808b-4a8c-ae4a-99f118e282dd",
Expand All @@ -85,7 +85,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-5\", \"identifier\": \"test-5\"}"
},
{
"id": "c232a2ca-6344-4692-adc2-29f618a2eff3",
Expand All @@ -94,7 +94,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-6\", \"identifier\": \"test-6\"}"
},
{
"id": "95021355-bad0-442b-98e9-475ecd849033",
Expand All @@ -103,7 +103,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-7\", \"identifier\": \"test-7\"}"
},
{
"id": "09f073b3-00e3-4147-ba69-a5d0fd7ce027",
Expand All @@ -112,7 +112,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-8\", \"identifier\": \"test-8\"}"
},
{
"id": "97492788-5d62-4feb-8641-6f6692aec026",
Expand All @@ -121,7 +121,7 @@
"harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
"action": "create",
"status": "error",
"source_raw": "example data"
"source_raw": "{\"title\": \"test-9\", \"identifier\": \"test-9\"}"
}
],
"record_error": [
Expand Down
8 changes: 5 additions & 3 deletions tests/integration/harvest/test_ckan_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def test_ckanify_dcatus(
harvest_source = HarvestSource(harvest_job.id)
harvest_source.prepare_external_data()

records = [
record = [
(
{
"identifier": "cftc-dc1",
Expand All @@ -105,9 +105,11 @@ def test_ckanify_dcatus(
}
)
]
interface.add_harvest_records(records)
interface.add_harvest_records(record)
harvest_source.extract()
test_record = [x for x in harvest_source.records if x.identifier == "cftc-dc1"][0]
test_record = [x for x in harvest_source.records if x.identifier == "cftc-dc1"][
0
]

expected_result = {
"name": "commitment-of-traders",
Expand Down
Loading

1 comment on commit 3ffe7e7

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests Skipped Failures Errors Time
2 0 💤 0 ❌ 0 🔥 7.969s ⏱️

Please sign in to comment.