Develop #11

Merged: 3 commits, Nov 5, 2024
100 changes: 0 additions & 100 deletions INSTALL.md

This file was deleted.

272 changes: 70 additions & 202 deletions README.md

Large diffs are not rendered by default.

37 changes: 19 additions & 18 deletions __init__.py
@@ -2,12 +2,12 @@
 import os
 import json
 import multiprocessing
+import sys
 from datetime import datetime
 from collections import OrderedDict


 Config = configparser.ConfigParser()
-Config.read('applications/kaptive/settings.ini')
+Config.read('applications/kaptive_web/settings.ini')
 queue_path = Config.get('Path', 'queue_path')
 upload_path = Config.get('Path', 'upload_path')

@@ -17,9 +17,9 @@ def save_json_to_file(f, json_string):
     try:
         with open(f, 'wt+') as file:
             json.dump(json_string, file, indent=4)
-        print("Wrote to file: " + f)
+        print("Wrote to file: " + f, file=sys.stderr)
     except (IOError, OSError) as e:
-        print("Error writing file: " + f + "; " + str(e))
+        print("Error writing file: " + f + "; " + str(e), file=sys.stderr)


 # Read JSON Object from a file
@@ -29,13 +29,13 @@ def read_json_from_file(f):
         with open(f, 'r') as file:
             data = json.load(file, object_pairs_hook=OrderedDict)
     except ValueError as e:
-        print("Error parsing file " + f + ": " + str(e))
+        print("Error parsing file " + f + ": " + str(e), file=sys.stderr)
         if not f.endswith('.bak'):
             os.remove(f)
             data = read_json_from_file((f + '.bak'))
             save_json_to_file(f, data)
     except (IOError, OSError) as e:
-        print("Error reading file " + f + ": " + str(e))
+        print("Error reading file " + f + ": " + str(e), file=sys.stderr)
         if not f.endswith('.bak'):
             os.remove(f)
             data = read_json_from_file((f + '.bak'))
@@ -45,7 +45,8 @@ def read_json_from_file(f):

 job_queue_path = os.path.join(queue_path, 'queue')
 available_worker = multiprocessing.cpu_count() - 1
-if os.path.exists(job_queue_path): # and os.path.getsize(job_queue_path) > 2: catches empty queue (i.e. if file contains {})
+if os.path.exists(
+        job_queue_path): # and os.path.getsize(job_queue_path) > 2: catches empty queue (i.e. if file contains {})
     data = OrderedDict() # read_json_from_file returns an OrderedDict even if empty, no need to declare here.
     # Put the jobs in processing back to the job queue
     data = read_json_from_file(job_queue_path)
@@ -58,7 +59,7 @@ def read_json_from_file(f):
     data['Available worker'] = available_worker
     data['Last update (worker)'] = str(datetime.now().strftime('%d %b %Y %H:%M:%S'))
     save_json_to_file(job_queue_path, data)
-    print("Queue file updated.")
+    print("Queue file updated.", file=sys.stderr)

     for i in data['Job queue']:
         job_list_path = os.path.join(upload_path, i[0], 'job_list.json')
@@ -75,16 +76,16 @@ def read_json_from_file(f):
             job_name = j['Fasta file']
             job_seq = j['Job seq']
             save_json_to_file(job_list_path, job_data)
-            print("Fixed coruppted data in job list.")
+            print("Fixed corrupted data in job list.", file=sys.stderr)
             break
 else:
-    data = OrderedDict()
-    data['Job queue'] = []
-    data['Processing queue'] = []
-    data['Last update (queue)'] = str(datetime.now().strftime('%d %b %Y %H:%M:%S'))
-    data['Total worker'] = available_worker
-    data['Available worker'] = available_worker
-    data['Last update (worker)'] = str(datetime.now().strftime('%d %b %Y %H:%M:%S'))
+    data = OrderedDict({
+        'Job queue': [],
+        'Processing queue': [],
+        'Last update (queue)': str(datetime.now().strftime('%d %b %Y %H:%M:%S')),
+        'Total worker': available_worker,
+        'Available worker': available_worker,
+        'Last update (worker)': str(datetime.now().strftime('%d %b %Y %H:%M:%S'))
+    })
     save_json_to_file(job_queue_path, data)
-    print("Queue file created.")
-
+    print("Queue file created.", file=sys.stderr)
97 changes: 44 additions & 53 deletions controllers/default.py
@@ -6,6 +6,7 @@
 import time
 from collections import OrderedDict
 from datetime import datetime
+from pathlib import Path

 queue_lock = threading.Lock()
 job_list_lock = threading.Lock()
@@ -14,7 +15,7 @@

 # Read config file
 Config = configparser.ConfigParser()
-Config.read('applications/kaptive/settings.ini')
+Config.read('applications/kaptive_web/settings.ini')
 base_path = Config.get('Path', 'base_path')
 reference_database_path = Config.get('Path', 'reference_database_path')
 upload_path = Config.get('Path', 'upload_path')
@@ -27,67 +28,45 @@
 job_queue_path = os.path.join(queue_path, 'queue')

 # Setup logger
-logger = logging.getLogger("kaptive")
+logger = logging.getLogger('kaptive_web') # Setup a new logger
+logger.propagate = False # Prevents multi printing, see: https://stackoverflow.com/questions/6729268/log-messages-appearing-twice-with-python-logging
 logger.setLevel(logging.DEBUG)

-# Get number of tblastn or blastn running (for debug purpose only)
-procs = subprocess.check_output(['ps', 'uaxw']).decode('utf-8').splitlines()
-blast_procs = [proc for proc in procs if 'blast' in proc]
-blast_count = len(blast_procs)
-if blast_count > 0:
-    logger.debug(' Blast found: ' + str(blast_count))
+if not logger.handlers: # Prevents multi printing, see: https://stackoverflow.com/questions/6729268/log-messages-appearing-twice-with-python-logging
+    logger.addHandler(handler := logging.StreamHandler())
+    handler.setFormatter(
+        logging.Formatter(
+            "%(asctime)s %(levelname)s [%(filename)s.%(funcName)s:%(lineno)d] %(message)s",
+            "%Y-%m-%d %H:%M:%S"
+        )
+    )


 def index():
     return dict()


 def jobs():
-    import uuid
-    import fnmatch
     import re
+    import uuid

     if request.vars.message is not None:
         response.flash = request.vars.message

     # Generate an UUID for each job submission
     session.uuid = str(uuid.uuid4())

-    # -------------------------------------------------------------------------
-    # Get a list of reference database files, order by file name but with all
-    # Klebsiella databases listed first.
-    # - Copy the database files to this folder.
-    # - Name the first one (default) 1-xxxx, second one 2-xxxx, so on so forth.
-    # -------------------------------------------------------------------------
-    filelist_klebsiella = dict()
-    filelist_other = dict()
-    logger.debug(f'[{session.uuid}] Reference database file found:')
-    for f in sorted(os.listdir(reference_database_path)):
-        if os.path.isfile(os.path.join(reference_database_path, f)) and fnmatch.fnmatch(f, '*.gbk'):
-            fname = re.sub('\.gbk$', '', f)
-            fname = re.sub('_', ' ', fname)
-            fname = re.sub('\d-', '', fname)
-            fname = fname.replace(' k ', ' K ').replace(' o ', ' O ')
-            logger.debug(f'[{session.uuid}] Database Name: ' + f)
-            if 'klebsiella' in fname.lower():
-                filelist_klebsiella[f] = fname
-            else:
-                filelist_other[f] = fname
-    # Create sorted list of tuples with all Klebsiella databases preceeding databases of other genus
-    filelist_klebsiella_sorted = sorted(filelist_klebsiella.items(), key=lambda k: k[0])
-    filelist_other_sorted = sorted(filelist_other.items(), key=lambda k: k[0])
-    filelist_sorted = filelist_klebsiella_sorted + filelist_other_sorted
-    # Merge filelist dicts as this is used below and later
-    filelist = filelist_klebsiella.copy()
-    filelist.update(filelist_other)
+    filelist = {
+        str(i): i.stem.replace('_', ' ') for i in Path(reference_database_path).glob('*.gbk')
+    }

     if len(filelist) == 0:
         logger.error(f'[{session.uuid}] No reference database file found.')
         response.flash = 'Internal error. No reference database file found. Please contact us.'

     # Create the form
     fields = [Field('job_name', label=T('Job name (optional)')),
              Field('assembly','upload', requires=[IS_NOT_EMPTY()], label=T('Assembly file*'), custom_store=upload_file),
-             Field('reference', requires=IS_IN_SET(filelist_sorted, zero=None), label=T('Reference database'))
+             Field('reference', requires=IS_IN_SET(filelist, zero=None), label=T('Reference database'))
              ]
     if captcha:
         fields.append(captcha_field()) # Google reCaptcha v2
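The dictionary comprehension introduced above replaces the hand-rolled Klebsiella-first ordering: keys are the database paths, values are display names derived from the file stem. A standalone sketch of the same idea (the directory and file name below are hypothetical, not paths used by the app); note that glob() does not guarantee the Klebsiella-first ordering the removed code produced:

    from pathlib import Path

    reference_database_path = 'databases'  # hypothetical location holding *.gbk files
    filelist = {
        str(i): i.stem.replace('_', ' ')
        for i in Path(reference_database_path).glob('*.gbk')
    }
    # e.g. 'databases/Klebsiella_k_locus_primary_reference.gbk'
    #      -> 'Klebsiella k locus primary reference'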
@@ -104,19 +83,27 @@ def jobs():
         compression = get_compression_type(file_path)
         if compression == 'zip':
             process_zip_file(file_dir, file_path)
-            logger.debug(f'[{session.uuid}] Zip file uploaded: ' + request.vars.assembly.filename)
+            logger.debug(f'[{session.uuid}] Zip file uploaded: {request.vars.assembly.filename}')
         elif compression == 'gz':
             process_gz_file(file_dir, request.vars.assembly.filename)
-            logger.debug(f'[{session.uuid}] GZip file uploaded: ' + request.vars.assembly.filename)
+            logger.debug(f'[{session.uuid}] GZip file uploaded: {request.vars.assembly.filename}')

         # Get a list of fasta files
         fastalist = [f for f in os.listdir(os.path.join(upload_path, session.uuid))
                      if os.path.isfile(os.path.join(upload_path, session.uuid, f))]
         fastafiles = []

+        allowed_characters = ' a-zA-Z0-9_.-'
+        fastafiles_invalid = []
+
         no_of_fastas = 0
         for f in fastalist:
             if is_file_fasta(os.path.join(upload_path, session.uuid, f)):
+
+                # Validate inputs
+                if re.search(fr'[^{allowed_characters}]', f):
+                    fastafiles_invalid.append(f)
+
                 # Spaces and hashes cause problems, so rename files to be spaceless and hashless, if needed.
                 if ' ' in f:
                     new_f = f.replace(' ', '_')
@@ -139,18 +126,22 @@ def jobs():
         if no_of_fastas == 0:
             logger.error(f'[{session.uuid}] No fasta file found in uploaded file.')
             redirect(URL(r=request, f='jobs', vars=dict(message=T("No fasta file was found in the uploaded file."))))
-        fastafiles_string = ', '.join(fastafiles)
+        if fastafiles_invalid:
+            fastafiles_invalid_str = ', '.join(fastafiles_invalid)
+            error_msg = f'Input file contains invalid characters: {fastafiles_invalid_str}. Please include only {allowed_characters}'
+            logger.error(f'[{session.uuid}] {error_msg}')
+            redirect(URL(r=request, f='jobs', vars=dict(message=T(error_msg))))
+
         logger.debug(f'[{session.uuid}] Selected reference database: ' + request.vars.reference)

         # Save job details to a JSON file
-        build_meta_json(session.uuid, request.vars.job_name, fastafiles_string, no_of_fastas,
-                        filelist.get(request.vars.reference, None), submit_time)
+        build_meta_json(session.uuid, request.vars.job_name, fastafiles, filelist[request.vars.reference], submit_time)

         # Create empty result file
-        create_table_file(request.vars.reference)
+        create_table_file()

         # Build job list
-        build_job_dict(session.uuid, request.vars.reference, submit_time, fastafiles, no_of_fastas, upload_path)
+        build_job_dict(session.uuid, request.vars.reference, submit_time, fastafiles, upload_path)

         # Add job to job queue
         add_job_to_queue(job_queue_path, session.uuid, no_of_fastas)
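The new allowed_characters whitelist is applied per file name with a negated character class; any name containing a character outside the set is collected into fastafiles_invalid and the whole submission is rejected. A minimal sketch of that check, outside the web2py controller (the file names below are invented examples):

    import re

    allowed_characters = ' a-zA-Z0-9_.-'

    def has_invalid_characters(filename):
        # True if the name contains anything outside the whitelist.
        return re.search(fr'[^{allowed_characters}]', filename) is not None

    print(has_invalid_characters('assembly_1.fasta'))  # False
    print(has_invalid_characters('assembly#1.fasta'))  # True: '#' is not in the whitelist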
@@ -205,7 +196,7 @@ def confirmation():

    if os.path.exists(result_json_path) and (succeeded_jobs + failed_jobs == total_jobs): # If job finished
        content = ''
-       result_data = read_json_from_file(result_json_path)
+       result_data = read_json_from_file(result_json_path, json_lines=True)
        result_status = 1
    elif pending_jobs == 0 and running_jobs == 0:
        content = ''
@@ -217,11 +208,11 @@ def confirmation():
        logger.debug(f'[{session.uuid}] No available worker. Job is in the queue.')
        result_status = 2
    else:
-       content = 'Processing your job, it usually takes ~1 minute for each assembly file to complete. ' \
+       content = 'Processing your job, it usually takes < a few seconds for each assembly file to complete. ' \
                  'This page will refresh every ' + str(refresh_time / 1000) + \
                  ' seconds until the process is completed. Please do not close this page or start a new job.'
        if os.path.exists(result_json_path):
-           result_data = read_json_from_file(result_json_path)
+           result_data = read_json_from_file(result_json_path, json_lines=True)
        else:
            logger.debug(f'[{session.uuid}] Cannot find final result JSON file.')
        result_status = 2
@@ -296,16 +287,16 @@ def download():
 @cache.action()
 def get_svg():
     uuid = request.args(0)
-    assemble_name = request.args(1)
-    path = os.path.join(upload_path, uuid, 'locus_image', assemble_name + '.svg')
+    assembly_name = request.args(1)
+    path = os.path.join(upload_path, uuid, 'locus_image', assembly_name + '.svg')
     return response.stream(path)


 @cache.action()
 def get_png():
     uuid = request.args(0)
-    assemble_name = request.args(1)
-    path = os.path.join(upload_path, uuid, 'locus_image', assemble_name + '.png')
+    assembly_name = request.args(1)
+    path = os.path.join(upload_path, uuid, 'locus_image', assembly_name + '.png')
     if os.path.exists(path):
         return response.stream(path)
     else:
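read_json_from_file() is called with a new json_lines=True argument in confirmation(), but its implementation is not part of this diff. Assuming the flag selects newline-delimited JSON (one result object per line), a minimal reader could look like the sketch below; the function name and behaviour here are an assumption, not the app's code:

    import json
    from collections import OrderedDict


    def read_json_lines(path):
        # Sketch: parse one JSON object per non-blank line, preserving key order.
        records = []
        with open(path, 'r') as fh:
            for line in fh:
                line = line.strip()
                if line:
                    records.append(json.loads(line, object_pairs_hook=OrderedDict))
        return records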
Binary file removed extras/Example_broken_assembly.png
Binary file removed extras/Example_close_match.png
Binary file removed extras/Example_more_distant_match.png
Binary file removed extras/Example_novel_locus.png
Binary file removed extras/Example_novel_variant.png
Binary file removed extras/Example_variant_database_run.png