Skip to content
This repository has been archived by the owner on Aug 16, 2019. It is now read-only.

Commit

Permalink
Attempt to fix subsystem creation issues
Browse files Browse the repository at this point in the history
Spotted by sentry as `OVERWATCH-PROCESSING-1`.
  • Loading branch information
raymondEhlers committed Oct 29, 2018
1 parent a4ca928 commit b9230b9
Showing 1 changed file with 55 additions and 36 deletions.
91 changes: 55 additions & 36 deletions overwatch/processing/processRuns.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,44 +724,63 @@ def processMovedFilesIntoRuns(runs, runDict):
if runDir in runs:
run = runs[runDir]

# Determine the subsystems which we want to update. When the subsystem is initially created,
# every subsystem that does not have its `fileLocationSubsystem` is already created. Thus, we want to
# update any subsystem that already exist (including those which do not have their own
# `fileLocationSubsystem`) or those which have new files. However, if it's somehow not included
# (for example, EMC files were not provided, then these won't be included).
# Basically, we need existing subsystems in the run, and then any which subsystems which have
# files in the `runDict`.
subsystemsToCheck = set(run.subsystems)
subsystemsToCheck.union(runDict[runDir])
# Possible scenarios:
# - 1) runDir has new data, and the corresponding subsystem exists. -> Update the subsystem.
# - 2) runDir has new data, but subsystem doens't exist in run container -> Create a new subsystem.
# - 3) runDir has new data, and it is used in a subsystem which doesn't have it's own data -> Update the subsystem with HLT files (but it will be indirect)
# - 4) runDir doesn't have new data, but the subsystem exists -> Do nothing.

# Handle scenario 3
# To handle this scenario, we basically want to copy the HLT (or other fileLocationSubsystem) files to the existing subsystem.
# When we are done, the run subsystems should be a subset of the runDict subsystems. The runDict subsystems could have additional subsystems
# if began receiving new data.
for name, subsystem in iteritems(run.subsystems):
if name not in runDict[runDir]:
newFiles = []
if subsystem.fileLocationSubsystem != subsystem.subsystem:
# This will almost always be "HLT", but in principle it could be something else!
newFiles = runDict[runDir].get(subsystem.fileLocationSubsystem, [])
runDict[runDir][name] = newFiles

# Sanity check to make sure that we've in the desired state to proceed.
assert set(runDict[runDir]).issuperset(run.subsystems)

# Update each subsystem and note that it needs to be reprocessed
for subsystemName in subsystemsToCheck:
if subsystemName in run.subsystems:
logger.debug("Updating files in existing subsystem {subsystemName}.".format(subsystemName = subsystemName))
# Update the existing subsystem
subsystem = run.subsystems[subsystemName]
# Add the new files and note them in the subsystem, which will lead to reprocessing.
subsystem.newFile = True
for filename in runDict[runDir][subsystemName]:
# We need the full path to the file (ie everything except for the dirPrefix).
filename = os.path.join(subsystem.baseDir, filename)
subsystem.files[utilities.extractTimeStampFromFilename(filename)] = processingClasses.fileContainer(filename = filename, startOfRun = subsystem.startOfRun)

# Update time stamps
fileKeys = subsystem.files.keys()
# This should rarely change, but in principle we could get a new file that we missed.
subsystem.startOfRun = fileKeys[0]
logger.info("Previous EOR: {endOfRun}\tNew: {fileKey}".format(endOfRun = subsystem.endOfRun, fileKey = fileKeys[-1]))
subsystem.endOfRun = fileKeys[-1]
for subsystemName in runDict[runDir]:
# We only want to update files if there are actually new files (and not just an empty list)
if runDict[runDir][subsystemName]:
if subsystemName in run.subsystems:
# Scenario 1
# Update the subsystem
logger.debug("Updating files in existing subsystem {subsystemName}.".format(subsystemName = subsystemName))
# Update the existing subsystem
subsystem = run.subsystems[subsystemName]
# Add the new files and note them in the subsystem, which will lead to reprocessing.
subsystem.newFile = True
for filename in runDict[runDir][subsystemName]:
# We need the full path to the file (ie everything except for the dirPrefix).
filename = os.path.join(subsystem.baseDir, filename)
subsystem.files[utilities.extractTimeStampFromFilename(filename)] = processingClasses.fileContainer(filename = filename, startOfRun = subsystem.startOfRun)

# Update time stamps
fileKeys = subsystem.files.keys()
# This should rarely change, but in principle we could get a new file that we missed.
subsystem.startOfRun = fileKeys[0]
logger.info("Previous EOR: {endOfRun}\tNew: {fileKey}".format(endOfRun = subsystem.endOfRun, fileKey = fileKeys[-1]))
subsystem.endOfRun = fileKeys[-1]
else:
# Scenario 2
# Create a new subsystem in an existing run.
# This may occur if a new run has started, but we haven't yet received data from each subsystem.
logger.debug("Creating new subsystem {subsystemName} in existing run.".format(subsystemName = subsystemName))
# NOTE: We don't catch the exception here, as we want it to fail if the subsystem doesn't have its own
# files and the HLT receiver data isn't available.
createNewSubsystemFromMovedFilesInformation(runs, subsystemName, runDict, runDir)
else:
# Create a new subsystem in an existing run.
# This shouldn't be super common, as it corresponds to the case where we have
# already setup a run (with all of its subsystems already created), then later received a file
# from a new subsystem.
logger.debug("Creating new subsystem {subsystemName} in existing run.".format(subsystemName = subsystemName))
# NOTE: We don't catch the exception here, as we want it to fail if the subsystem doesn't have its own
# files and the HLT receiver data isn't available.
createNewSubsystemFromMovedFilesInformation(runs, subsystemName, runDict, runDir)

# Scenario 4
# We end up here if there is now new data for subsystemName.
# In that case, we have nothing to do and we just continue.
logger.debug('"No new data for subsystem "{subsystemName}", so not updating this subsystem in the run'.format(subsystemName = subsystemName))
else:
# The run doesn't yet exist, so we'll create a new run and new subsystems.
# First, create the new run.
Expand Down

0 comments on commit b9230b9

Please sign in to comment.