Skip to content

Commit

Permalink
Merge pull request #1103 from microbiomedata/issue-1079-ingest-cli
Browse files Browse the repository at this point in the history
Update ingest CLI to support scheduled ingest
  • Loading branch information
pkalita-lbl authored Jan 5, 2024
2 parents 910c92d + a039275 commit 74c7be9
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 24 deletions.
83 changes: 66 additions & 17 deletions nmdc_server/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
from typing import Optional

import click
import requests

from nmdc_server import jobs, models
from nmdc_server.config import Settings, settings
from nmdc_server.database import SessionLocal, SessionLocalIngest
from nmdc_server import jobs
from nmdc_server.config import Settings
from nmdc_server.database import SessionLocalIngest
from nmdc_server.ingest import errors
from nmdc_server.ingest.all import load
from nmdc_server.ingest.common import maybe_merge_download_artifact
from nmdc_server.logger import get_logger


Expand Down Expand Up @@ -63,8 +62,11 @@ def truncate():
@click.option("-v", "--verbose", count=True)
@click.option("--function-limit", type=click.INT, default=100)
@click.option("--skip-annotation", is_flag=True, default=False)
def ingest(verbose, function_limit, skip_annotation):
@click.option("--swap-rancher-secrets", is_flag=True, default=False)
def ingest(verbose, function_limit, skip_annotation, swap_rancher_secrets):
"""Ingest the latest data from mongo into the ingest database."""
settings = Settings()

level = logging.WARN
if verbose == 1:
level = logging.INFO
Expand All @@ -73,17 +75,8 @@ def ingest(verbose, function_limit, skip_annotation):
logger = get_logger(__name__)
logging.basicConfig(level=level, format="%(message)s")
logger.setLevel(logging.INFO)
jobs.migrate(ingest_db=True)
with SessionLocalIngest() as ingest_db:
load(ingest_db, function_limit=function_limit, skip_annotation=skip_annotation)
if settings.current_db_uri != settings.ingest_database_uri:
with SessionLocal() as prod_db:
# copy persistent data from the production db to the ingest db
maybe_merge_download_artifact(ingest_db, prod_db.query(models.FileDownload))
maybe_merge_download_artifact(ingest_db, prod_db.query(models.BulkDownload))
maybe_merge_download_artifact(
ingest_db, prod_db.query(models.BulkDownloadDataObject)
)

jobs.do_ingest(function_limit, skip_annotation)

for m, s in errors.missing.items():
click.echo(f"missing {m}:")
Expand All @@ -95,6 +88,62 @@ def ingest(verbose, function_limit, skip_annotation):
for id in s:
click.echo(id)

if swap_rancher_secrets:

def require_setting(name: str):
if not getattr(settings, name, None):
raise ValueError(f"{name} must be set to use --swap-rancher-secrets")

require_setting("rancher_api_base_url")
require_setting("rancher_api_auth_token")
require_setting("rancher_project_id")
require_setting("rancher_postgres_secret_id")
require_setting("rancher_backend_workload_id")
require_setting("rancher_worker_workload_id")

headers = {"Authorization": f"Bearer {settings.rancher_api_auth_token}"}

click.echo(f"Getting current secret {settings.rancher_postgres_secret_id}")
secret_url = (
f"{settings.rancher_api_base_url}"
f"/project/{settings.rancher_project_id}"
f"/secrets/{settings.rancher_postgres_secret_id}"
)
response = requests.get(secret_url, headers=headers)
response.raise_for_status()
current = response.json()
update = {
"data": {
"INGEST_URI": current["data"]["POSTGRES_URI"],
"POSTGRES_PASSWORD": current["data"]["POSTGRES_PASSWORD"],
"POSTGRES_URI": current["data"]["INGEST_URI"],
}
}

click.echo(f"Updating secret {settings.rancher_postgres_secret_id}")
response = requests.put(secret_url, headers=headers, json=update)
response.raise_for_status()

click.echo(f"Redeploying workload {settings.rancher_backend_workload_id}")
response = requests.post(
f"{settings.rancher_api_base_url}"
f"/project/{settings.rancher_project_id}"
f"/workloads/{settings.rancher_backend_workload_id}?action=redeploy",
headers=headers,
)
response.raise_for_status()

click.echo(f"Redeploying workload {settings.rancher_worker_workload_id}")
response = requests.post(
f"{settings.rancher_api_base_url}"
f"/project/{settings.rancher_project_id}"
f"/workloads/{settings.rancher_worker_workload_id}?action=redeploy",
headers=headers,
)
response.raise_for_status()

click.echo("Done")


@cli.command()
@click.option("--print-sql", is_flag=True, default=False)
Expand Down
8 changes: 8 additions & 0 deletions nmdc_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ class Settings(BaseSettings):
# App settings related to UI behavior
disable_bulk_download: str = ""

# Rancher information to swap databases after ingest
rancher_api_base_url: Optional[str] = None
rancher_api_auth_token: Optional[str] = None
rancher_project_id: Optional[str] = None
rancher_postgres_secret_id: Optional[str] = None
rancher_backend_workload_id: Optional[str] = None
rancher_worker_workload_id: Optional[str] = None

@property
def current_db_uri(self) -> str:
if self.environment == "testing":
Expand Down
19 changes: 12 additions & 7 deletions nmdc_server/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

HERE = Path(__file__).parent

logger = get_logger(__name__)


@celery_app.task
def ping():
Expand Down Expand Up @@ -54,13 +56,7 @@ def migrate(ingest_db: bool = False):
command.upgrade(alembic_cfg, "head")


@celery_app.task
def ingest(function_limit=None, skip_annotation=False):
"""Truncate database and ingest all data from the mongo source."""
logger = get_logger(__name__)
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger.setLevel(logging.INFO)

def do_ingest(function_limit, skip_annotation):
with database.SessionLocalIngest() as ingest_db:
try:
ingest_db.execute("select truncate_tables()").all()
Expand Down Expand Up @@ -88,3 +84,12 @@ def ingest(function_limit=None, skip_annotation=False):
maybe_merge_download_artifact(ingest_db, prod_db.query(models.FileDownload))
maybe_merge_download_artifact(ingest_db, prod_db.query(models.BulkDownload))
maybe_merge_download_artifact(ingest_db, prod_db.query(models.BulkDownloadDataObject))


@celery_app.task
def ingest(function_limit=None, skip_annotation=False):
    """Truncate database and ingest all data from the mongo source."""
    # Make ingest progress visible in the worker output, then delegate
    # the actual work to the shared do_ingest helper.
    logging.basicConfig(format="%(message)s", level=logging.INFO)
    logger.setLevel(logging.INFO)
    do_ingest(function_limit, skip_annotation)

0 comments on commit 74c7be9

Please sign in to comment.