Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Add boilerplate code for wait_for_robot_load_then_centre #1034

Merged
merged 1 commit into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/hyperion/experiment_plans/wait_for_robot_load_then_centre.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING

from blueapi.core import BlueskyContext, MsgGenerator

from hyperion.experiment_plans.grid_detect_then_xray_centre_plan import (
GridDetectThenXRayCentreComposite,
)
from hyperion.experiment_plans.pin_centre_then_xray_centre_plan import (
pin_tip_centre_then_xray_centre,
)
from hyperion.parameters.plan_specific.pin_centre_then_xray_centre_params import (
PinCentreThenXrayCentreInternalParameters,
)

if TYPE_CHECKING:
from hyperion.parameters.plan_specific.wait_for_robot_load_then_center_params import (
WaitForRobotLoadThenCentreInternalParameters,
)


def create_devices(context: BlueskyContext) -> GridDetectThenXRayCentreComposite:
from hyperion.utils.context import device_composite_from_context

return device_composite_from_context(context, GridDetectThenXRayCentreComposite)


def wait_for_robot_load_then_centre(
composite: GridDetectThenXRayCentreComposite,
parameters: WaitForRobotLoadThenCentreInternalParameters,
) -> MsgGenerator:
# Start arming the detector

# Move backlight in

# Wait for robot to finish moving

# Take snapshot

# Put robot load into ispyb

# Do centering
params_json = json.loads(parameters.json())
pin_centre_params = PinCentreThenXrayCentreInternalParameters(**params_json)
yield from pin_tip_centre_then_xray_centre(composite, pin_centre_params)
4 changes: 4 additions & 0 deletions src/hyperion/external_interaction/ispyb/ispyb_dataclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ def wavelength_angstroms(self):
return convert_eV_to_angstrom(self.current_energy_ev)


class RobotLoadIspybParams(IspybParams):
...


class RotationIspybParams(IspybParams):
...

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from __future__ import annotations

from typing import Any

import numpy as np
from dodal.devices.detector import TriggerMode
from dodal.devices.eiger import DetectorParams
from pydantic import validator
from pydantic.dataclasses import dataclass

from hyperion.external_interaction.ispyb.ispyb_dataclass import (
GRIDSCAN_ISPYB_PARAM_DEFAULTS,
RobotLoadIspybParams,
)
from hyperion.parameters.internal_parameters import (
HyperionParameters,
InternalParameters,
extract_experiment_params_from_flat_dict,
extract_hyperion_params_from_flat_dict,
)


class WaitForRobotLoadThenCentreHyperionParameters(HyperionParameters):
ispyb_params: RobotLoadIspybParams = RobotLoadIspybParams(
**GRIDSCAN_ISPYB_PARAM_DEFAULTS
)

class Config:
arbitrary_types_allowed = True
json_encoders = {
**DetectorParams.Config.json_encoders,
**RobotLoadIspybParams.Config.json_encoders,
}


@dataclass
class WaitForRobotLoadThenCentreParams:
"""
Holder class for the parameters of a plan that waits for robot load then does a
centre.
"""

exposure_time: float
detector_distance: float
omega_start: float
snapshot_dir: str


class WaitForRobotLoadThenCentreInternalParameters(InternalParameters):
experiment_params: WaitForRobotLoadThenCentreParams
hyperion_params: WaitForRobotLoadThenCentreHyperionParameters

class Config:
arbitrary_types_allowed = True
json_encoders = {
**HyperionParameters.Config.json_encoders,
}

@staticmethod
def _hyperion_param_key_definitions() -> tuple[list[str], list[str], list[str]]:
(
hyperion_param_field_keys,
detector_field_keys,
ispyb_field_keys,
) = InternalParameters._hyperion_param_key_definitions()
ispyb_field_keys += list(RobotLoadIspybParams.__annotations__.keys())
return hyperion_param_field_keys, detector_field_keys, ispyb_field_keys

@validator("experiment_params", pre=True)
def _preprocess_experiment_params(
cls,
experiment_params: dict[str, Any],
):
return WaitForRobotLoadThenCentreParams(
**extract_experiment_params_from_flat_dict(
WaitForRobotLoadThenCentreParams, experiment_params
)
)

