diff --git a/lcls_tools/superconducting/scLinac.py b/lcls_tools/superconducting/sc_linac.py similarity index 79% rename from lcls_tools/superconducting/scLinac.py rename to lcls_tools/superconducting/sc_linac.py index 8519e8a9..f6354a62 100644 --- a/lcls_tools/superconducting/scLinac.py +++ b/lcls_tools/superconducting/sc_linac.py @@ -14,11 +14,26 @@ class SSA(utils.SCLinacObject): + """ + Python representation of LCLS II SSAs. This class provides utility functions + for powering off/on, resetting, and calibrating + + """ + def __init__(self, cavity): # type: (Cavity) -> None + """ + @param cavity: the cavity object powered by this SSA + """ + self.cavity: Cavity = cavity self._pv_prefix = self.cavity.pv_addr("SSA:") + # HL cryomodules only have 4 physical SSAs such that they each power two + # cavities (SSA 1 powers cavities 1 and 5, SSA 2 powers cavities 2 and 6, + # etc.) Because of that, HL cavities have a subset of shared SSA + # controls (on/off/reset/voltage). HL SSAs also have a lower expected + # forward power. if self.cavity.cryomodule.is_harmonic_linearizer: cavity_num = utils.HL_SSA_MAP[self.cavity.number] self.hl_prefix = "ACCL:{LINAC}:{CRYOMODULE}{CAVITY}0:SSA:".format( @@ -82,6 +97,12 @@ def pv_prefix(self): return self._pv_prefix def pv_addr(self, suffix: str): + """ + @param suffix: The specific PV signal to be appended to the appropriate + prefix (that will depend on if the signal is shared + between HL SSAs) + @return: Full PV address of the form ACCL:L{X}B:{CM}{CAV}0:SSA:SUFFIX + """ if ( self.cavity.cryomodule.is_harmonic_linearizer and suffix in utils.HL_SSA_SHARED_PVS @@ -134,21 +155,26 @@ def drive_max(self, value: float): self._drive_max_setpoint_pv_obj = PV(self.drive_max_setpoint_pv) self._drive_max_setpoint_pv_obj.put(value) - def calibrate(self, drivemax, attempt=0): - print(f"Trying {self} calibration with drivemax {drivemax}") - if drivemax < 0.4: + def calibrate(self, drive_max, attempt=0): + """ + @param drive_max: drive max to use for SSA calibration + @param attempt: recursively incremented upon calibration failure + @return: None + """ + print(f"Trying {self} calibration with drive max {drive_max}") + if drive_max < 0.4: raise utils.SSACalibrationError(f"Requested {self} drive max too low") print(f"Setting {self} max drive") - self.drive_max = drivemax + self.drive_max = drive_max try: self.cavity.check_abort() - self.runCalibration() + self.run_calibration() except (utils.SSACalibrationToleranceError, utils.SSACalibrationError) as e: if attempt < 3: - self.calibrate(drivemax, attempt + 1) + self.calibrate(drive_max, attempt + 1) else: raise utils.SSACalibrationError(e) @@ -172,8 +198,11 @@ def turn_on_pv_obj(self) -> PV: def turn_on(self): if not self.is_on: - print(f"Turning {self} on") + # Check to see if SSA is hard faulted first (self.reset() tries a set + # number of times before raising an error) self.reset() + + print(f"Turning {self} on") self.turn_on_pv_obj.put(1) while not self.is_on: @@ -220,8 +249,10 @@ def reset(self): while self.is_resetting: sleep(1) - if self.is_faulted and reset_attempt > 2: - raise utils.SSAFaultError(f"{self} failed to reset 3x") + if self.is_faulted and reset_attempt >= utils.INTERLOCK_RESET_ATTEMPTS: + raise utils.SSAFaultError( + f"{self} failed to reset {utils.INTERLOCK_RESET_ATTEMPTS}x" + ) reset_attempt += 1 @@ -256,12 +287,15 @@ def cal_result_status_pv_obj(self) -> PV: def calibration_result_good(self) -> bool: return self.cal_result_status_pv_obj.get() == utils.SSA_RESULT_GOOD_STATUS_VALUE - def runCalibration(self, save_slope: bool = False): + def run_calibration(self, save_slope: bool = False): """ Runs the SSA through its range and finds the slope that describes the relationship between SSA drive signal and output power - :return: + @param save_slope: Whether to update the saved slope PV with the newly + calculated value or not + @return: None """ + self.reset() self.turn_on() @@ -314,8 +348,17 @@ def measured_slope_in_tolerance(self) -> bool: class StepperTuner(utils.SCLinacObject): + """ + Python representation of LCLS II stepper tuners. This class provides wrappers + for common stepper controls including sending move commands, checking movement + status, and retrieving stored movement parameters + """ + def __init__(self, cavity): # type (Cavity) -> None + """ + @param cavity: the cavity object tuned by this stepper + """ self.cavity: Cavity = cavity self._pv_prefix: str = self.cavity.pv_addr("STEP:") @@ -383,6 +426,12 @@ def hz_per_microstep(self): return abs(self.hz_per_microstep_pv_obj.get()) def check_abort(self): + """ + This function raises an error if either a stepper abort or a cavity abort + has been requested. + @return: None + """ + self.cavity.check_abort() if self.abort_flag: self.abort() self.abort_flag = False @@ -475,63 +524,73 @@ def speed(self): def speed(self, value: int): self.speed_pv_obj.put(value) - def restoreDefaults(self): + def restore_defaults(self): self.max_steps = utils.DEFAULT_STEPPER_MAX_STEPS self.speed = utils.DEFAULT_STEPPER_SPEED def move( self, - numSteps: int, - maxSteps: int = utils.DEFAULT_STEPPER_MAX_STEPS, + num_steps: int, + max_steps: int = utils.DEFAULT_STEPPER_MAX_STEPS, speed: int = utils.DEFAULT_STEPPER_SPEED, - changeLimits: bool = True, + change_limits: bool = True, check_detune: bool = True, ): """ - :param numSteps: positive for increasing cavity length, negative for decreasing - :param maxSteps: the maximum number of steps allowed at once + :param num_steps: positive for increasing cavity length, negative for decreasing + :param max_steps: the maximum number of steps allowed at once :param speed: the speed of the motor in steps/second - :param changeLimits: whether to change the speed and steps + :param change_limits: whether to change the speed and steps :param check_detune: whether to check for valid detune after each move - :return: + :return: None """ self.check_abort() - maxSteps = abs(maxSteps) + max_steps = abs(max_steps) - if changeLimits: + if change_limits: # on the off chance that someone tries to write a negative maximum - self.max_steps = maxSteps + self.max_steps = max_steps # make sure that we don't exceed the speed limit as defined by the tuner experts self.speed = ( speed if speed < utils.MAX_STEPPER_SPEED else utils.MAX_STEPPER_SPEED ) - if abs(numSteps) <= maxSteps: - print(f"{self.cavity} {abs(numSteps)} steps <= {maxSteps} max") - self.step_des = abs(numSteps) - self.issueMoveCommand(numSteps, check_detune=check_detune) - self.restoreDefaults() + if abs(num_steps) <= max_steps: + print(f"{self.cavity} {abs(num_steps)} steps <= {max_steps} max") + self.step_des = abs(num_steps) + self.issue_move_command(num_steps, check_detune=check_detune) + self.restore_defaults() else: - print(f"{self.cavity} {abs(numSteps)} steps > {maxSteps} max") - self.step_des = maxSteps - self.issueMoveCommand(numSteps, check_detune=check_detune) - print(f"{self.cavity} moving {numSteps - (sign(numSteps) * maxSteps)}") + print(f"{self.cavity} {abs(num_steps)} steps > {max_steps} max") + self.step_des = max_steps + self.issue_move_command(num_steps, check_detune=check_detune) + print(f"{self.cavity} moving {num_steps - (sign(num_steps) * max_steps)}") self.move( - numSteps - (sign(numSteps) * maxSteps), - maxSteps, + num_steps - (sign(num_steps) * max_steps), + max_steps, speed, - changeLimits=False, + change_limits=False, check_detune=check_detune, ) - def issueMoveCommand(self, numSteps: int, check_detune: bool = True): + def issue_move_command(self, num_steps: int, check_detune: bool = True): + """ + Determine whether to move positive or negative depending on the requested + number of steps + @param num_steps: Signed number of steps to move the stepper + @param check_detune: Whether to check for a valid detune during move + (this should only be false when we cannot see + cavity frequency, i.e. when we are not at 2 K) + @return: None + """ + # this is necessary because the tuners for the HLs move the other direction if self.cavity.cryomodule.is_harmonic_linearizer: - numSteps *= -1 + num_steps *= -1 - if sign(numSteps) == 1: + if sign(num_steps) == 1: self.move_positive() else: self.move_negative() @@ -554,8 +613,18 @@ def issueMoveCommand(self, numSteps: int, check_detune: bool = True): class Piezo(utils.SCLinacObject): + """ + Python representation of LCLS II piezo tuners. This class provides utility + functions for toggling feedback mode and changing bias voltage and DC offset + + """ + def __init__(self, cavity): # type (Cavity) -> None + """ + @param cavity: The cavity object tuned by this piezo + """ + self.cavity: Cavity = cavity self._pv_prefix: str = self.cavity.pv_addr("PZT:") @@ -708,26 +777,28 @@ def disable_feedback(self): class Cavity(utils.SCLinacObject): + """ + Python representation of LCLS II cavities. This class provides utility + functions for commonly used tasks including powering on/off, changing RF mode, + setting amplitude, characterizing, and tuning to resonance + + """ + def __init__( self, - cavityNum, - rackObject, - ssaClass=SSA, - stepperClass=StepperTuner, - piezoClass=Piezo, + cavity_num, + rack_object, ): - # type: (int, Rack, Type[SSA], Type[StepperTuner], Type[Piezo]) -> None + # type: (int, Rack) -> None """ - Parameters - ---------- - cavityNum: int cavity number i.e. 1 - 8 - rackObject: the rack object the cavities belong to + @param cavity_num: int cavity number i.e. 1 - 8 + @param rack_object: the rack object the cavities belong to """ - self.number = cavityNum - self.rack: Rack = rackObject + self.number: int = cavity_num + self.rack: Rack = rack_object self.cryomodule: Cryomodule = self.rack.cryomodule - self.linac = self.cryomodule.linac + self.linac: Linac = self.cryomodule.linac if self.cryomodule.is_harmonic_linearizer: self.length = 0.346 @@ -749,10 +820,12 @@ def __init__( ) self.chirp_prefix = self._pv_prefix + "CHIRP:" + self.abort_flag: bool = False - self.ssa = ssaClass(self) - self.steppertuner = stepperClass(self) - self.piezo = piezoClass(self) + # These need to be created after all the base cavity properties are defined + self.ssa: SSA = self.rack.ssa_class(cavity=self) + self.stepper_tuner: StepperTuner = self.rack.stepper_class(cavity=self) + self.piezo: Piezo = self.rack.piezo_class(cavity=self) self._calc_probe_q_pv_obj: Optional[PV] = None self.calc_probe_q_pv: str = self.pv_addr("QPROBE_CALC1.PROC") @@ -863,8 +936,6 @@ def __init__( self.freq_stop_pv: str = self.chirp_prefix + "FREQ_STOP" self._freq_stop_pv_obj: Optional[PV] = None - self.abort_flag: bool = False - self.hw_mode_pv: str = self.pv_addr("HWMODE") self._hw_mode_pv_obj: Optional[PV] = None @@ -880,7 +951,7 @@ def pv_prefix(self): @property def microsteps_per_hz(self): - return 1 / self.steppertuner.hz_per_microstep + return 1 / self.stepper_tuner.hz_per_microstep def start_characterization(self): if not self._characterization_start_pv_obj: @@ -1115,11 +1186,13 @@ def edm_macro_string(self): rfs = rfs_map[self.number] - r = self.rack.rackName - cm = self.cryomodule.pv_prefix[ - :-3 - ] # need to remove trailing colon and zeroes to match needed format - id = self.cryomodule.name + r = self.rack.rack_name + + # need to remove trailing colon and zeroes to match needed format + cm = self.cryomodule.pv_prefix[:-3] + + # "id" shadows built-in id, renaming + macro_id = self.cryomodule.name ch = 2 if self.number in [2, 4] else 1 @@ -1129,7 +1202,7 @@ def edm_macro_string(self): "RFS={rfs}".format(rfs=rfs), "R={r}".format(r=r), "CM={cm}".format(cm=cm), - "ID={id}".format(id=id), + "ID={id}".format(id=macro_id), "CH={ch}".format(ch=ch), ] ) @@ -1300,7 +1373,7 @@ def _auto_tune( steps_moved: int = 0 if reset_signed_steps: - self.steppertuner.reset_signed_steps() + self.stepper_tuner.reset_signed_steps() self.tune_config_pv_obj.put(utils.TUNE_CONFIG_OTHER_VALUE) @@ -1310,9 +1383,9 @@ def _auto_tune( print(f"Moving stepper for {self} {est_steps} steps") - self.steppertuner.move( + self.stepper_tuner.move( est_steps, - maxSteps=int(abs(est_steps) * 1.1), + max_steps=int(abs(est_steps) * 1.1), speed=utils.MAX_STEPPER_SPEED, ) @@ -1335,7 +1408,7 @@ def check_detune(self): f"Cannot tune {self} in SELA with invalid detune" ) - def checkAndSetOnTime(self): + def check_and_set_on_time(self): """ In pulsed mode the cavity has a duty cycle determined by the on time and off time. We want the on time to be 70 ms or else the various cavity @@ -1351,7 +1424,7 @@ def checkAndSetOnTime(self): ) ) self.pulse_on_time = utils.NOMINAL_PULSED_ONTIME - self.pushGoButton() + self.push_go_button() @property def pulse_go_pv_obj(self) -> PV: @@ -1359,7 +1432,7 @@ def pulse_go_pv_obj(self) -> PV: self._pulse_go_pv_obj = PV(self._pv_prefix + "PULSE_DIFF_SUM") return self._pulse_go_pv_obj - def pushGoButton(self): + def push_go_button(self): """ Many of the changes made to a cavity don't actually take effect until the go button is pressed @@ -1388,7 +1461,7 @@ def turn_on(self): else: raise utils.CavityHWModeError(f"{self} not online") - def turnOff(self): + def turn_off(self): print(f"turning {self} off") self.rf_control = 0 while self.is_on: @@ -1397,30 +1470,30 @@ def turnOff(self): sleep(1) print(f"{self} off") - def setup_selap(self, desAmp: float = 5): - self.setup_rf(desAmp) + def setup_selap(self, des_amp: float = 5): + self.setup_rf(des_amp) self.set_selap_mode() print(f"{self} set up in SELAP") - def setup_sela(self, desAmp: float = 5): - self.setup_rf(desAmp) + def setup_sela(self, des_amp: float = 5): + self.setup_rf(des_amp) self.set_sela_mode() print(f"{self} set up in SELA") def check_abort(self): if self.abort_flag: self.abort_flag = False - self.turnOff() + self.turn_off() raise utils.CavityAbortError(f"Abort requested for {self}") - def setup_rf(self, desAmp): - if desAmp > self.ades_max: + def setup_rf(self, des_amp): + if des_amp > self.ades_max: print( f"Requested amplitude for {self} too high - ramping up to AMAX instead" ) - desAmp = self.ades_max + des_amp = self.ades_max print(f"setting up {self}") - self.turnOff() + self.turn_off() self.ssa.calibrate(self.ssa.drive_max) self.move_to_resonance() @@ -1433,19 +1506,19 @@ def setup_rf(self, desAmp): self.check_abort() - self.ades = min(5, desAmp) + self.ades = min(5, des_amp) self.set_sel_mode() self.piezo.enable_feedback() self.set_sela_mode() self.check_abort() - if desAmp <= 10: - self.walk_amp(desAmp, 0.5) + if des_amp <= 10: + self.walk_amp(des_amp, 0.5) else: self.walk_amp(10, 0.5) - self.walk_amp(desAmp, 0.1) + self.walk_amp(des_amp, 0.1) def reset_data_decimation(self): print(f"Setting data decimation for {self}") @@ -1501,10 +1574,10 @@ def reset_interlocks(self, wait: int = 3, attempt: int = 0): print(f"Checking {self} RF permit") if self.rf_inhibited: - if attempt >= utils.INTERLOCK_RESET_ATTEMPS: + if attempt >= utils.INTERLOCK_RESET_ATTEMPTS: raise utils.CavityFaultError( f"{self} still faulted after" - f" {utils.INTERLOCK_RESET_ATTEMPS} " + f" {utils.INTERLOCK_RESET_ATTEMPTS} " f"reset attempts" ) else: @@ -1595,7 +1668,7 @@ def walk_amp(self, des_amp, step_size): while self.ades <= (des_amp - step_size): self.check_abort() if self.is_quenched: - raise utils.QuenchError(f"{self} quench detected, aborting rampup") + raise utils.QuenchError(f"{self} quench detected, aborting RF ramp") self.ades = self.ades + step_size # to avoid tripping sensitive interlock sleep(0.1) @@ -1607,12 +1680,22 @@ def walk_amp(self, des_amp, step_size): class Magnet(utils.SCLinacObject): - def __init__(self, magnettype, cryomodule): + """ + Python representation of LCLS II magnets. This class provides wrappers for + common magnet controls including powering on/off and changing strength + + """ + + def __init__(self, magnet_type, cryomodule): # type: (str, Cryomodule) -> None - self._pv_prefix = "{magnettype}:{linac}:{cm}85:".format( - magnettype=magnettype, linac=cryomodule.linac.name, cm=cryomodule.name - ) - self.name = magnettype + """ + @param magnet_type: One of QUAD, XCOR, or YCOR + @param cryomodule: the cryomodule object in which this magnet is contained + """ + + self._pv_prefix = f"{magnet_type}:{cryomodule.linac.name}:{cryomodule.name}85:" + + self.name = magnet_type self.cryomodule: Cryomodule = cryomodule self.bdes_pv: str = self.pv_addr("BDES") @@ -1653,10 +1736,10 @@ def bdes(self, value): def reset(self): self.control_pv_obj.put(utils.MAGNET_RESET_VALUE) - def turnOn(self): + def turn_on(self): self.control_pv_obj.put(utils.MAGNET_ON_VALUE) - def turnOff(self): + def turn_off(self): self.control_pv_obj.put(utils.MAGNET_OFF_VALUE) def degauss(self): @@ -1667,55 +1750,54 @@ def trim(self): class Rack(utils.SCLinacObject): + """ + Python representation of LCLS II RF Racks. This class functions mostly as a + container for cavities. + Rack A has cavities 1 through 4, Rack B has cavities 5 through 8. + """ + def __init__( self, - rackName, - cryoObject, - cavityClass=Cavity, - ssaClass=SSA, - stepperClass=StepperTuner, - piezoClass=Piezo, + rack_name, + cryomodule_object, ): - # type: (str, Cryomodule, Type[Cavity], Type[SSA], Type[StepperTuner], Type[Piezo]) -> None + # type: (str, Cryomodule) -> None """ Parameters ---------- - rackName: str name of rack (always either "A" or "B") - cryoObject: the cryomodule object this rack belongs to - cavityClass: cavity object + rack_name: str name of rack (always either "A" or "B") + cryomodule_object: the cryomodule object this rack belongs to """ - self.cryomodule = cryoObject - self.rackName = rackName + self.cryomodule: Cryomodule = cryomodule_object + self.rack_name = rack_name + + self.cavity_class: Type[Cavity] = self.cryomodule.cavity_class + self.ssa_class = self.cryomodule.ssa_class + self.stepper_class = self.cryomodule.stepper_class + self.piezo_class = self.cryomodule.piezo_class + self.cavities: Dict[int, Cavity] = {} self._pv_prefix = self.cryomodule.pv_addr( - "RACK{RACK}:".format(RACK=self.rackName) + "RACK{RACK}:".format(RACK=self.rack_name) ) - if rackName == "A": + if rack_name == "A": # rack A always has cavities 1 - 4 for cavityNum in range(1, 5): - self.cavities[cavityNum] = cavityClass( - cavityNum=cavityNum, - rackObject=self, - ssaClass=ssaClass, - stepperClass=stepperClass, - piezoClass=piezoClass, + self.cavities[cavityNum] = self.cavity_class( + cavity_num=cavityNum, rack_object=self ) - elif rackName == "B": + elif rack_name == "B": # rack B always has cavities 5 - 8 for cavityNum in range(5, 9): - self.cavities[cavityNum] = cavityClass( - cavityNum=cavityNum, - rackObject=self, - ssaClass=ssaClass, - stepperClass=stepperClass, - piezoClass=piezoClass, + self.cavities[cavityNum] = self.cavity_class( + cavity_num=cavityNum, rack_object=self ) else: - raise Exception(f"Bad rack name {rackName}") + raise Exception(f"Bad rack name {rack_name}") @property def pv_prefix(self): @@ -1723,78 +1805,65 @@ def pv_prefix(self): class Cryomodule(utils.SCLinacObject): + """ + Python representation of an LCLS II cryomodule. This class functions mostly + as a container for racks and cryo-level PVs + + """ + def __init__( self, cryo_name, linac_object, - cavity_class=Cavity, - magnet_class=Magnet, - rack_class=Rack, - is_harmonic_linearizer=False, - ssa_class=SSA, - stepper_class=StepperTuner, - piezo_class=Piezo, ): - # type: (str, Linac, Type[Cavity], Type[Magnet], Type[Rack], bool, Type[SSA], Type[StepperTuner], Type[Piezo]) -> None # noqa: E501 + # type: (str, Linac) -> None # noqa: E501 """ - Parameters - ---------- - cryo_name: str name of Cryomodule i.e. "02", "03", "H1", "H2" - linac_object: the linac object this cryomodule belongs to i.e. CM02 is in linac L1B - cavity_class: cavity object + @param cryo_name: str name of Cryomodule i.e. "02", "03", "H1", "H2" + @param linac_object: the linac object this cryomodule belongs to i.e. + CM02 is in linac L1B """ self.name: str = cryo_name self.linac: Linac = linac_object - self.is_harmonic_linearizer: bool = is_harmonic_linearizer - if not is_harmonic_linearizer: - self.quad: Magnet = magnet_class("QUAD", self) - self.xcor: Magnet = magnet_class("XCOR", self) - self.ycor: Magnet = magnet_class("YCOR", self) + self.magnet_class: Type[Magnet] = self.linac.magnet_class + self.rack_class: Type[Rack] = self.linac.rack_class + self.cavity_class: Type[Cavity] = self.linac.cavity_class + self.ssa_class: Type[SSA] = self.linac.ssa_class + self.stepper_class: Type[StepperTuner] = self.linac.stepper_class + self.piezo_class: Type[Piezo] = self.linac.piezo_class - self._pv_prefix = "ACCL:{LINAC}:{CRYOMODULE}00:".format( - LINAC=self.linac.name, CRYOMODULE=self.name - ) - self.cte_prefix = "CTE:CM{cm}:".format(cm=self.name) - self.cvt_prefix = "CVT:CM{cm}:".format(cm=self.name) - self.cpv_prefix = "CPV:CM{cm}:".format(cm=self.name) + if not self.is_harmonic_linearizer: + self.quad: Magnet = self.magnet_class(magnet_type="QUAD", cryomodule=self) + self.xcor: Magnet = self.magnet_class(magnet_type="XCOR", cryomodule=self) + self.ycor: Magnet = self.magnet_class(magnet_type="YCOR", cryomodule=self) + + self._pv_prefix = f"ACCL:{self.linac.name}:{self.name}00:" + + self.cte_prefix = f"CTE:CM{self.name}:" + self.cvt_prefix = f"CVT:CM{self.name}:" + self.cpv_prefix = f"CPV:CM{self.name}:" if not self.is_harmonic_linearizer: - self.jt_prefix = "CLIC:CM{cm}:3001:PVJT:".format(cm=self.name) + self.jt_prefix = f"CLIC:CM{self.name}:3001:PVJT:" else: name_map: Dict[str, str] = {"H1": "HL01", "H2": "HL02"} - self.jt_prefix = "CLIC:{cm}:3001:PVJT:".format(cm=name_map[self.name]) + self.jt_prefix = f"CLIC:{name_map[self.name]}:3001:PVJT:" - self.ds_level_pv: str = "CLL:CM{cm}:2301:DS:LVL".format(cm=self.name) - self.us_level_pv: str = "CLL:CM{cm}:2601:US:LVL".format(cm=self.name) - self.ds_pressure_pv: str = "CPT:CM{cm}:2302:DS:PRESS".format(cm=self.name) + self.ds_level_pv: str = f"CLL:CM{self.name}:2301:DS:LVL" + self.us_level_pv: str = f"CLL:CM{self.name}:2601:US:LVL" + self.ds_pressure_pv: str = f"CPT:CM{self.name}:2302:DS:PRESS" self.jt_valve_readback_pv: str = self.jt_prefix + "ORBV" self.heater_readback_pv: str = f"CPIC:CM{self.name}:0000:EHCV:ORBV" - self.rack_a: rack_class = rack_class( - rackName="A", - cryoObject=self, - cavityClass=cavity_class, - ssaClass=ssa_class, - stepperClass=stepper_class, - piezoClass=piezo_class, - ) - - self.rack_b: rack_class = rack_class( - rackName="B", - cryoObject=self, - cavityClass=cavity_class, - ssaClass=ssa_class, - stepperClass=stepper_class, - piezoClass=piezo_class, - ) + self.rack_a: Rack = self.rack_class(rack_name="A", cryomodule_object=self) + self.rack_b: Rack = self.rack_class(rack_name="B", cryomodule_object=self) - self.cavities: Dict[int, cavity_class] = {} + self.cavities: Dict[int, Cavity] = {} self.cavities.update(self.rack_a.cavities) self.cavities.update(self.rack_b.cavities) - if is_harmonic_linearizer: + if self.is_harmonic_linearizer: self.coupler_vacuum_pvs: List[str] = [ self.linac.vacuum_prefix + "{cm}09:COMBO_P".format(cm=self.name), self.linac.vacuum_prefix + "{cm}19:COMBO_P".format(cm=self.name), @@ -1810,23 +1879,51 @@ def __init__( + self.linac.insulating_vacuum_pvs ) + @property + def is_harmonic_linearizer(self): + return self.name in ["H1", "H2"] + @property def pv_prefix(self): return self._pv_prefix class Linac: + """ + Python representation of LCLS II linac sections. This class functions mostly + as a container for cryomodules and linac-level vacuum PVs + + """ + def __init__( - self, linac_name, beamline_vacuum_infixes, insulating_vacuum_cryomodules + self, + linac_section, + beamline_vacuum_infixes, + insulating_vacuum_cryomodules, + machine, ): - # type: (str, List[str], List[str]) -> None + # type: (int, List[str], List[str], Machine) -> None """ - Parameters - ---------- - linac_name: str name of Linac i.e. "L0B", "L1B", "L2B", "L3B" + @param linac_section: int of Linac index i.e. "L0B" -> 0 + @param beamline_vacuum_infixes: str list of vacuum infixes for that + section as found in + utils.BEAMLINE_VACUUM_INFIXES + @param insulating_vacuum_cryomodules: str list of cryomodules with + insulated vacuum readback in that + section as found in + utils.INSULATING_VACUUM_CRYOMODULES + @param machine: Machine object that this linac belongs to """ - self.name = linac_name + self.cryomodule_class: Type[Cryomodule] = machine.cryomodule_class + self.cavity_class = machine.cavity_class + self.rack_class = machine.rack_class + self.magnet_class = machine.magnet_class + self.ssa_class = machine.ssa_class + self.stepper_class = machine.stepper_class + self.piezo_class = machine.piezo_class + + self.name = f"L{linac_section}B" self.cryomodules: Dict[str, Cryomodule] = {} self.vacuum_prefix = "VGXX:{linac}:".format(linac=self.name) @@ -1839,177 +1936,78 @@ def __init__( for cm in insulating_vacuum_cryomodules ] + for cm_name in utils.LINAC_CM_MAP[linac_section]: + self.cryomodules[cm_name] = self.cryomodule_class( + cryo_name=cm_name, linac_object=self + ) + def add_cryomodules( self, cryomodule_string_list: List[str], - cryomoduleClass: Type[Cryomodule] = Cryomodule, - cavityClass: Type[Cavity] = Cavity, - rackClass: Type[Rack] = Rack, - magnetClass: Type[Magnet] = Magnet, - is_harmonic_linearizer: bool = False, - ssaClass: Type[SSA] = SSA, - stepperClass: Type[StepperTuner] = StepperTuner, - piezoClass: Type[Piezo] = Piezo, ): for cryomoduleString in cryomodule_string_list: self.add_cryomodule( cryomodule_name=cryomoduleString, - cryomoduleClass=cryomoduleClass, - cavityClass=cavityClass, - rackClass=rackClass, - magnetClass=magnetClass, - is_harmonic_linearizer=is_harmonic_linearizer, - ssaClass=ssaClass, - stepperClass=stepperClass, - piezoClass=piezoClass, ) - def add_cryomodule( - self, - cryomodule_name: str, - cryomoduleClass: Type[Cryomodule] = Cryomodule, - cavityClass: Type[Cavity] = Cavity, - rackClass: Type[Rack] = Rack, - magnetClass: Type[Magnet] = Magnet, - is_harmonic_linearizer: bool = False, - ssaClass: Type[SSA] = SSA, - stepperClass: Type[StepperTuner] = StepperTuner, - piezoClass: Type[Piezo] = Piezo, - ): - self.cryomodules[cryomodule_name] = cryomoduleClass( - cryo_name=cryomodule_name, - linac_object=self, - cavity_class=cavityClass, - rack_class=rackClass, - magnet_class=magnetClass, - is_harmonic_linearizer=is_harmonic_linearizer, - ssa_class=ssaClass, - stepper_class=stepperClass, - piezo_class=piezoClass, + def add_cryomodule(self, cryomodule_name: str): + self.cryomodules[cryomodule_name] = self.cryomodule_class( + cryo_name=cryomodule_name, linac_object=self ) -def make_linac(section: int, LinacClass: Type[Linac] = Linac) -> Linac: - return LinacClass( - f"L{section}B", - beamline_vacuum_infixes=utils.BEAMLINEVACUUM_INFIXES[section], - insulating_vacuum_cryomodules=utils.INSULATINGVACUUM_CRYOMODULES[section], - ) +class Machine: + """ + Python representation of the entire LCLS II accelerator. This class functions + as a generator for lower level accelerator objects, as well as a container + for generated cryomodule objects + """ -class LinacDict(dict): def __init__( self, - LinacClass: Type[Linac] = Linac, - cryomoduleClass: Type[Cryomodule] = Cryomodule, - cavityClass: Type[Cavity] = Cavity, - magnetClass: Type[Magnet] = Magnet, - rackClass: Type[Rack] = Rack, - stepperClass: Type[StepperTuner] = StepperTuner, - ssaClass: Type[SSA] = SSA, - piezoClass: Type[Piezo] = Piezo, + linac_class: Type[Linac] = Linac, + cryomodule_class: Type[Cryomodule] = Cryomodule, + cavity_class: Type[Cavity] = Cavity, + magnet_class: Type[Magnet] = Magnet, + rack_class: Type[Rack] = Rack, + stepper_class: Type[StepperTuner] = StepperTuner, + ssa_class: Type[SSA] = SSA, + piezo_class: Type[Piezo] = Piezo, ): - super().__init__() - self.cryomoduleClass = cryomoduleClass - self.cavityClass = cavityClass - self.magnetClass = magnetClass - self.rackClass = rackClass - self.stepperClass = stepperClass - self.ssaClass = ssaClass - self.piezoClass = piezoClass - self.LinacClass: Type[Linac] = LinacClass - - def __missing__(self, key: int) -> Linac: - if key not in range(4): - raise KeyError(f"Linac section {key} does not exist") - else: - linac = make_linac(section=key, LinacClass=self.LinacClass) - self[key] = linac - return linac - - def populate(self): - for section, cm_list in utils.LINAC_CM_DICT.items(): - self[section].add_cryomodules( - cm_list, - cryomoduleClass=self.cryomoduleClass, - cavityClass=self.cavityClass, - rackClass=self.rackClass, - magnetClass=self.magnetClass, - ssaClass=self.ssaClass, - stepperClass=self.stepperClass, - piezoClass=self.piezoClass, - is_harmonic_linearizer=False, - ) - - self[1].add_cryomodules( - utils.L1BHL, - cryomoduleClass=self.cryomoduleClass, - cavityClass=self.cavityClass, - rackClass=self.rackClass, - magnetClass=self.magnetClass, - ssaClass=self.ssaClass, - stepperClass=self.stepperClass, - piezoClass=self.piezoClass, - is_harmonic_linearizer=True, - ) - + """ + All inputs are optional, but allow for object customization for more + specific use cases. Only functionality used by at least two applications + is put in default classes -class CryoDict(dict): - def __init__( - self, - cryomoduleClass: Type[Cryomodule] = Cryomodule, - cavityClass: Type[Cavity] = Cavity, - magnetClass: Type[Magnet] = Magnet, - rackClass: Type[Rack] = Rack, - stepperClass: Type[StepperTuner] = StepperTuner, - ssaClass: Type[SSA] = SSA, - piezoClass: Type[Piezo] = Piezo, - linacClass: Type[Linac] = Linac, - ): - super().__init__() - - self.cryomoduleClass = cryomoduleClass - self.cavityClass = cavityClass - self.magnetClass = magnetClass - self.rackClass = rackClass - self.stepperClass = stepperClass - self.ssaClass = ssaClass - self.piezoClass = piezoClass - self.linacClass = linacClass - - self.linac_dict = LinacDict(LinacClass=linacClass) - - def __missing__(self, key): - if key in utils.L0B: - linac: Linac = self.linac_dict[0] - elif key in utils.L1B: - linac: Linac = self.linac_dict[1] - elif key in utils.L1BHL: - linac: Linac = self.linac_dict[1] - elif key in utils.L2B: - linac: Linac = self.linac_dict[2] - elif key in utils.L3B: - linac: Linac = self.linac_dict[3] - else: - raise KeyError(f"Cryomodule {key} not found in any linac region.") - - if key not in linac.cryomodules.keys(): - linac.add_cryomodule( - cryomodule_name=key, - cryomoduleClass=self.cryomoduleClass, - cavityClass=self.cavityClass, - rackClass=self.rackClass, - magnetClass=self.magnetClass, - is_harmonic_linearizer=(key in utils.L1BHL), - ssaClass=self.ssaClass, - stepperClass=self.stepperClass, - piezoClass=self.piezoClass, + """ + self.linac_class = linac_class + self.cryomodule_class = cryomodule_class + self.cavity_class = cavity_class + self.magnet_class = magnet_class + self.rack_class = rack_class + self.stepper_class = stepper_class + self.ssa_class = ssa_class + self.piezo_class = piezo_class + + self.linacs: List[Linac] = [] + + for section in range(4): + self.linacs.append( + linac_class( + linac_section=section, + beamline_vacuum_infixes=utils.BEAMLINE_VACUUM_INFIXES[section], + insulating_vacuum_cryomodules=utils.INSULATING_VACUUM_CRYOMODULES[ + section + ], + machine=self, + ) ) - cryomodule = linac.cryomodules[key] - self[key] = cryomodule - - return cryomodule + self.cryomodules: Dict[str, Cryomodule] = {} + for linac in self.linacs: + for cm_name, cm_obj in linac.cryomodules.items(): + self.cryomodules[cm_name] = cm_obj -CRYOMODULE_OBJECTS: Dict[str, Cryomodule] = CryoDict() +MACHINE = Machine() diff --git a/lcls_tools/superconducting/sc_linac_utils.py b/lcls_tools/superconducting/sc_linac_utils.py index 7d5d64e9..4a27d6f7 100644 --- a/lcls_tools/superconducting/sc_linac_utils.py +++ b/lcls_tools/superconducting/sc_linac_utils.py @@ -32,14 +32,15 @@ LINAC_TUPLES = [("L0B", L0B), ("L1B", L1B), ("L2B", L2B), ("L3B", L3B)] LINAC_CM_DICT = {0: L0B, 1: L1B, 2: L2B, 3: L3B} +LINAC_CM_MAP = [L0B, L1B + L1BHL, L2B, L3B] -BEAMLINEVACUUM_INFIXES = [ +BEAMLINE_VACUUM_INFIXES = [ ["0198"], ["0202", "H292"], ["0402", "1592"], ["1602", "2594", "2598", "3592"], ] -INSULATINGVACUUM_CRYOMODULES = [ +INSULATING_VACUUM_CRYOMODULES = [ ["01"], ["02", "H1"], ["04", "06", "08", "10", "12", "14"], @@ -110,7 +111,7 @@ MAX_STEPPER_SPEED = 60000 STEPPER_ON_LIMIT_SWITCH_VALUE = 1 -# these values are based on the list of enum states found by probing {Magnettype}:L{x}B:{cm}85:CTRL +# these values are based on the list of enum states found by probing {magnet_type}:L{x}B:{cm}85:CTRL MAGNET_RESET_VALUE = 10 MAGNET_ON_VALUE = 11 MAGNET_OFF_VALUE = 12 @@ -123,7 +124,7 @@ PIEZO_FEEDBACK_VALUE = 1 PIEZO_SCRIPT_RUNNING_VALUE = 2 PIEZO_SCRIPT_COMPLETE_VALUE = 1 -PIEZO_PRERF_CHECKOUT_PASS_VALUE = 0 +PIEZO_PRE_RF_CHECKOUT_PASS_VALUE = 0 PIEZO_WITH_RF_GRAD = 6.5 PIEZO_CENTER_VOLTAGE = 25 PIEZO_HZ_PER_VOLT = 20 @@ -148,10 +149,15 @@ HW_MODE_MAIN_DONE_VALUE = 3 HW_MODE_READY_VALUE = 4 -INTERLOCK_RESET_ATTEMPS = 3 +INTERLOCK_RESET_ATTEMPTS = 5 class SCLinacObject(ABC, object): + """ + Base class used to represent all components of the LCLS II superconducting + accelerator (linacs, cryomodules, racks, cavities, SSAs, and tuners) + """ + @property @abstractmethod def pv_prefix(self):