Extend Restart to Adaptive samplers #1283

Merged 17 commits on Jul 23, 2020
1 change: 1 addition & 0 deletions developer_tools/XSDSchemas/Samplers.xsd
@@ -398,6 +398,7 @@
<xsd:element name="initialSeed" type="xsd:integer" minOccurs="0" maxOccurs="1"/>
<xsd:element name="variable" type="variableType" minOccurs="1" maxOccurs="unbounded"/>
<xsd:element name="constant" type="constantVarType" minOccurs="0" maxOccurs='unbounded'/>
<xsd:element name="Restart" type="AssemblerObjectType" minOccurs="0" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required"/>
<xsd:attribute name="verbosity" type="verbosityAttr"/>
12 changes: 12 additions & 0 deletions framework/JobHandler.py
@@ -377,6 +377,18 @@ def addClientJob(self, args, functionToRun, identifier, metadata=None, uniqueHan
forceUseThreads = True, uniqueHandler = uniqueHandler,
clientQueue = True)

def addFinishedJob(self, data):
"""
Takes an already-finished job (for example, a restart realization) and adds it to the finished queue.
@ In, data, dict, completed realization
@ Out, None
"""
# create a placeholder runner
run = Runners.PassthroughRunner(self.messageHandler, data)
# place it on the finished queue
with self.__queueLock:
self.__finished.append(run)

def isFinished(self):
"""
Method to check if all the runs in the queue are finished
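For orientation, a hedged sketch of how a caller might use the new method; `jobHandler` is assumed to be an existing `JobHandler` instance, and the realization keys (`inputs`, `outputs`, `metadata`) follow the `PassthroughRunner` added below. The variable names are illustrative and not part of this diff.

```python
# Illustration only: a realization recovered from restart data, already complete.
restartRlz = {'inputs':   {'x1': 1.5, 'x2': 2.5},
              'outputs':  {'ans': 1.0},
              'metadata': {'prefix': '3'}}
# Rather than submitting it to the Model, place it directly on the finished queue;
# addFinishedJob wraps it in a PassthroughRunner so it is collected like any other run.
jobHandler.addFinishedJob(restartRlz)
```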
86 changes: 86 additions & 0 deletions framework/Runners/PassthroughRunner.py
@@ -0,0 +1,86 @@
# Copyright 2017 Battelle Energy Alliance, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module for the PassthroughRunner class, which skips evaluation. Currently used
primarily for restarting Samplers from existing data.
"""
import numpy as np
from .Runner import Runner

class PassthroughRunner(Runner):
"""
A runner for when we already have the answer, but need to go through the mechanics.
"""
def __init__(self, messageHandler, data, metadata=None, uniqueHandler="any", profile=False):
"""
Init method
@ In, messageHandler, MessageHandler object, the global RAVEN message
handler object
@ In, data, dict, fully-evaluated realization
@ In, metadata, dict, optional, dictionary of metadata associated with
this run
@ In, uniqueHandler, string, optional, it is a special keyword attached to
this runner. For example, if present, to retrieve this runner using the
method jobHandler.getFinished, the uniqueHandler needs to be provided.
If uniqueHandler == 'any', every "client" can get this runner
@ In, profile, bool, optional, if True then at deconstruction timing statements will be printed
@ Out, None
"""
super(PassthroughRunner, self).__init__(messageHandler, metadata=metadata, uniqueHandler=uniqueHandler, profile=profile)
self._data = data # realization with completed data
self.returnCode = 0 # passthrough was born successful

def isDone(self):
"""
Method to check if the calculation associated with this Runner is finished
@ In, None
@ Out, isDone, bool, is it finished?
"""
return True # passthrough was born done

def getReturnCode(self):
"""
Returns the return code from "running the code."
@ In, None
@ Out, returnCode, int, the return code of this evaluation
"""
return self.returnCode

def getEvaluation(self):
"""
Return solution.
@ In, None
@ Out, result, dict, results
"""
result = {}
result.update(dict((key, np.atleast_1d(value)) for key, value in self._data['inputs'].items()))
result.update(dict((key, np.atleast_1d(value)) for key, value in self._data['outputs'].items()))
result.update(dict((key, np.atleast_1d(value)) for key, value in self._data['metadata'].items()))
return result

def start(self):
"""
Method to start the job associated to this Runner
@ In, None
@ Out, None
"""
pass # passthrough was born done

def kill(self):
"""
Method to kill the job associated to this Runner
@ In, None
@ Out, None
"""
pass # passthrough was born done; you can't kill it
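A brief usage sketch of the new runner, assuming an existing RAVEN `messageHandler` and a completed realization dictionary; the variable names are illustrative.

```python
# Illustration only (assumes `messageHandler` is a RAVEN MessageHandler instance).
from Runners import PassthroughRunner

rlz = {'inputs':   {'x1': 1.5, 'x2': 2.5},
       'outputs':  {'ans': 1.0},
       'metadata': {'prefix': '3'}}
runner = PassthroughRunner(messageHandler, rlz)
runner.start()                      # no-op: the run is "born done"
assert runner.isDone()              # always True
assert runner.getReturnCode() == 0  # passthrough is born successful
result = runner.getEvaluation()     # inputs/outputs/metadata merged; values as 1-d numpy arrays
```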
2 changes: 1 addition & 1 deletion framework/Runners/Runner.py
@@ -38,7 +38,7 @@ class Runner(MessageHandler.MessageUser):
Generic base class for running codes and models in parallel environments
both internally (shared data) and externally.
"""
def __init__(self, messageHandler, identifier = None, metadata = None, uniqueHandler = "any", profile = False):
def __init__(self, messageHandler, identifier=None, metadata=None, uniqueHandler="any", profile=False):
"""
Initialize command variable
@ In, messageHandler, MessageHandler instance, the global RAVEN message handler instance
1 change: 1 addition & 0 deletions framework/Runners/__init__.py
@@ -28,6 +28,7 @@
from .InternalRunner import InternalRunner
from .SharedMemoryRunner import SharedMemoryRunner
from .DistributedMemoryRunner import DistributedMemoryRunner
from .PassthroughRunner import PassthroughRunner
from .Error import Error

# from .Factory import knownTypes
32 changes: 18 additions & 14 deletions framework/Steps.py
@@ -626,9 +626,10 @@ def _localInitializeStep(self,inDictionary):
for inputIndex in range(inDictionary['jobHandler'].runInfoDict['batchSize']):
if inDictionary[self.samplerType].amIreadyToProvideAnInput():
try:
newInput = self._findANewInputToRun(inDictionary[self.samplerType], inDictionary['Model'], inDictionary['Input'], inDictionary['Output'])
inDictionary["Model"].submit(newInput, inDictionary[self.samplerType].type, inDictionary['jobHandler'], **copy.deepcopy(inDictionary[self.samplerType].inputInfo))
self.raiseADebug('Submitted input '+str(inputIndex+1))
newInput = self._findANewInputToRun(inDictionary[self.samplerType], inDictionary['Model'], inDictionary['Input'], inDictionary['Output'], inDictionary['jobHandler'])
if newInput is not None:
inDictionary["Model"].submit(newInput, inDictionary[self.samplerType].type, inDictionary['jobHandler'], **copy.deepcopy(inDictionary[self.samplerType].inputInfo))
self.raiseADebug('Submitted input '+str(inputIndex+1))
except utils.NoMoreSamplesNeeded:
self.raiseAMessage('Sampler returned "NoMoreSamplesNeeded". Continuing...')

@@ -710,8 +711,9 @@ def _localTakeAstepRun(self,inDictionary):

if sampler.amIreadyToProvideAnInput():
try:
newInput = self._findANewInputToRun(sampler, model, inputs, outputs)
model.submit(newInput, inDictionary[self.samplerType].type, jobHandler, **copy.deepcopy(sampler.inputInfo))
newInput = self._findANewInputToRun(sampler, model, inputs, outputs, jobHandler)
if newInput is not None:
model.submit(newInput, inDictionary[self.samplerType].type, jobHandler, **copy.deepcopy(sampler.inputInfo))
except utils.NoMoreSamplesNeeded:
self.raiseAMessage(' ... Sampler returned "NoMoreSamplesNeeded". Continuing...')
break
@@ -728,7 +730,7 @@
# if any collected runs failed, let the sampler treat them appropriately, and any other closing-out actions
sampler.finalizeSampler(self.failedRuns)

def _findANewInputToRun(self, sampler, model, inputs, outputs):
def _findANewInputToRun(self, sampler, model, inputs, outputs, jobHandler):
"""
Repeatedly calls Sampler until a new run is found or "NoMoreSamplesNeeded" is raised.
@ In, sampler, Sampler, the sampler in charge of generating the sample
@@ -741,18 +743,20 @@ def _findANewInputToRun(self, sampler, model, inputs, outputs):
(i.e., a DataObject, File, or HDF5, I guess? Maybe these should all
inherit from some base "Data" so that we can ensure a consistent
interface for these?)
@ Out, newInp, list, list containing the new inputs
@ In, jobHandler, object, the raven object used to handle jobs
@ Out, newInp, list, list containing the new inputs (or None if a restart)
"""
#The value of "found" determines what the Sampler is ready to provide.
# case 0: a new sample has been discovered and can be run, and newInp is a new input list.
# case 1: found the input in restart, and newInp is a realization dictionary of data to use
found = None
while found != 0:
found, newInp = sampler.generateInput(model,inputs)
if found == 1:
# loop over the outputs for this step and collect the data for each
for collector, outIndex in self._outputDictCollectionLambda:
collector([newInp,outputs[outIndex]])
found, newInp = sampler.generateInput(model,inputs)
if found == 1:
# "submit" the finished run
jobHandler.addFinishedJob(newInp)
return None
# NOTE: we return None here only because the Sampler's "counter" is not correctly passed
# through if we add several samples at once through the restart. If we actually returned
# a Realization object from the Sampler, this would not be a problem. - talbpaul
return newInp
#
#
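To spell out the two cases named in the comments above, a condensed restatement of the new `_findANewInputToRun` flow (for readability only; the diff above is authoritative):

```python
# Condensed restatement of the logic added in Steps.py (illustration only).
def _findANewInputToRun(self, sampler, model, inputs, outputs, jobHandler):
  found = None
  while found != 0:
    found, newInp = sampler.generateInput(model, inputs)
    if found == 1:
      # case 1: the point already exists in the restart data, so nothing needs
      # to be evaluated; "submit" it as an already-finished job and return None
      # so the caller skips Model.submit for this sample.
      jobHandler.addFinishedJob(newInp)
      return None
  # case 0: a genuinely new sample; the caller submits it to the Model as before.
  return newInp
```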
197 changes: 197 additions & 0 deletions tests/framework/Samplers/Restart/adaptive_mc.xml
@@ -0,0 +1,197 @@
<?xml version="1.0" ?>
<Simulation verbosity="debug">
<TestInfo>
<name>framework/Samplers/Restart.AMC</name>
<author>talbpaul</author>
<created>2020-07-16</created>
<classesTested>Samplers.AdaptiveMonteCarlo</classesTested>
<description>
Tests restarting an Adaptive Monte Carlo sampler from existing data. \texttt{makeCoarse} samples initial data, then \texttt{makeRestart}
makes additional samples, restarting from the first set of samples. \texttt{makeFine} does all the samples without restart
for comparison. The model for \texttt{coarse} always returns a value of 1, while the model for \texttt{restart} returns a value of 2, so
you can tell which samples came from which sampling strategy. Further, the solution export for
the restart should contain all the sample points, not merely the newly-sampled points.
</description>
</TestInfo>

<RunInfo>
<WorkingDir>AMC</WorkingDir>
<Sequence>makeCoarse,makeRestart,makeFine,print</Sequence>
</RunInfo>

<Steps>
<MultiRun name="makeCoarse">
<Input class="DataObjects" type="PointSet">dummyIN</Input>
<Model class="Models" type="ExternalModel">coarsemod</Model>
<Sampler class="Samplers" type="AdaptiveMonteCarlo">coarse</Sampler>
<SolutionExport class="DataObjects" type="PointSet">exportCoarse</SolutionExport>
<Output class="DataObjects" type="PointSet">solns</Output>
</MultiRun>
<MultiRun name="makeFine">
<Input class="DataObjects" type="PointSet">dummyIN</Input>
<Model class="Models" type="ExternalModel">finemod</Model>
<Sampler class="Samplers" type="AdaptiveMonteCarlo">fine</Sampler>
<SolutionExport class="DataObjects" type="PointSet">exportFine</SolutionExport>
<Output class="DataObjects" type="PointSet">solnsFine</Output>
</MultiRun>
<MultiRun name="makeRestart">
<Input class="DataObjects" type="PointSet">solns</Input>
<Model class="Models" type="ExternalModel">finemod</Model>
<Sampler class="Samplers" type="AdaptiveMonteCarlo">restart</Sampler>
<SolutionExport class="DataObjects" type="PointSet">exportRestart</SolutionExport>
<Output class="DataObjects" type="PointSet">solnsRestart</Output>
</MultiRun>
<IOStep name="print">
<Input class="DataObjects" type="PointSet">solns</Input>
<Input class="DataObjects" type="PointSet">solnsFine</Input>
<Input class="DataObjects" type="PointSet">solnsRestart</Input>
<Input class="DataObjects" type="PointSet">exportCoarse</Input>
<Input class="DataObjects" type="PointSet">exportFine</Input>
<Input class="DataObjects" type="PointSet">exportRestart</Input>
<Output class="OutStreams" type="Print">coarse</Output>
<Output class="OutStreams" type="Print">fine</Output>
<Output class="OutStreams" type="Print">restart</Output>
<Output class="OutStreams" type="Print">exp_coarse</Output>
<Output class="OutStreams" type="Print">exp_fine</Output>
<Output class="OutStreams" type="Print">exp_restart</Output>
</IOStep>
</Steps>

<Distributions>
<Uniform name="u1">
<lowerBound>1</lowerBound>
<upperBound>2</upperBound>
</Uniform>
<Uniform name="u2">
<lowerBound>2</lowerBound>
<upperBound>3</upperBound>
</Uniform>
</Distributions>

<Samplers>
<AdaptiveMonteCarlo name="coarse">
<TargetEvaluation class="DataObjects" type="PointSet">solns</TargetEvaluation>
<Convergence>
<limit>50</limit>
<persistence>5</persistence>
<forceIteration>False</forceIteration>
<expectedValue prefix="mean" tol="5e-1">x1</expectedValue>
</Convergence>
<initialSeed>42</initialSeed>
<variable name="x1">
<distribution>u1</distribution>
</variable>
<variable name="x2">
<distribution>u2</distribution>
</variable>
</AdaptiveMonteCarlo>
<AdaptiveMonteCarlo name="restart">
<TargetEvaluation class="DataObjects" type="PointSet">solnsRestart</TargetEvaluation>
<Convergence>
<limit>50</limit>
<persistence>5</persistence>
<forceIteration>False</forceIteration>
<expectedValue prefix="mean" tol="1e-1">x1</expectedValue>
</Convergence>
<initialSeed>42</initialSeed>
<variable name="x1">
<distribution>u1</distribution>
</variable>
<variable name="x2">
<distribution>u2</distribution>
</variable>
<Restart class="DataObjects" type="PointSet">solns</Restart>
</AdaptiveMonteCarlo>
<AdaptiveMonteCarlo name="fine">
<TargetEvaluation class="DataObjects" type="PointSet">solnsFine</TargetEvaluation>
<Convergence>
<limit>50</limit>
<persistence>5</persistence>
<forceIteration>False</forceIteration>
<expectedValue prefix="mean" tol="1e-1">x1</expectedValue>
</Convergence>
<initialSeed>42</initialSeed>
<variable name="x1">
<distribution>u1</distribution>
</variable>
<variable name="x2">
<distribution>u2</distribution>
</variable>
</AdaptiveMonteCarlo>
</Samplers>

<Models>
<Dummy name="MyDummy" subType=""/>
<ExternalModel ModuleToLoad="../coarse" name="coarsemod" subType="">
<variables>x1,x2,ans</variables>
</ExternalModel>
<ExternalModel ModuleToLoad="../fine" name="finemod" subType="">
<variables>x1,x2,ans</variables>
</ExternalModel>
</Models>

<DataObjects>
<PointSet name="dummyIN">
<Input>x1,x2</Input>
<Output>OutputPlaceHolder</Output>
</PointSet>
<PointSet name="solns">
<Input>x1,x2</Input>
<Output>ans</Output>
</PointSet>
<PointSet name="solnsFine">
<Input>x1,x2</Input>
<Output>ans</Output>
</PointSet>
<PointSet name="solnsRestart">
<Input>x1,x2</Input>
<Output>ans</Output>
</PointSet>
<PointSet name="exportCoarse">
<Input>solutionUpdate</Input>
<Output>mean_x1,mean_ste_x1</Output>
</PointSet>
<PointSet name="exportFine">
<Input>solutionUpdate</Input>
<Output>mean_x1,mean_ste_x1</Output>
</PointSet>
<PointSet name="exportRestart">
<Input>solutionUpdate</Input>
<Output>mean_x1,mean_ste_x1</Output>
</PointSet>
</DataObjects>

<OutStreams>
<Print name="coarse">
<type>csv</type>
<source>solns</source>
<what>input,output</what>
</Print>
<Print name="fine">
<type>csv</type>
<source>solnsFine</source>
<what>input,output</what>
</Print>
<Print name="restart">
<type>csv</type>
<source>solnsRestart</source>
<what>input,output</what>
</Print>
<Print name="exp_coarse">
<type>csv</type>
<source>exportCoarse</source>
<what>input,output</what>
</Print>
<Print name="exp_fine">
<type>csv</type>
<source>exportFine</source>
<what>input,output</what>
</Print>
<Print name="exp_restart">
<type>csv</type>
<source>exportRestart</source>
<what>input,output</what>
</Print>
</OutStreams>

</Simulation>
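The external models referenced by `ModuleToLoad="../coarse"` and `ModuleToLoad="../fine"` are not included in this diff. Per the test description (the coarse model always returns 1, the fine/restart model returns 2), plausible minimal implementations following RAVEN's external-model `run(self, Input)` convention might look like:

```python
# coarse.py (sketch, not part of this diff): every sample reports ans = 1
def run(self, Input):
  self.ans = 1.0
```

```python
# fine.py (sketch, not part of this diff): every sample reports ans = 2
def run(self, Input):
  self.ans = 2.0
```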