From 42e31aa2a55ed4e96ee97c754770797bf2316298 Mon Sep 17 00:00:00 2001 From: 4wc Date: Thu, 17 Oct 2024 10:35:31 -0400 Subject: [PATCH 1/2] Created a new argument parser class that allows for more options when modifying arguments for the Virac. Still uses argparse as the backbone, but starts with a dictionary that users can more easily edit for their specific beam line before creating the argparse parser. Updated the SNS linac, IDmp, and BTF to use this new class. --- virtaccl/EPICS_Server/ca_server.py | 10 +++ .../PyORBIT_Model/pyorbit_va_arguments.py | 26 ++++++ virtaccl/site/BTF/btf_virtual_accelerator.py | 57 +++++-------- virtaccl/site/SNS_IDmp/IDmp_maker.py | 4 +- .../site/SNS_IDmp/IDmp_virtual_accelerator.py | 60 ++++++-------- virtaccl/site/SNS_Linac/virtual_SNS_linac.py | 68 +++++++-------- virtaccl/virtual_accelerator.py | 82 ++++++++++++------- 7 files changed, 170 insertions(+), 137 deletions(-) create mode 100644 virtaccl/PyORBIT_Model/pyorbit_va_arguments.py diff --git a/virtaccl/EPICS_Server/ca_server.py b/virtaccl/EPICS_Server/ca_server.py index cb279b9..ea7919b 100644 --- a/virtaccl/EPICS_Server/ca_server.py +++ b/virtaccl/EPICS_Server/ca_server.py @@ -10,6 +10,16 @@ from pcaspy import SimpleServer from virtaccl.server import Server +from virtaccl.virtual_accelerator import VA_Parser + + +def add_epics_arguments(va_parser: VA_Parser) -> VA_Parser: + # Number (in seconds) that determine some delay parameter in the server. Not exactly sure how it works, so use at + # your own risk. + va_parser.add_argument('--ca_proc', default=0.1, type=float, + help='Number (in seconds) that determine some delay parameter in the server. Not exactly ' + 'sure how it works, so use at your own risk.') + return va_parser def to_epics_timestamp(t: datetime): diff --git a/virtaccl/PyORBIT_Model/pyorbit_va_arguments.py b/virtaccl/PyORBIT_Model/pyorbit_va_arguments.py new file mode 100644 index 0000000..a5d3e2a --- /dev/null +++ b/virtaccl/PyORBIT_Model/pyorbit_va_arguments.py @@ -0,0 +1,26 @@ +from virtaccl.virtual_accelerator import VA_Parser + + +def add_pyorbit_arguments(va_parser: VA_Parser) -> VA_Parser: + # Lattice xml input file and the sequences desired from that file. + va_parser.add_argument('--lattice', type=str, help='Pathname of lattice file.') + va_parser.add_argument("--start", default="MEBT", type=str, + help='Desired sequence of the lattice to start the model with.') + va_parser.add_argument("end", nargs='?', type=str, + help='Desired sequence of the lattice to end the model with.') + va_parser.add_argument('--space_charge', const=0.01, nargs='?', type=float, + help="Adds Uniform Ellipse Space Charge nodes to the lattice. The minimum distance in meters" + " between nodes can be specified; the default is 0.01m if no minimum is given. If the " + "argument is not used, no space charge nodes are added.") + + # Desired initial bunch file and the desired number of particles from that file. + va_parser.add_argument('--bunch', type=str, help='Pathname of input bunch file.') + va_parser.add_argument('--particle_number', default=1000, type=int, + help='Number of particles to use.') + va_parser.add_argument('--beam_current', default=38.0, type=float, + help='Initial beam current in mA.') + va_parser.add_argument('--save_bunch', const='end_bunch.dat', nargs='?', type=str, + help="Saves the bunch at the end of the lattice after each track in the given location. " + "If no location is given, the bunch is saved as 'end_bunch.dat' in the working " + "directory.") + return va_parser diff --git a/virtaccl/site/BTF/btf_virtual_accelerator.py b/virtaccl/site/BTF/btf_virtual_accelerator.py index 7130596..91bdced 100644 --- a/virtaccl/site/BTF/btf_virtual_accelerator.py +++ b/virtaccl/site/BTF/btf_virtual_accelerator.py @@ -1,17 +1,14 @@ -# Channel access server used to generate fake PV signals analogous to accelerator components. -# The main body of the script instantiates PVs from a file passed by command line argument. import json import math import os import sys import time -import argparse from pathlib import Path -from importlib.metadata import version from orbit.py_linac.lattice_modifications import Add_quad_apertures_to_lattice, Add_rfgap_apertures_to_lattice from virtaccl.PyORBIT_Model.pyorbit_child_nodes import BPMclass, FCclass, BCMclass +from virtaccl.PyORBIT_Model.pyorbit_va_arguments import add_pyorbit_arguments from virtaccl.site.BTF.orbit_model.btf_lattice_factory import PyORBIT_Lattice_Factory from orbit.core.bunch import Bunch @@ -25,49 +22,39 @@ from virtaccl.site.BTF.orbit_model.btf_child_nodes import BTF_Screenclass, BTF_Slitclass from virtaccl.PyORBIT_Model.pyorbit_lattice_controller import OrbitModel -from virtaccl.EPICS_Server.ca_server import EPICS_Server +from virtaccl.EPICS_Server.ca_server import EPICS_Server, add_epics_arguments -from virtaccl.virtual_accelerator import va_parser, virtual_accelerator +from virtaccl.virtual_accelerator import virtual_accelerator, VA_Parser def main(): loc = Path(__file__).parent - parser, va_version = va_parser() - parser.description = 'Run the SNS Linac PyORBIT virtual accelerator server. Version ' + va_version + va_parser = VA_Parser() + va_parser.description = 'Run the BTF PyORBIT virtual accelerator server.' + + va_parser = add_pyorbit_arguments(va_parser) + # Set the defaults for the PyORBIT model. + va_parser.change_argument_default('--lattice', loc / 'orbit_model/btf_lattice_straight.xml') + va_parser.change_argument_default('--start', 'MEBT1') + va_parser.change_argument_default('end', 'MEBT2') + va_parser.change_argument_default('--bunch', loc / 'orbit_model/parmteq_bunch_RFQ_output_1.00e+05.dat') + + va_parser = add_epics_arguments(va_parser) # Json file that contains a dictionary connecting EPICS name of devices with their associated element model names. - parser.add_argument('--file', '-f', default=loc / 'btf_config.json', type=str, - help='Pathname of config json file.') - - # Lattice xml input file and the sequences desired from that file. - parser.add_argument('--lattice', default=loc / 'orbit_model/btf_lattice_straight.xml', type=str, - help='Pathname of lattice file') - parser.add_argument("--start", default="MEBT1", type=str, - help='Desired sequence of the lattice to start the model with (default=MEBT1).') - parser.add_argument("end", nargs='?', default="MEBT2", type=str, - help='Desired sequence of the lattice to end the model with (default=MEBT2).') - - # Desired initial bunch file and the desired number of particles from that file. - parser.add_argument('--bunch', default=loc / 'orbit_model/parmteq_bunch_RFQ_output_1.00e+05.dat', type=str, - help='Pathname of input bunch file.') - parser.add_argument('--particle_number', default=10000, type=int, - help='Number of particles to use (default=1000).') - parser.add_argument('--beam_current', default=30.0, type=float, - help='Initial beam current in mA. (default=30.0).') - parser.add_argument('--save_bunch', const='end_bunch.dat', nargs='?', type=str, - help="Saves the bunch at the end of the lattice after each track in the given location. " - "If no location is given, the bunch is saved as 'end_bunch.dat' in the working directory. " - "(Default is that the bunch is not saved.)") + va_parser.add_argument('--config_file', '-f', default=loc / 'btf_config.json', type=str, + help='Pathname of config json file.') # Json file that contains a dictionary connecting EPICS name of devices with their phase offset. - parser.add_argument('--phase_offset', default=None, type=str, - help='Pathname of phase offset file.') + va_parser.add_argument('--phase_offset', default=None, type=str, + help='Pathname of phase offset file.') - args = parser.parse_args() + va_parser = va_parser.initialize_arguments() + args = va_parser.parse_args() debug = args.debug save_bunch = args.save_bunch - config_file = Path(args.file) + config_file = Path(args.config_file) with open(config_file, "r") as json_file: devices_dict = json.load(json_file) @@ -302,7 +289,7 @@ def main(): server = EPICS_Server() - virtual_accelerator(model, beam_line, server, parser) + virtual_accelerator(model, beam_line, server, va_parser) if __name__ == '__main__': diff --git a/virtaccl/site/SNS_IDmp/IDmp_maker.py b/virtaccl/site/SNS_IDmp/IDmp_maker.py index e7f14a7..e8a2577 100644 --- a/virtaccl/site/SNS_IDmp/IDmp_maker.py +++ b/virtaccl/site/SNS_IDmp/IDmp_maker.py @@ -100,8 +100,8 @@ def get_IDMP_lattice_and_bunch(particle_number=1000, x_off=0, xp_off=0, y_off=0, # Dump # Define the sequence and add the list of nodes to the sequence. - fodo = Sequence('FODO') - fodo.setNodes(list_of_nodes) + idmp = Sequence('IDmp') + idmp.setNodes(list_of_nodes) # Define the lattice, add the list of nodes to the lattice, and initialize the lattice. my_lattice = LinacAccLattice('My Lattice') diff --git a/virtaccl/site/SNS_IDmp/IDmp_virtual_accelerator.py b/virtaccl/site/SNS_IDmp/IDmp_virtual_accelerator.py index 57895e3..c8b2671 100644 --- a/virtaccl/site/SNS_IDmp/IDmp_virtual_accelerator.py +++ b/virtaccl/site/SNS_IDmp/IDmp_virtual_accelerator.py @@ -1,58 +1,52 @@ -# Channel access server used to generate fake PV signals analogous to accelerator components. -# The main body of the script instantiates PVs from a file passed by command line argument. import json from pathlib import Path from virtaccl.PyORBIT_Model.pyorbit_child_nodes import BPMclass, WSclass, ScreenClass +from virtaccl.PyORBIT_Model.pyorbit_va_arguments import add_pyorbit_arguments from virtaccl.site.SNS_Linac.virtual_devices import (Quadrupole, Corrector, Quadrupole_Power_Supply, Corrector_Power_Supply, WireScanner, BPM, P_BPM, Screen) from virtaccl.site.SNS_Linac.virtual_devices_SNS import SNS_Dummy_BCM, SNS_Dummy_ICS from virtaccl.PyORBIT_Model.pyorbit_lattice_controller import OrbitModel from virtaccl.beam_line import BeamLine -from virtaccl.EPICS_Server.ca_server import EPICS_Server -from virtaccl.virtual_accelerator import va_parser, virtual_accelerator +from virtaccl.EPICS_Server.ca_server import EPICS_Server, add_epics_arguments +from virtaccl.virtual_accelerator import virtual_accelerator, VA_Parser from virtaccl.site.SNS_IDmp.IDmp_maker import get_IDMP_lattice_and_bunch def main(): loc = Path(__file__).parent - parser, va_version = va_parser() - parser.description = 'Run the SNS Injection Dump PyORBIT virtual accelerator server. Version ' + va_version + va_parser = VA_Parser() + va_parser.set_description('Run the SNS Injection Dump PyORBIT virtual accelerator server.') + va_parser.change_argument_help('--print_settings', + "Will only print setting PVs. Will NOT run the virtual accelerator.") + va_parser.remove_argument('--print_server_keys') + va_parser.add_argument('--print_server_pvs', dest='print_keys', action='store_true', + help="Will print all server PVs. Will NOT run the virtual accelerator.") + + va_parser = add_pyorbit_arguments(va_parser) + # Set the defaults for the PyORBIT model. + va_parser.remove_argument('--lattice') + va_parser.remove_argument('--start') + va_parser.remove_argument('end') + va_parser.remove_argument('--bunch') + + va_parser = add_epics_arguments(va_parser) # Json file that contains a dictionary connecting EPICS name of devices with their associated element model names. - parser.add_argument('--file', '-f', default=loc / 'va_config.json', type=str, - help='Pathname of config json file.') - - # Lattice xml input file and the sequences desired from that file. - parser.add_argument('--lattice', default=loc / 'orbit_model/sns_linac.xml', type=str, - help='Pathname of lattice file') - parser.add_argument("--start", default="MEBT", type=str, - help='Desired sequence of the lattice to start the model with (default=MEBT).') - parser.add_argument("end", nargs='?', default="HEBT1", type=str, - help='Desired sequence of the lattice to end the model with (default=HEBT1).') - - # Desired initial bunch file and the desired number of particles from that file. - parser.add_argument('--bunch', default=loc / 'orbit_model/MEBT_in.dat', type=str, - help='Pathname of input bunch file.') - parser.add_argument('--particle_number', default=1000, type=int, - help='Number of particles to use (default=1000).') - parser.add_argument('--beam_current', default=38.0, type=float, - help='Initial beam current in mA. (default=38.0).') - parser.add_argument('--save_bunch', const='end_bunch.dat', nargs='?', type=str, - help="Saves the bunch at the end of the lattice after each track in the given location. " - "If no location is given, the bunch is saved as 'end_bunch.dat' in the working directory. " - "(Default is that the bunch is not saved.)") + va_parser.add_argument('--config_file', '-f', default=loc / 'va_config.json', type=str, + help='Pathname of config json file.') # Json file that contains a dictionary connecting EPICS name of devices with their phase offset. - parser.add_argument('--phase_offset', default=None, type=str, - help='Pathname of phase offset file.') + va_parser.add_argument('--phase_offset', default=None, type=str, + help='Pathname of phase offset file.') - args = parser.parse_args() + va_parser = va_parser.initialize_arguments() + args = va_parser.parse_args() debug = args.debug - config_file = Path(args.file) + config_file = Path(args.config_file) with open(config_file, "r") as json_file: devices_dict = json.load(json_file) @@ -135,7 +129,7 @@ def main(): server = EPICS_Server() - virtual_accelerator(model, beam_line, server, parser) + virtual_accelerator(model, beam_line, server, va_parser) print('Exiting. Thank you for using our virtual accelerator!') diff --git a/virtaccl/site/SNS_Linac/virtual_SNS_linac.py b/virtaccl/site/SNS_Linac/virtual_SNS_linac.py index 8212a9f..4f22f22 100644 --- a/virtaccl/site/SNS_Linac/virtual_SNS_linac.py +++ b/virtaccl/site/SNS_Linac/virtual_SNS_linac.py @@ -1,5 +1,3 @@ -# Channel access server used to generate fake PV signals analogous to accelerator components. -# The main body of the script instantiates PVs from a file passed by command line argument. import json import math from pathlib import Path @@ -15,56 +13,48 @@ WireScanner, Quadrupole_Power_Supply, Corrector_Power_Supply, Bend_Power_Supply, Bend, Quadrupole_Power_Shunt from virtaccl.site.SNS_Linac.virtual_devices_SNS import SNS_Dummy_BCM, SNS_Cavity, SNS_Dummy_ICS -from virtaccl.EPICS_Server.ca_server import EPICS_Server -from virtaccl.beam_line import BeamLine from virtaccl.PyORBIT_Model.pyorbit_lattice_controller import OrbitModel -from virtaccl.virtual_accelerator import va_parser, virtual_accelerator - +from virtaccl.PyORBIT_Model.pyorbit_va_arguments import add_pyorbit_arguments from virtaccl.PyORBIT_Model.pyorbit_child_nodes import BPMclass, WSclass +from virtaccl.EPICS_Server.ca_server import EPICS_Server, add_epics_arguments +from virtaccl.beam_line import BeamLine + +from virtaccl.virtual_accelerator import virtual_accelerator, VA_Parser + def main(): loc = Path(__file__).parent - parser, va_version = va_parser() - parser.description = 'Run the SNS linac PyORBIT virtual accelerator server. Version ' + va_version + va_parser = VA_Parser() + va_parser.set_description('Run the SNS linac PyORBIT virtual accelerator server.') + va_parser.change_argument_help('--print_settings', + "Will only print setting PVs. Will NOT run the virtual accelerator.") + va_parser.remove_argument('--print_server_keys') + va_parser.add_argument('--print_server_pvs', dest='print_keys', action='store_true', + help="Will print all server PVs. Will NOT run the virtual accelerator.") + + va_parser = add_pyorbit_arguments(va_parser) + # Set the defaults for the PyORBIT model. + va_parser.change_argument_default('--lattice', loc / 'orbit_model/sns_linac.xml') + va_parser.change_argument_default('end', 'HEBT1') + va_parser.change_argument_default('--bunch', loc / 'orbit_model/MEBT_in.dat') + + va_parser = add_epics_arguments(va_parser) # Json file that contains a dictionary connecting EPICS name of devices with their associated element model names. - parser.add_argument('--file', '-f', default=loc / 'va_config.json', type=str, - help='Pathname of config json file.') - - # Lattice xml input file and the sequences desired from that file. - parser.add_argument('--lattice', default=loc / 'orbit_model/sns_linac.xml', type=str, - help='Pathname of lattice file') - parser.add_argument("--start", default="MEBT", type=str, - help='Desired sequence of the lattice to start the model with (default=MEBT).') - parser.add_argument("end", nargs='?', default="HEBT1", type=str, - help='Desired sequence of the lattice to end the model with (default=HEBT1).') - parser.add_argument('--space_charge', const=0.01, nargs='?', type=float, - help="Adds Uniform Ellipse Space Charge nodes to the lattice. The minimum distance in meters " - "between nodes can be specified; the default is 0.01m if no minimum is given. If the " - "argument is not used, no space charge nodes are added.") - - # Desired initial bunch file and the desired number of particles from that file. - parser.add_argument('--bunch', default=loc / 'orbit_model/MEBT_in.dat', type=str, - help='Pathname of input bunch file.') - parser.add_argument('--particle_number', default=1000, type=int, - help='Number of particles to use (default=1000).') - parser.add_argument('--beam_current', default=38.0, type=float, - help='Initial beam current in mA. (default=38.0).') - parser.add_argument('--save_bunch', const='end_bunch.dat', nargs='?', type=str, - help="Saves the bunch at the end of the lattice after each track in the given location. " - "If no location is given, the bunch is saved as 'end_bunch.dat' in the working directory. " - "(Default is that the bunch is not saved.)") + va_parser.add_argument('--config_file', '-f', default=loc / 'va_config.json', type=str, + help='Pathname of config json file.') # Json file that contains a dictionary connecting EPICS name of devices with their phase offset. - parser.add_argument('--phase_offset', default=None, type=str, - help='Pathname of phase offset file.') + va_parser.add_argument('--phase_offset', default=None, type=str, + help='Pathname of phase offset file.') - args = parser.parse_args() + va_parser = va_parser.initialize_arguments() + args = va_parser.parse_args() debug = args.debug save_bunch = args.save_bunch - config_file = Path(args.file) + config_file = Path(args.config_file) with open(config_file, "r") as json_file: devices_dict = json.load(json_file) @@ -238,7 +228,7 @@ def main(): server = EPICS_Server() - virtual_accelerator(model, beam_line, server, parser) + virtual_accelerator(model, beam_line, server, va_parser) if __name__ == '__main__': diff --git a/virtaccl/virtual_accelerator.py b/virtaccl/virtual_accelerator.py index 0332743..adf6d46 100644 --- a/virtaccl/virtual_accelerator.py +++ b/virtaccl/virtual_accelerator.py @@ -1,46 +1,72 @@ -# Channel access server used to generate fake PV signals analogous to accelerator components. -# The main body of the script instantiates PVs from a file passed by command line argument. import os import sys import time import argparse from datetime import datetime from importlib.metadata import version +from typing import Dict, Any from virtaccl.server import Server, not_ctrlc from virtaccl.beam_line import BeamLine from virtaccl.model import Model -def va_parser(): - va_version = version('virtaccl') - parser = argparse.ArgumentParser( - description='Run the Virac virtual accelerator server. Version ' + va_version) - # parser.add_argument('--prefix', '-p', default='test', type=str, help='Prefix for PVs') +class VA_Parser: + def __init__(self): + self.arguments: Dict[str, Dict[str, Any]] = {} + self.version = version('virtaccl') + self.description = 'Run the Virac virtual accelerator server.' + add_va_arguments(self) + + def set_description(self, new_description: str): + self.description = new_description + + def add_argument(self, *args, **kwargs): + self.arguments[args[0]] = {'positional': args, 'optional': kwargs} + + def remove_argument(self, name: str): + del self.arguments[name] + + def edit_argument(self, name: str, argument_keyword: str, new_value: Any): + self.arguments[name]['optional'][argument_keyword] = new_value + + def change_argument_default(self, name: str, new_value: Any): + self.arguments[name]['optional']['default'] = new_value + + def change_argument_help(self, name: str, new_help: Any): + self.arguments[name]['optional']['help'] = new_help + + def initialize_arguments(self) -> argparse.ArgumentParser: + va_parser = argparse.ArgumentParser( + description=self.description + ' Version ' + self.version, + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + for argument_name, argument_dict in self.arguments.items(): + va_parser.add_argument(*argument_dict['positional'], **argument_dict['optional']) + + return va_parser + + +def add_va_arguments(va_parser: VA_Parser) -> VA_Parser: # Number (in Hz) determining the update rate for the virtual accelerator. - parser.add_argument('--refresh_rate', default=1.0, type=float, - help='Rate (in Hz) at which the virtual accelerator updates (default=1.0).') - parser.add_argument('--sync_time', dest='sync_time', action='store_true', - help="Synchronize timestamps for PVs.") + va_parser.add_argument('--refresh_rate', default=1.0, type=float, + help='Rate (in Hz) at which the virtual accelerator updates.') + va_parser.add_argument('--sync_time', dest='sync_time', action='store_true', + help="Synchronize timestamps for server parameters.") # Desired amount of output. - parser.add_argument('--debug', dest='debug', action='store_true', - help="Some debug info will be printed.") - parser.add_argument('--production', dest='debug', action='store_false', - help="DEFAULT: No additional info printed.") - parser.add_argument('--print_settings', dest='print_settings', action='store_true', - help="Will only print setting PVs. Will NOT run the virtual accelerator. (Default is off)") - parser.add_argument('--print_pvs', dest='print_pvs', action='store_true', - help="Will print all PVs. Will NOT run the virtual accelerator. (Default is off)") + va_parser.add_argument('--debug', dest='debug', action='store_true', + help="Some debug info will be printed.") + va_parser.add_argument('--production', dest='debug', action='store_false', + help="DEFAULT: No additional info printed.") - # Number (in seconds) that determine some delay parameter in the server. Not exactly sure how it works, so use at - # your own risk. - parser.add_argument('--ca_proc', default=0.1, type=float, - help='Number (in seconds) that determine some delay parameter in the server. Not exactly sure ' - 'how it works, so use at your own risk. (Default=0.1)') + va_parser.add_argument('--print_settings', dest='print_settings', action='store_true', + help="Will only print setting parameters. Will NOT run the virtual accelerator.") + va_parser.add_argument('--print_server_keys', dest='print_keys', action='store_true', + help="Will print all server keys. Will NOT run the virtual accelerator.") - return parser, va_version + return va_parser def virtual_accelerator(model: Model, beam_line: BeamLine, server: Server, arguments: argparse.ArgumentParser): @@ -59,7 +85,7 @@ def virtual_accelerator(model: Model, beam_line: BeamLine, server: Server, argum for setting in beam_line.get_setting_keys(): print(setting) sys.exit() - elif args.print_pvs: + elif args.print_keys: for key in server.get_parameter_keys(): print(key) sys.exit() @@ -81,8 +107,8 @@ def virtual_accelerator(model: Model, beam_line: BeamLine, server: Server, argum if sync_time: now = datetime.now() - server_pvs = server.get_parameters() - beam_line.update_settings_from_server(server_pvs) + server_parameters = server.get_parameters() + beam_line.update_settings_from_server(server_parameters) new_optics = beam_line.get_model_optics() model.update_optics(new_optics) From dd03f27ee0dddd83bcfcb836150931f8b2e38629 Mon Sep 17 00:00:00 2001 From: 4wc Date: Thu, 17 Oct 2024 14:33:13 -0400 Subject: [PATCH 2/2] Updated the VA parser to now keep track of separate arguments for the main va, model, server, and custom arguments. Updated the SNS linac, IDmp, and the BTF with the new format. --- virtaccl/EPICS_Server/ca_server.py | 22 +++- .../PyORBIT_Model/pyorbit_va_arguments.py | 36 +++--- virtaccl/site/BTF/btf_virtual_accelerator.py | 64 +++++------ virtaccl/site/SNS_IDmp/IDmp_maker.py | 22 ++-- .../site/SNS_IDmp/IDmp_virtual_accelerator.py | 18 +-- virtaccl/site/SNS_Linac/virtual_SNS_linac.py | 16 ++- virtaccl/virtual_accelerator.py | 106 ++++++++++++------ 7 files changed, 172 insertions(+), 112 deletions(-) diff --git a/virtaccl/EPICS_Server/ca_server.py b/virtaccl/EPICS_Server/ca_server.py index ea7919b..bb273f3 100644 --- a/virtaccl/EPICS_Server/ca_server.py +++ b/virtaccl/EPICS_Server/ca_server.py @@ -1,9 +1,10 @@ +import sys from threading import Thread from datetime import datetime from time import sleep from math import floor -from typing import Any +from typing import Any, Dict from pcaspy import Driver from pcaspy.cas import epicsTimeStamp @@ -16,9 +17,12 @@ def add_epics_arguments(va_parser: VA_Parser) -> VA_Parser: # Number (in seconds) that determine some delay parameter in the server. Not exactly sure how it works, so use at # your own risk. - va_parser.add_argument('--ca_proc', default=0.1, type=float, - help='Number (in seconds) that determine some delay parameter in the server. Not exactly ' - 'sure how it works, so use at your own risk.') + va_parser.add_server_argument('--ca_proc', default=0.1, type=float, + help='Number (in seconds) that determine some delay parameter in the server. Not ' + 'exactly sure how it works, so use at your own risk.') + + va_parser.add_server_argument('--print_pvs', dest='print_pvs', action='store_true', + help="Will print all server PVs. Will NOT run the virtual accelerator.") return va_parser @@ -49,16 +53,24 @@ def setParam(self, reason, value, timestamp=None): class EPICS_Server(Server): - def __init__(self, prefix='', process_delay=0.1): + def __init__(self, prefix='', process_delay=0.1, print_pvs=False): super().__init__() self.prefix = prefix self.driver = None self.process_delay = process_delay + self.print_pvs = print_pvs def _CA_events(self, server): while True: server.process(self.process_delay) + def add_parameters(self, new_parameters: Dict[str, Dict[str, Any]]): + super().add_parameters(new_parameters) + if self.print_pvs: + for key in self.get_parameter_keys(): + print(key) + sys.exit() + def set_parameter(self, reason: str, value: Any, timestamp: datetime = None): if timestamp is not None: timestamp = epics_now(timestamp) diff --git a/virtaccl/PyORBIT_Model/pyorbit_va_arguments.py b/virtaccl/PyORBIT_Model/pyorbit_va_arguments.py index a5d3e2a..5d5e73f 100644 --- a/virtaccl/PyORBIT_Model/pyorbit_va_arguments.py +++ b/virtaccl/PyORBIT_Model/pyorbit_va_arguments.py @@ -3,24 +3,24 @@ def add_pyorbit_arguments(va_parser: VA_Parser) -> VA_Parser: # Lattice xml input file and the sequences desired from that file. - va_parser.add_argument('--lattice', type=str, help='Pathname of lattice file.') - va_parser.add_argument("--start", default="MEBT", type=str, - help='Desired sequence of the lattice to start the model with.') - va_parser.add_argument("end", nargs='?', type=str, - help='Desired sequence of the lattice to end the model with.') - va_parser.add_argument('--space_charge', const=0.01, nargs='?', type=float, - help="Adds Uniform Ellipse Space Charge nodes to the lattice. The minimum distance in meters" - " between nodes can be specified; the default is 0.01m if no minimum is given. If the " - "argument is not used, no space charge nodes are added.") + va_parser.add_model_argument('--lattice', type=str, help='Pathname of lattice file.') + va_parser.add_model_argument("--start", default="MEBT", type=str, + help='Desired sequence of the lattice to start the model with.') + va_parser.add_model_argument("end", nargs='?', type=str, + help='Desired sequence of the lattice to end the model with.') + va_parser.add_model_argument('--space_charge', const=0.01, nargs='?', type=float, + help="Adds Uniform Ellipse Space Charge nodes to the lattice. The minimum distance " + "in meters between nodes can be specified; the default is 0.01m if no minimum " + "is given. If the argument is not used, no space charge nodes are added.") # Desired initial bunch file and the desired number of particles from that file. - va_parser.add_argument('--bunch', type=str, help='Pathname of input bunch file.') - va_parser.add_argument('--particle_number', default=1000, type=int, - help='Number of particles to use.') - va_parser.add_argument('--beam_current', default=38.0, type=float, - help='Initial beam current in mA.') - va_parser.add_argument('--save_bunch', const='end_bunch.dat', nargs='?', type=str, - help="Saves the bunch at the end of the lattice after each track in the given location. " - "If no location is given, the bunch is saved as 'end_bunch.dat' in the working " - "directory.") + va_parser.add_model_argument('--bunch', type=str, help='Pathname of input bunch file.') + va_parser.add_model_argument('--particle_number', default=1000, type=int, + help='Number of particles to use.') + va_parser.add_model_argument('--beam_current', default=38.0, type=float, + help='Initial beam current in mA.') + va_parser.add_model_argument('--save_bunch', const='end_bunch.dat', nargs='?', type=str, + help="Saves the bunch at the end of the lattice after each track in the given " + "location. If no location is given, the bunch is saved as 'end_bunch.dat' in " + "the working directory.") return va_parser diff --git a/virtaccl/site/BTF/btf_virtual_accelerator.py b/virtaccl/site/BTF/btf_virtual_accelerator.py index 02eb376..8c16a0d 100644 --- a/virtaccl/site/BTF/btf_virtual_accelerator.py +++ b/virtaccl/site/BTF/btf_virtual_accelerator.py @@ -12,6 +12,7 @@ from orbit.py_linac.lattice_modifications import Add_quad_apertures_to_lattice, Add_rfgap_apertures_to_lattice from virtaccl.PyORBIT_Model.pyorbit_child_nodes import BPMclass, FCclass, BCMclass +from virtaccl.PyORBIT_Model.pyorbit_va_arguments import add_pyorbit_arguments from virtaccl.site.BTF.orbit_model.btf_lattice_factory import PyORBIT_Lattice_Factory from orbit.core.bunch import Bunch @@ -25,49 +26,42 @@ from virtaccl.site.BTF.orbit_model.btf_child_nodes import BTF_Screenclass, BTF_Slitclass from virtaccl.PyORBIT_Model.pyorbit_lattice_controller import OrbitModel -from virtaccl.EPICS_Server.ca_server import EPICS_Server +from virtaccl.EPICS_Server.ca_server import EPICS_Server, add_epics_arguments -from virtaccl.virtual_accelerator import va_parser, virtual_accelerator +from virtaccl.virtual_accelerator import virtual_accelerator, VA_Parser def main(): loc = Path(__file__).parent - parser, va_version = va_parser() - parser.description = 'Run the SNS Linac PyORBIT virtual accelerator server. Version ' + va_version + va_parser = VA_Parser() + va_parser.set_description('Run the BTF PyORBIT virtual accelerator server.') + + va_parser = add_pyorbit_arguments(va_parser) + # Set the defaults for the PyORBIT model. + va_parser.change_argument_default('--lattice', loc / 'orbit_model/btf_lattice_straight.xml') + va_parser.change_argument_default('--start', 'MEBT1') + va_parser.change_argument_default('end', 'MEBT2') + va_parser.change_argument_default('--bunch', loc / 'orbit_model/parmteq_bunch_RFQ_output_1.00e+05.dat') + va_parser.change_argument_default('--beam_current', 50.0) + + va_parser = add_epics_arguments(va_parser) + va_parser.add_server_argument('--print_settings', action='store_true', + help="Will only print setting PVs. Will NOT run the virtual accelerator.") # Json file that contains a dictionary connecting EPICS name of devices with their associated element model names. - parser.add_argument('--file', '-f', default=loc / 'btf_config.json', type=str, - help='Pathname of config json file.') - - # Lattice xml input file and the sequences desired from that file. - parser.add_argument('--lattice', default=loc / 'orbit_model/btf_lattice_straight.xml', type=str, - help='Pathname of lattice file') - parser.add_argument("--start", default="MEBT1", type=str, - help='Desired sequence of the lattice to start the model with (default=MEBT1).') - parser.add_argument("end", nargs='?', default="MEBT2", type=str, - help='Desired sequence of the lattice to end the model with (default=MEBT2).') - - # Desired initial bunch file and the desired number of particles from that file. - parser.add_argument('--bunch', default=loc / 'orbit_model/parmteq_bunch_RFQ_output_1.00e+05.dat', type=str, - help='Pathname of input bunch file.') - parser.add_argument('--particle_number', default=10000, type=int, - help='Number of particles to use (default=1000).') - parser.add_argument('--beam_current', default=50.0, type=float, - help='Initial beam current in mA. (default=30.0).') - parser.add_argument('--save_bunch', const='end_bunch.dat', nargs='?', type=str, - help="Saves the bunch at the end of the lattice after each track in the given location. " - "If no location is given, the bunch is saved as 'end_bunch.dat' in the working directory. " - "(Default is that the bunch is not saved.)") + va_parser.add_argument('--config_file', '-f', default=loc / 'btf_config.json', type=str, + help='Pathname of config json file.') # Json file that contains a dictionary connecting EPICS name of devices with their phase offset. - parser.add_argument('--phase_offset', default=None, type=str, - help='Pathname of phase offset file.') + va_parser.add_argument('--phase_offset', default=None, type=str, + help='Pathname of phase offset file.') - args = parser.parse_args() + va_parser = va_parser.initialize_arguments() + args = va_parser.parse_args() debug = args.debug save_bunch = args.save_bunch - config_file = Path(args.file) + config_file = Path(args.config_file) with open(config_file, "r") as json_file: devices_dict = json.load(json_file) @@ -300,9 +294,15 @@ def main(): slit_device = BTF_Actuator(name, ele_name, speed=speed, limit=limit) beam_line.add_device(slit_device) - server = EPICS_Server() + if args.print_settings: + for key in beam_line.get_setting_keys(): + print(key) + sys.exit() + + delay = args.ca_proc + server = EPICS_Server(process_delay=delay, print_pvs=args.print_pvs) - virtual_accelerator(model, beam_line, server, parser) + virtual_accelerator(model, beam_line, server, va_parser) if __name__ == '__main__': diff --git a/virtaccl/site/SNS_IDmp/IDmp_maker.py b/virtaccl/site/SNS_IDmp/IDmp_maker.py index e8a2577..c632a68 100644 --- a/virtaccl/site/SNS_IDmp/IDmp_maker.py +++ b/virtaccl/site/SNS_IDmp/IDmp_maker.py @@ -16,7 +16,7 @@ from virtaccl.PyORBIT_Model.bunch_generator import BunchGenerator -def get_IDMP_lattice_and_bunch(particle_number=1000, x_off=0, xp_off=0, y_off=0, yp_off=0): +def get_IDMP_lattice_and_bunch(particle_number=1000, x_off=0, xp_off=0, y_off=0, yp_off=0, debug: bool = False): # Field strength and length of the quadrupoles quad_field = 0.5 dch_field = 0.01 @@ -107,7 +107,8 @@ def get_IDMP_lattice_and_bunch(particle_number=1000, x_off=0, xp_off=0, y_off=0, my_lattice = LinacAccLattice('My Lattice') my_lattice.setNodes(list_of_nodes) my_lattice.initialize() - print("Total length=", my_lattice.getLength()) + if debug: + print("Total length=", my_lattice.getLength()) # -----TWISS Parameters at the entrance of MEBT --------------- # transverse emittances are unnormalized and in pi*mm*mrad @@ -116,8 +117,9 @@ def get_IDMP_lattice_and_bunch(particle_number=1000, x_off=0, xp_off=0, y_off=0, mass = 0.939294 # in [GeV] gamma = (mass + e_kin_ini) / mass beta = math.sqrt(gamma * gamma - 1.0) / gamma - print("relat. gamma=", gamma) - print("relat. beta=", beta) + if debug: + print("relat. gamma=", gamma) + print("relat. beta=", beta) frequency = 402.5e6 v_light = 2.99792458e8 # in [m/sec] @@ -144,16 +146,18 @@ def get_IDMP_lattice_and_bunch(particle_number=1000, x_off=0, xp_off=0, y_off=0, emittZ = emittZ * gamma ** 3 * beta ** 2 * mass betaZ = betaZ / (gamma ** 3 * beta ** 2 * mass) - print(" ========= PyORBIT Twiss ===========") - print(" aplha beta emitt[mm*mrad] X= %6.4f %6.4f %6.4f " % (alphaX, betaX, emittX * 1.0e6)) - print(" aplha beta emitt[mm*mrad] Y= %6.4f %6.4f %6.4f " % (alphaY, betaY, emittY * 1.0e6)) - print(" aplha beta emitt[mm*MeV] Z= %6.4f %6.4f %6.4f " % (alphaZ, betaZ, emittZ * 1.0e6)) + if debug: + print(" ========= PyORBIT Twiss ===========") + print(" aplha beta emitt[mm*mrad] X= %6.4f %6.4f %6.4f " % (alphaX, betaX, emittX * 1.0e6)) + print(" aplha beta emitt[mm*mrad] Y= %6.4f %6.4f %6.4f " % (alphaY, betaY, emittY * 1.0e6)) + print(" aplha beta emitt[mm*MeV] Z= %6.4f %6.4f %6.4f " % (alphaZ, betaZ, emittZ * 1.0e6)) twissX = TwissContainer(alphaX, betaX, emittX) twissY = TwissContainer(alphaY, betaY, emittY) twissZ = TwissContainer(alphaZ, betaZ, emittZ) - print("Start Bunch Generation.") + if debug: + print("Start Bunch Generation.") bunch_gen = BunchGenerator(twissX, twissY, twissZ) # set the initial kinetic energy in GeV diff --git a/virtaccl/site/SNS_IDmp/IDmp_virtual_accelerator.py b/virtaccl/site/SNS_IDmp/IDmp_virtual_accelerator.py index c8b2671..8c597d4 100644 --- a/virtaccl/site/SNS_IDmp/IDmp_virtual_accelerator.py +++ b/virtaccl/site/SNS_IDmp/IDmp_virtual_accelerator.py @@ -1,4 +1,5 @@ import json +import sys from pathlib import Path from virtaccl.PyORBIT_Model.pyorbit_child_nodes import BPMclass, WSclass, ScreenClass @@ -19,11 +20,6 @@ def main(): loc = Path(__file__).parent va_parser = VA_Parser() va_parser.set_description('Run the SNS Injection Dump PyORBIT virtual accelerator server.') - va_parser.change_argument_help('--print_settings', - "Will only print setting PVs. Will NOT run the virtual accelerator.") - va_parser.remove_argument('--print_server_keys') - va_parser.add_argument('--print_server_pvs', dest='print_keys', action='store_true', - help="Will print all server PVs. Will NOT run the virtual accelerator.") va_parser = add_pyorbit_arguments(va_parser) # Set the defaults for the PyORBIT model. @@ -33,6 +29,8 @@ def main(): va_parser.remove_argument('--bunch') va_parser = add_epics_arguments(va_parser) + va_parser.add_server_argument('--print_settings', action='store_true', + help="Will only print setting PVs. Will NOT run the virtual accelerator.") # Json file that contains a dictionary connecting EPICS name of devices with their associated element model names. va_parser.add_argument('--config_file', '-f', default=loc / 'va_config.json', type=str, @@ -51,7 +49,7 @@ def main(): devices_dict = json.load(json_file) part_num = args.particle_number - lattice, bunch = get_IDMP_lattice_and_bunch(part_num, x_off=2, xp_off=0.3) + lattice, bunch = get_IDMP_lattice_and_bunch(part_num, x_off=2, xp_off=0.3, debug=debug) model = OrbitModel(input_bunch=bunch, debug=debug) model.define_custom_node(BPMclass.node_type, BPMclass.parameter_list, diagnostic=True) model.define_custom_node(WSclass.node_type, WSclass.parameter_list, diagnostic=True) @@ -127,7 +125,13 @@ def main(): dummy_device = SNS_Dummy_ICS("ICS_Tim") beam_line.add_device(dummy_device) - server = EPICS_Server() + if args.print_settings: + for key in beam_line.get_setting_keys(): + print(key) + sys.exit() + + delay = args.ca_proc + server = EPICS_Server(process_delay=delay, print_pvs=args.print_pvs) virtual_accelerator(model, beam_line, server, va_parser) diff --git a/virtaccl/site/SNS_Linac/virtual_SNS_linac.py b/virtaccl/site/SNS_Linac/virtual_SNS_linac.py index 4f22f22..0a81296 100644 --- a/virtaccl/site/SNS_Linac/virtual_SNS_linac.py +++ b/virtaccl/site/SNS_Linac/virtual_SNS_linac.py @@ -1,5 +1,6 @@ import json import math +import sys from pathlib import Path from orbit.lattice import AccNode @@ -27,11 +28,6 @@ def main(): loc = Path(__file__).parent va_parser = VA_Parser() va_parser.set_description('Run the SNS linac PyORBIT virtual accelerator server.') - va_parser.change_argument_help('--print_settings', - "Will only print setting PVs. Will NOT run the virtual accelerator.") - va_parser.remove_argument('--print_server_keys') - va_parser.add_argument('--print_server_pvs', dest='print_keys', action='store_true', - help="Will print all server PVs. Will NOT run the virtual accelerator.") va_parser = add_pyorbit_arguments(va_parser) # Set the defaults for the PyORBIT model. @@ -40,6 +36,8 @@ def main(): va_parser.change_argument_default('--bunch', loc / 'orbit_model/MEBT_in.dat') va_parser = add_epics_arguments(va_parser) + va_parser.add_server_argument('--print_settings', action='store_true', + help="Will only print setting PVs. Will NOT run the virtual accelerator.") # Json file that contains a dictionary connecting EPICS name of devices with their associated element model names. va_parser.add_argument('--config_file', '-f', default=loc / 'va_config.json', type=str, @@ -226,7 +224,13 @@ def main(): dummy_device = SNS_Dummy_ICS("ICS_Tim") beam_line.add_device(dummy_device) - server = EPICS_Server() + if args.print_settings: + for key in beam_line.get_setting_keys(): + print(key) + sys.exit() + + delay = args.ca_proc + server = EPICS_Server(process_delay=delay, print_pvs=args.print_pvs) virtual_accelerator(model, beam_line, server, va_parser) diff --git a/virtaccl/virtual_accelerator.py b/virtaccl/virtual_accelerator.py index adf6d46..b74bd2f 100644 --- a/virtaccl/virtual_accelerator.py +++ b/virtaccl/virtual_accelerator.py @@ -13,59 +13,106 @@ class VA_Parser: def __init__(self): - self.arguments: Dict[str, Dict[str, Any]] = {} + self._va_arguments_: Dict[str, Dict[str, Any]] = {} + self.model_arguments: Dict[str, Dict[str, Any]] = {} + self.server_arguments: Dict[str, Dict[str, Any]] = {} + self.custom_arguments: Dict[str, Dict[str, Any]] = {} + self.__all_arguments__ = {'va': self._va_arguments_, 'model': self.model_arguments, + 'server': self.server_arguments, 'custom': self.custom_arguments} + self.__all_argument_keys__ = set() + self.version = version('virtaccl') self.description = 'Run the Virac virtual accelerator server.' add_va_arguments(self) + def __find_argument_dict__(self, name) -> Dict[str, Dict[str, Any]]: + for argument_group, arguments in self.__all_arguments__.items(): + if name in arguments: + return arguments + def set_description(self, new_description: str): self.description = new_description def add_argument(self, *args, **kwargs): - self.arguments[args[0]] = {'positional': args, 'optional': kwargs} + arg_key = args[0] + if arg_key in self.__all_argument_keys__: + print(f'Warning: Argument name "{arg_key}" already exists. Argument not added.') + else: + self.custom_arguments[arg_key] = {'positional': args, 'optional': kwargs} + self.__all_argument_keys__.add(arg_key) + + def add_va_argument(self, *args, **kwargs): + arg_key = args[0] + if arg_key in self.__all_argument_keys__: + print(f'Warning: Argument name "{arg_key}" already exists. Argument not added.') + else: + self._va_arguments_[arg_key] = {'positional': args, 'optional': kwargs} + self.__all_argument_keys__.add(arg_key) + + def add_model_argument(self, *args, **kwargs): + arg_key = args[0] + if arg_key in self.__all_argument_keys__: + print(f'Warning: Argument name "{arg_key}" already exists. Argument not added.') + else: + self.custom_arguments[arg_key] = {'positional': args, 'optional': kwargs} + self.__all_argument_keys__.add(arg_key) + + def add_server_argument(self, *args, **kwargs): + arg_key = args[0] + if arg_key in self.__all_argument_keys__: + print(f'Warning: Argument name "{arg_key}" already exists. Argument not added.') + else: + self.custom_arguments[arg_key] = {'positional': args, 'optional': kwargs} + self.__all_argument_keys__.add(arg_key) def remove_argument(self, name: str): - del self.arguments[name] + if name not in self.__all_argument_keys__: + print(f'Warning: Argument name "{name}" was not found.') + else: + arguments = self.__find_argument_dict__(name) + del arguments[name] + self.__all_argument_keys__.remove(name) - def edit_argument(self, name: str, argument_keyword: str, new_value: Any): - self.arguments[name]['optional'][argument_keyword] = new_value + def edit_argument(self, name: str, new_options: Dict[str, Any]): + if name not in self.__all_argument_keys__: + print(f'Warning: Argument name "{name}" was not found.') + else: + arguments = self.__find_argument_dict__(name) + for option_key, new_value in new_options.items(): + arguments[name]['optional'][option_key] = new_value def change_argument_default(self, name: str, new_value: Any): - self.arguments[name]['optional']['default'] = new_value + arguments = self.__find_argument_dict__(name) + arguments[name]['optional']['default'] = new_value def change_argument_help(self, name: str, new_help: Any): - self.arguments[name]['optional']['help'] = new_help + arguments = self.__find_argument_dict__(name) + arguments[name]['optional']['help'] = new_help def initialize_arguments(self) -> argparse.ArgumentParser: va_parser = argparse.ArgumentParser( description=self.description + ' Version ' + self.version, formatter_class=argparse.ArgumentDefaultsHelpFormatter) - for argument_name, argument_dict in self.arguments.items(): - va_parser.add_argument(*argument_dict['positional'], **argument_dict['optional']) - + for group_key, argument_group in self.__all_arguments__.items(): + for argument_name, argument_dict in argument_group.items(): + va_parser.add_argument(*argument_dict['positional'], **argument_dict['optional']) return va_parser def add_va_arguments(va_parser: VA_Parser) -> VA_Parser: # Number (in Hz) determining the update rate for the virtual accelerator. - va_parser.add_argument('--refresh_rate', default=1.0, type=float, - help='Rate (in Hz) at which the virtual accelerator updates.') - va_parser.add_argument('--sync_time', dest='sync_time', action='store_true', - help="Synchronize timestamps for server parameters.") + va_parser.add_va_argument('--refresh_rate', default=1.0, type=float, + help='Rate (in Hz) at which the virtual accelerator updates.') + va_parser.add_va_argument('--sync_time', dest='sync_time', action='store_true', + help="Synchronize timestamps for server parameters.") # Desired amount of output. - va_parser.add_argument('--debug', dest='debug', action='store_true', - help="Some debug info will be printed.") - va_parser.add_argument('--production', dest='debug', action='store_false', - help="DEFAULT: No additional info printed.") - - va_parser.add_argument('--print_settings', dest='print_settings', action='store_true', - help="Will only print setting parameters. Will NOT run the virtual accelerator.") - va_parser.add_argument('--print_server_keys', dest='print_keys', action='store_true', - help="Will print all server keys. Will NOT run the virtual accelerator.") - + va_parser.add_va_argument('--debug', dest='debug', action='store_true', + help="Some debug info will be printed.") + va_parser.add_va_argument('--production', dest='debug', action='store_false', + help="DEFAULT: No additional info printed.") return va_parser @@ -81,17 +128,6 @@ def virtual_accelerator(model: Model, beam_line: BeamLine, server: Server, argum sever_parameters = beam_line.get_server_parameter_definitions() server.add_parameters(sever_parameters) - if args.print_settings: - for setting in beam_line.get_setting_keys(): - print(setting) - sys.exit() - elif args.print_keys: - for key in server.get_parameter_keys(): - print(key) - sys.exit() - - delay = args.ca_proc - server.process_delay = delay if debug: print(server)