Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove pugsql #48

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
9 changes: 0 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ Both the OIT Legacy Data Warehouse and the Experts Data Warehouse are Oracle
databases. See [experts\_dw on GitHub](https://github.com/UMNLibraries/experts_dw)
for supported versions of the required Oracle Instant Client library.

#### LDAP

Experts ETL uses LDAP to search for some student researcher information. See the
[python-ldap build prerequisites](https://www.python-ldap.org/en/python-ldap-3.3.0/installing.html#build-prerequisites)
for the required system libraries to install in your local environment.

### pyenv, venv, and poetry

To install and manage Python versions we use
Expand Down Expand Up @@ -90,9 +84,6 @@ via environment variables:
* `EXPERTS_DB_PASS`
* `EXPERTS_DB_HOSTNAME`
* `EXPERTS_DB_SERVICE_NAME`
* UMN LDAP
* `UMN_LDAP_DOMAIN`
* `UMN_LDAP_PORT`

Some tests are integration tests that connect to these external services, so
these variables must be set for testing. One option is to set these
Expand Down
3 changes: 0 additions & 3 deletions env.dist
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,3 @@ EXPERTS_ETL_FROM_EMAIL_ADDRESS="[email protected]"

EXPERTS_ETL_TICKET_EMAIL_ADDRESS="[email protected]"
EXPERTS_ETL_ERROR_EMAIL_ADDRESS="[email protected]"

UMN_LDAP_DOMAIN="umn.ldap.domain"
UMN_LDAP_PORT="umn_ldap_port_number"
2 changes: 1 addition & 1 deletion experts_etl/demographics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def latest_demographics_for_emplid(session, emplid):
session.query(PureEligibleDemogChngHst)
.filter(
PureEligibleDemogChngHst.emplid == emplid,
PureEligibleDemogChngHst.timestamp == subqry
PureEligibleDemogChngHst.timestamp == subqry.scalar_subquery()
)
.one_or_none()
)
Expand Down
37 changes: 23 additions & 14 deletions experts_etl/oit_to_edw/person.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from sqlalchemy import and_, func, text

from experts_dw import db
from experts_dw.rawsql import update_pure_sync_person_data, insert_pure_sync_person_data, update_pure_sync_user_data, insert_pure_sync_user_data, update_pure_sync_staff_org_association, insert_pure_sync_staff_org_association, delete_obsolete_primary_jobs
from experts_dw.models import PureEligiblePersonNew, PureEligiblePersonChngHst, PureEligibleDemogNew, PureEligibleDemogChngHst, Person, PureSyncPersonDataScratch, PureSyncStaffOrgAssociationScratch, PureSyncUserDataScratch
from experts_dw.sqlapi import sqlapi
from experts_etl import loggers
from experts_etl.demographics import latest_demographics_for_emplid, latest_not_null_internet_id_for_emplid
from experts_etl.umn_data_error import record_person_no_job_data_error
Expand All @@ -25,9 +25,6 @@ def run(
transaction_record_limit=transaction_record_limit,
experts_etl_logger=None
):
# This connection API in general needs work. Including this here for the sake of consistency
# with other ETL module.run() functions.
sqlapi.setengine(db.engine(db_name))

if experts_etl_logger is None:
experts_etl_logger = loggers.experts_etl_logger()
Expand Down Expand Up @@ -58,24 +55,36 @@ def run(
session.commit()
load_count = 0

update_targets_from_scratch()
# Prepare the target tables using a raw cx_Oracle connection
with db.cx_oracle_connection() as connection:
update_targets_from_scratch(connection, experts_etl_logger)

session.commit()

experts_etl_logger.info('ending: oit -> edw', extra={'pure_sync_job': 'person'})

def update_targets_from_scratch(connection, experts_etl_logger):
    """Update the pure_sync target tables from their scratch tables.

    Executes the update/insert statement pairs for person data, user data,
    and staff org associations, then deletes obsolete primary jobs, all on
    the given cx_Oracle connection, committing once at the end.

    NOTE(review): on failure the transaction is rolled back and the
    exception is logged but NOT re-raised, so the caller is not notified
    of failures — consistent with record_reporting_of_errors.

    :param connection: an open cx_Oracle connection to the EDW.
    :param experts_etl_logger: logger used to report failures.
    """
    cur = None
    try:
        cur = connection.cursor()

        cur.execute(update_pure_sync_person_data)
        cur.execute(insert_pure_sync_person_data)

        cur.execute(update_pure_sync_user_data)
        cur.execute(insert_pure_sync_user_data)

        cur.execute(update_pure_sync_staff_org_association)
        cur.execute(insert_pure_sync_staff_org_association)

        cur.execute(delete_obsolete_primary_jobs)

        connection.commit()
    except Exception as e:
        # Roll back the partial transaction before logging.
        connection.rollback()
        formatted_exception = loggers.format_exception(e)
        experts_etl_logger.error(
            f'Exception encountered during updating pure_sync_data tables: {formatted_exception}'
        )
    finally:
        # Close the cursor whether or not an exception occurred.
        if cur is not None:
            cur.close()

def load_into_scratch(session, person_dict):
pure_sync_person_data = PureSyncPersonDataScratch(
Expand Down
71 changes: 51 additions & 20 deletions experts_etl/umn_data_error.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from datetime import datetime
from experts_dw import db
from experts_dw.rawsql import unreported_umn_data_errors, record_reporting_of_umn_data_errors, count_pure_eligible_persons_in_dept
from experts_dw.models import UmnDataError
from experts_dw.sqlapi import sqlapi
from experts_etl.exceptions import *
from experts_etl import loggers

from sqlalchemy.sql import text

import os
import csv
from io import StringIO
Expand All @@ -23,32 +25,50 @@ def run(
from_address=None,
ticket_address=None
):
# This connection API in general needs work. Including this here for the sake of consistency
# with other ETL module.run() functions.
sqlapi.setengine(db.engine(db_name))

if experts_etl_logger is None:
experts_etl_logger = loggers.experts_etl_logger()
experts_etl_logger.info('starting: error reporting', extra={'pure_sync_job': 'error_reporting'})

if smtp_server is None:
smtp_server = smtplib.SMTP('localhost', 25)
if from_address is None:
from_address = os.environ.get('EXPERTS_ETL_FROM_EMAIL_ADDRESS')
if ticket_address is None:
ticket_address = os.environ.get('EXPERTS_ETL_TICKET_EMAIL_ADDRESS')

with sqlapi.transaction():
report_via_email(smtp_server=smtp_server, from_address=from_address, ticket_address=ticket_address)
smtp_server.quit()
with db.cx_oracle_connection() as connection:
unreported_errors = get_unreported_errors(connection)
if len(unreported_errors) > 0:
if smtp_server is None:
smtp_server = smtplib.SMTP('localhost', 25)
if from_address is None:
from_address = os.environ.get('EXPERTS_ETL_FROM_EMAIL_ADDRESS')
if ticket_address is None:
ticket_address = os.environ.get('EXPERTS_ETL_TICKET_EMAIL_ADDRESS')

message = generate_error_report_message(unreported_errors, from_address, ticket_address)
message_sent_successfully = False
try:
smtp_server.send_message(message)
message_sent_successfully = True
except Exception as e:
formatted_exception = loggers.format_exception(e)
experts_etl_logger.error(
f'Exception encountered during sending of error report message: {formatted_exception}'
)
smtp_server.quit()
if message_sent_successfully:
record_reporting_of_errors(connection, experts_etl_logger)

experts_etl_logger.info('ending: error reporting', extra={'pure_sync_job': 'error_reporting'})

def report_via_email(smtp_server, from_address, ticket_address):
unreported_errors = list(sqlapi.unreported_umn_data_errors())
if len(unreported_errors) == 0:
return
def get_unreported_errors(connection):
    """Fetch all unreported UMN data errors as a list of dicts.

    Each row is mapped to a dict keyed by the cursor's column names.

    :param connection: an open cx_Oracle connection to the EDW.
    :return: list of dicts, one per unreported error row (may be empty).
    """
    cursor = connection.cursor()
    try:
        cursor.execute(unreported_umn_data_errors)
        # Map each row tuple to a {column_name: value} dict.
        cursor.rowfactory = lambda *args: dict(
            zip([col[0] for col in cursor.description], args)
        )
        return cursor.fetchall()
    finally:
        # Close the cursor even if execute/fetch raises.
        cursor.close()

def generate_error_report_message(unreported_errors, from_address, ticket_address):
# Build and send e-mail with csv attachment of errors
datetime_string = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

message = MIMEMultipart()
Expand All @@ -68,9 +88,19 @@ def report_via_email(smtp_server, from_address, ticket_address):
part.add_header('Content-Disposition', f'attachment; filename="umn-data-errors-{datetime_string}.csv"')
message.attach(part)

smtp_server.send_message(message)
return message

sqlapi.record_reporting_of_umn_data_errors()
def record_reporting_of_errors(connection, experts_etl_logger):
    """Mark the previously-reported UMN data errors as reported.

    Executes the recording statement and commits. On failure the
    transaction is rolled back and the exception is logged and swallowed,
    so the same errors may be reported again on a later run.

    :param connection: an open cx_Oracle connection to the EDW.
    :param experts_etl_logger: logger used to report failures.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(record_reporting_of_umn_data_errors)
        connection.commit()
    except Exception as e:
        # Roll back the partial transaction before logging.
        connection.rollback()
        formatted_exception = loggers.format_exception(e)
        experts_etl_logger.error(
            f'Exception encountered during recording of data errors: {formatted_exception}'
        )
    finally:
        # Close the cursor whether or not an exception occurred.
        if cursor is not None:
            cursor.close()

def create_csv_report(unreported_errors):
# Remove fields which may confuse people doing data entry:
Expand Down Expand Up @@ -123,7 +153,8 @@ def record_unknown_dept_errors(
um_campus_descr=None,
):
if persons_in_dept is None:
persons_in_dept = sqlapi.count_pure_eligible_persons_in_dept(deptid=deptid)
# We use a SQLAlchemy session to execute the count statement here, as a workaround to avoid introducing new connection objects into this process.
persons_in_dept = session.execute(text(count_pure_eligible_persons_in_dept), {"deptid": deptid}).scalars().one_or_none()

session.add(
find_or_create_umn_data_error(
Expand Down
Loading