@validator("hyperion_params", pre=True)
def _preprocess_hyperion_params(
cls, all_params: dict[str, Any], values: dict[str, Any]
):
experiment_params: WaitForRobotLoadThenCentreParams = values[
"experiment_params"
]
all_params["num_images"] = 0
all_params["exposure_time"] = experiment_params.exposure_time
all_params["position"] = np.array(all_params["position"])
all_params["omega_increment"] = 0
all_params["num_triggers"] = all_params["num_images"]
all_params["num_images_per_trigger"] = 1
all_params["trigger_mode"] = TriggerMode.FREE_RUN
all_params["upper_left"] = np.zeros(3, dtype=np.int32)
return WaitForRobotLoadThenCentreHyperionParameters(
**extract_hyperion_params_from_flat_dict(
all_params, cls._hyperion_param_key_definitions()
)
)

def get_data_shape(self):
raise TypeError("Data shape does not apply to this type of experiment!")

def get_scan_points(self):
raise TypeError("Scan points do not apply to this type of experiment!")
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://json-schema.org/draft-07/schema",
"type": "object",
"properties": {
"exposure_time": {
"type": "number"
},
"snapshot_dir": {
"type": "string"
},
"detector_distance": {
"type": "number"
},
"omega_start": {
"type": "number"
}
},
"required": [
"exposure_time",
"snapshot_dir",
"detector_distance",
"omega_start"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"$ref": "hyperion_parameters_schema.json"
},
"experiment_params": {
"oneOf": [
"anyOf": [
{
"$ref": "experiment_schemas/grid_scan_params_schema.json"
},
Expand All @@ -19,6 +19,9 @@
},
{
"$ref": "experiment_schemas/grid_scan_with_edge_detect_params_schema.json"
},
{
"$ref": "experiment_schemas/wait_for_robot_load_then_centre_schema.json"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"params_version": "4.0.1",
"hyperion_params": {
"zocalo_environment": "artemis",
"beamline": "BL03I",
"insertion_prefix": "SR03S",
"experiment_type": "wait_for_robot_load_then_xray_centre",
"detector_params": {
"current_energy_ev": 100,
"directory": "/tmp/",
"prefix": "file_name",
"run_number": 0,
"use_roi_mode": false,
"det_dist_to_beam_converter_path": "tests/test_data/test_lookup_table.txt"
},
"ispyb_params": {
"visit_path": "/tmp/cm31105-4",
"microns_per_pixel_x": 0.0,
"microns_per_pixel_y": 0.0,
"position": [
0,
0,
0
],
"transmission_fraction": 1.0,
"flux": 10.0,
"beam_size_x": 0.1,
"beam_size_y": 0.1,
"focal_spot_size_x": 0.0,
"focal_spot_size_y": 0.0,
"comment": "Descriptive comment.",
"resolution": 1,
"sample_id": null,
"sample_barcode": null
}
},
"experiment_params": {
"omega_start": 0,
"exposure_time": 0.004,
"detector_distance": 255,
"snapshot_dir": "/tmp"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from unittest.mock import MagicMock, patch

import pytest
from bluesky.run_engine import RunEngine

from hyperion.experiment_plans.wait_for_robot_load_then_centre import (
wait_for_robot_load_then_centre,
)
from hyperion.parameters.external_parameters import from_file as raw_params_from_file
from hyperion.parameters.plan_specific.pin_centre_then_xray_centre_params import (
PinCentreThenXrayCentreInternalParameters,
)
from hyperion.parameters.plan_specific.wait_for_robot_load_then_center_params import (
WaitForRobotLoadThenCentreInternalParameters,
)


@pytest.fixture
def wait_for_robot_load_then_centre_params():
params = raw_params_from_file(
"tests/test_data/parameter_json_files/good_test_wait_for_robot_load_params.json"
)
return WaitForRobotLoadThenCentreInternalParameters(**params)


@patch(
"hyperion.experiment_plans.wait_for_robot_load_then_centre.pin_tip_centre_then_xray_centre"
)
def test_when_plan_run_then_centring_plan_run_with_expected_parameters(
mock_centring_plan: MagicMock,
wait_for_robot_load_then_centre_params: WaitForRobotLoadThenCentreInternalParameters,
):
mock_composite = MagicMock()
RE = RunEngine()
RE(
wait_for_robot_load_then_centre(
mock_composite, wait_for_robot_load_then_centre_params
)
)
composite_passed = mock_centring_plan.call_args[0][0]
params_passed: PinCentreThenXrayCentreInternalParameters = (
mock_centring_plan.call_args[0][1]
)

assert composite_passed == mock_composite
assert isinstance(params_passed, PinCentreThenXrayCentreInternalParameters)