diff --git a/examples/example_measurement.ipynb b/examples/example_measurement.ipynb new file mode 100644 index 00000000..178c60ff --- /dev/null +++ b/examples/example_measurement.ipynb @@ -0,0 +1,291 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0a432940", + "metadata": {}, + "source": [ + "-This notebook is an example of how to properly set up the ScreenBeamProfileMeasurement Class:\n", + " -Since ScreenBeamProfileMeasurement takes a screen object as a pydantic field to use for live images and this example is mean't to be \n", + "done offline we subclass Screen and overwrite the the image property to a static 2d array. \n", + " -The array passed in this example is automatically" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "80146949", + "metadata": {}, + "outputs": [], + "source": [ + "from lcls_tools.common.measurements.screen_beam_profile_measurement import ScreenBeamProfileMeasurement\n", + "from lcls_tools.common.data_analysis.fit.projection import ProjectionFit\n", + "from lcls_tools.common.data_analysis.fit.methods import GaussianModel\n", + "from lcls_tools.common.image_processing.image_processing import ImageProcessor\n", + "from lcls_tools.common.image_processing.roi import ROI, RectangularROI, CircularROI\n", + "from lcls_tools.common.devices.device import Device, PVSet, ControlInformation, Metadata\n", + "from lcls_tools.common.devices.screen import ScreenControlInformation, ScreenPVSet, Screen\n", + "from matplotlib import pyplot as plt\n", + "from epics import PV, caget\n", + "import numpy as np\n", + "import h5py\n", + "import random" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "7051ec06-01f3-4dc3-b86a-c732fd1c3df5", + "metadata": {}, + "outputs": [], + "source": [ + "class ScreenTest(Screen):\n", + " @property\n", + " def image(self) -> np.ndarray:\n", + " return self._image\n", + " \n", + " @image.setter\n", + " def image(self,image):\n", + " self._image = image" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "f736c366-6c88-4187-b9a4-a4dcab9c7730", + "metadata": {}, + "outputs": [], + "source": [ + "def create_test_image(size:tuple,center:list,radius:int):\n", + " # make img that is a circle in the center of the image with known standard dev and mean. no imports, no calls to external or\n", + " # internal files. \n", + " image = np.zeros(size)\n", + " for y in range(image.shape[0]):\n", + " for x in range(image.shape[1]):\n", + " distance = np.sqrt((x - center[0])**2 + (y - center[1])**2)\n", + " if distance < radius:\n", + " image[y,x]= 1\n", + " return image" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "9fa172e9", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# creating projection fit class\n", + "x = np.linspace(0,800,800)\n", + "\n", + "\n", + "gauss_model = GaussianModel()\n", + "projection_fit = ProjectionFit(model = gauss_model,use_priors =True)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "4289cca5", + "metadata": {}, + "outputs": [], + "source": [ + "# creating image processing class\n", + "# would be nice to have default way of finding the center of image\n", + "# also maybe have rectangularROI have default roi_type 'Rectangular', and change roi_type to roi\n", + "roi = RectangularROI(center =[400,400],xwidth=300,ywidth=300)\n", + "image_processor = ImageProcessor(roi = roi)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "8260d6b6", + "metadata": {}, + "outputs": [], + "source": [ + "PV_strings = {\n", + " 'image': 'ArrayData',\n", + " 'n_bits': 'N_OF_BITS',\n", + " 'n_col': 'ArraySize1_RBV',\n", + " 'n_row': 'ArraySize0_RBV',\n", + " 'resolution': 'RESOLUTION'\n", + "}\n", + "metadata = {\n", + " 'area': 'TEST',\n", + " 'beam_path': ['SC_TEST'],\n", + " 'sum_l_meters': 99.99\n", + " }\n", + "control_name = \"OTRS:TEST:650:\"" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "eb543534", + "metadata": {}, + "outputs": [], + "source": [ + "# creating Screen Device\n", + "# inconsistent use of Controls_information and control_information between classes recommend change.\n", + "screen_pvs = ScreenPVSet(**PV_strings)\n", + "meta_data = Metadata(**metadata)\n", + "controls_information = ScreenControlInformation(control_name = control_name, PVs = screen_pvs )\n", + "screen_test = ScreenTest(controls_information = controls_information, metadata = meta_data)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "d5d1add9", + "metadata": {}, + "outputs": [], + "source": [ + "test = ScreenBeamProfileMeasurement(name = control_name,device = screen_test, image_processor = image_processor, fitting_tool = projection_fit)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "71c61cf3-bf4a-4d56-beb7-07e55b61cce0", + "metadata": {}, + "outputs": [], + "source": [ + "test.device.image = create_test_image((800,800),[400,400],50)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "725f6fb5", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['mean', 'sigma', 'amplitude', 'offset']\n" + ] + }, + { + "data": { + "text/plain": [ + "[{'raw_image': array([[0., 0., 0., ..., 0., 0., 0.],\n", + " [0., 0., 0., ..., 0., 0., 0.],\n", + " [0., 0., 0., ..., 0., 0., 0.],\n", + " ...,\n", + " [0., 0., 0., ..., 0., 0., 0.],\n", + " [0., 0., 0., ..., 0., 0., 0.],\n", + " [0., 0., 0., ..., 0., 0., 0.]]),\n", + " 'processed_image': array([[0., 0., 0., ..., 0., 0., 0.],\n", + " [0., 0., 0., ..., 0., 0., 0.],\n", + " [0., 0., 0., ..., 0., 0., 0.],\n", + " ...,\n", + " [0., 0., 0., ..., 0., 0., 0.],\n", + " [0., 0., 0., ..., 0., 0., 0.],\n", + " [0., 0., 0., ..., 0., 0., 0.]]),\n", + " 'mean_x': 150.50167233387222,\n", + " 'sigma_x': 31.481275393806968,\n", + " 'amplitude_x': 98.871501132453,\n", + " 'offset_x': 0.9887150113245301,\n", + " 'mean_y': 150.50167233081507,\n", + " 'sigma_y': 31.481275399702522,\n", + " 'amplitude_y': 98.87150113245299,\n", + " 'offset_y': 0.9887150113245299}]" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "print(test.fitting_tool.model.param_names)\n", + "test.measure()" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "29d792d7-dac2-45ad-888e-3cb29921ce92", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(
, )" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "test.fitting_tool.plot_fit()" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "79b0871e-66ba-4508-8ba4-5e21ea1d237c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1.0" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "test.fitting_tool.model._forward(0,[0,10000,1,0])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0d32b918-caba-4e16-9840-e8393bf80761", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.19" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/lcls_tools/common/data_analysis/fit/method_base.py b/lcls_tools/common/data_analysis/fit/method_base.py new file mode 100644 index 00000000..58567fee --- /dev/null +++ b/lcls_tools/common/data_analysis/fit/method_base.py @@ -0,0 +1,134 @@ +from abc import ABC, abstractmethod +import numpy as np +from matplotlib import pyplot as plt + + +class MethodBase(ABC): + """ + Base abstract class for all fit methods, which serves as the bare minimum skeleton code needed. + Should be used only as a parent class to all method models. + --------------------------- + Arguments: + param_names: list (list all of param names that the model will contain) + param_guesses: np.ndarray (array that contains a guess as to what + each param value is organized in the same order as param_names) + param_bounds: np.ndarray (array that contains the lower + and upper bound on for acceptable values of each parameter) + """ + + param_names: list = None + param_bounds: np.ndarray = None + + def __init__(self): + + self.init_values: dict = None + self.fitted_params_dict: dict = None + + @abstractmethod + def find_init_values(self) -> list: + ... + + @abstractmethod + def find_priors(self, data: np.ndarray) -> dict: + ... + + # TODO: move to plotting file + def plot_init_values(self): + init_values = np.array(list(self.init_values.values())) + """Plots init values as a function of forward and visually compares it to the initial distribution""" + fig, axs = plt.subplots(1, 1) + x = np.linspace(0, 1, len(self.profile_data)) + y_fit = self._forward(x, init_values) + axs.plot(x, self.profile_data, label="Projection Data") + axs.plot(x, y_fit, label="Initial Guess Fit Data") + axs.set_xlabel("x") + axs.set_ylabel("Forward(x)") + axs.set_title("Initial Fit Guess") + return fig, axs + + # TODO: move to plotting file + def plot_priors(self): + """Plots prior distributions for each param in param_names""" + num_plots = len(self.priors) + fig, axs = plt.subplots(num_plots, 1) + for i, (param, prior) in enumerate(self.priors.items()): + x = np.linspace(0, self.param_bounds[i][-1], len(self.profile_data)) + axs[i].plot(x, prior.pdf(x)) + axs[i].axvline( + self.param_bounds[i, 0], + ls="--", + c="k", + ) + axs[i].axvline(self.param_bounds[i, 1], ls="--", c="k", label="bounds") + axs[i].set_title(param + " prior") + axs[i].set_ylabel("Density") + axs[i].set_xlabel(param) + fig.tight_layout() + return fig, axs + + def forward(self, x: np.ndarray, params: dict) -> np.ndarray: + # TODO:test new usage + + params_list = np.array([params[name] for name in self.param_names]) + return self._forward(x, params_list) + + @staticmethod + @abstractmethod + def _forward(x: np.ndarray, params: np.ndarray) -> np.ndarray: + ... + + def log_prior(self, params: dict): + # TODO:test new usage + print(params) + params_list = np.array([params[name] for name in self.param_names]) + return self._log_prior(params_list) + + @abstractmethod + def _log_prior(self, params: np.ndarray): + ... + + def log_likelihood(self, x: np.ndarray, y: np.ndarray, params: dict): + # TODO:test new usage + params_list = np.array([params[name] for name in self.param_names]) + return self._log_likelihood(x, y, params_list) + + def _log_likelihood(self, x: np.ndarray, y: np.ndarray, params: np.ndarray): + # reducing error between data and fit + return -np.sum((y - self._forward(x, params)) ** 2) + + def loss(self, params, x, y, use_priors=False): + # TODO:implement using private functions _log_likelihood and _log_prior + # ML group way of iterating over priors/ reducing difference in fit and data + loss_temp = -self._log_likelihood(x, y, params) + if use_priors: + loss_temp = loss_temp - self._log_prior(params) + return loss_temp + + @property + def priors(self): + """ + Initial Priors store in a dictionary where the keys are the + complete set of parameters of the Model + """ + return self._priors + + @priors.setter + def priors(self, priors): + if not isinstance(priors, dict): + raise TypeError("Input must be a dictionary") + self._priors = priors + + @property + def profile_data(self): + """1D array typically projection data""" + return self._profile_data + + @profile_data.setter + def profile_data(self, profile_data): + if not isinstance(profile_data, np.ndarray): + raise TypeError("Input must be ndarray") + self._profile_data = profile_data + # should change these to private methods only called when profile data is set + self.find_init_values() + self.find_priors() + self.fitted_params_dict = {} diff --git a/lcls_tools/common/data_analysis/fit/methods.py b/lcls_tools/common/data_analysis/fit/methods.py new file mode 100644 index 00000000..fe4c832b --- /dev/null +++ b/lcls_tools/common/data_analysis/fit/methods.py @@ -0,0 +1,101 @@ +import numpy as np +from scipy.stats import norm, gamma +from lcls_tools.common.data_analysis.fit.method_base import MethodBase + + +class GaussianModel(MethodBase): + """ + GaussianModel Class that finds initial parameter values for gaussian distribution + and builds probability density functions for the likelyhood a parameter + to be that value based on those initial parameter values. + Passing this class the variable profile_data automatically updates + the initial values and and probability density functions to match that data. + """ + + param_names: list = ["mean", "sigma", "amplitude", "offset"] + param_bounds: np.ndarray = np.array( + [ + [0.01, 1.0], + [0.01, 5.0], + [0.01, 1.0], + [0.01, 1.0], + ] + ) + + def find_init_values(self) -> dict: + """Fit data without optimization, return values.""" + + data = self._profile_data + x = np.linspace(0, 1, len(data)) + # init_fit = norm.pdf(data) + offset = data.min() + amplitude = data.max() - offset + + weighted_mean = np.average(x, weights=data) + weighted_sigma = np.sqrt(np.cov(x, aweights=data)) + + self.init_values = { + self.param_names[0]: weighted_mean, # data.mean() + self.param_names[1]: weighted_sigma, # data.std() + self.param_names[2]: amplitude, + self.param_names[3]: offset, + } + return self.init_values + + def find_priors(self) -> dict: + """Do initial guesses based on data and make distribution from that guess.""" + # Creating a gamma distribution around the inital amplitude. + # TODO: add to comments on why gamma vs. normal dist used for priors. + amplitude_mean = self.init_values["amplitude"] + amplitude_var = 0.05 + amplitude_alpha = (amplitude_mean**2) / amplitude_var + amplitude_beta = amplitude_mean / amplitude_var + amplitude_prior = gamma(amplitude_alpha, loc=0, scale=1 / amplitude_beta) + + # Creating a normal distribution of points around the inital mean. + mean_prior = norm(self.init_values["mean"], 0.1) + # TODO: remove hard coded numbers? + sigma_alpha = 2.5 + sigma_beta = 5.0 + sigma_prior = gamma(sigma_alpha, loc=0, scale=1 / sigma_beta) + + # Creating a normal distribution of points around initial offset. + offset_prior = norm(self.init_values["offset"], 0.5) + self.priors = { + self.param_names[0]: mean_prior, + self.param_names[1]: sigma_prior, + self.param_names[2]: amplitude_prior, + self.param_names[3]: offset_prior, + } + return self.priors + + def _forward(self, x: np.array, params: np.array): + # Load distribution parameters + # needs to be array for scipy.minimize + mean = params[0] + sigma = params[1] + amplitude = params[2] + offset = params[3] + return ( + np.sqrt(2 * np.pi) * amplitude * norm.pdf((x - mean) / sigma) + offset + ) # norm.pdf(x, loc=mean, scale=sigma) #( + + # TODO: remove when above is confirmed the same/below not needed. + # @staticmethod + # def _forward(x: np.ndarray, params_list: np.ndarray): + # amplitude = params_list[0] + # mean = params_list[1] + # sigma = params_list[2] + # offset = params_list[3] + # normal = norm() + # return ((np.sqrt(2 * np.pi)) * amplitude) * normal.pdf( + # (x - mean) / sigma + # ) + offset + + def _log_prior(self, params: np.ndarray) -> float: + return np.sum( + [ + prior.logpdf(params[i]) + for i, (key, prior) in enumerate(self.priors.items()) + ] + ) diff --git a/lcls_tools/common/data_analysis/fit/projection.py b/lcls_tools/common/data_analysis/fit/projection.py new file mode 100644 index 00000000..898125bf --- /dev/null +++ b/lcls_tools/common/data_analysis/fit/projection.py @@ -0,0 +1,146 @@ +import numpy as np +import scipy.optimize +import scipy.signal +from matplotlib import pyplot as plt +from pydantic import BaseModel, ConfigDict +from lcls_tools.common.data_analysis.fit.method_base import MethodBase + + +class ProjectionFit(BaseModel): + """ + 1d fitting class that allows users to choose the model with which the fit + is performed, and if prior assumptions (bayesian regression) about + the data should be used when performing the fit. + Additionally there is an option to visualize the fitted data and priors. + -To perform a 1d fit, call fit_projection(projection_data={*data_to_fit*}) + ------------------------ + Arguments: + model: MethodBase (this argument is a child class object of method base + e.g GaussianModel & DoubleGaussianModel) + visualize_priors: bool (shows plots of the priors and init guess + distribution before fit) + use_priors: bool (incorporates prior distribution information into fit) + visualize_fit: bool (visualize the parameters as a function of the + forward function + from our model compared to distribution data) + """ + + # TODO: come up with better name + model_config = ConfigDict(arbitrary_types_allowed=True) + model: MethodBase + use_priors: bool = False + + def normalize(self, data: np.ndarray) -> np.ndarray: + """ + Normalize a 1d array by scaling and shifting data + s.t. data is between 0 and 1 + """ + data_copy = data.copy() + normalized_data = (data_copy - np.min(data)) / (np.max(data) - np.min(data)) + return normalized_data + + def unnormalize_model_params( + self, params_dict: dict, projection_data: np.ndarray + ) -> np.ndarray: + """ + Takes fitted and normalized params and returns them + to unnormalized values i.e the true fitted values of the distribution + """ + + projection_data_range = np.max(projection_data) - np.min(projection_data) + length = len(projection_data) + for key, val in params_dict.items(): + if "sigma" in key or "mean" in key: + true_fitted_val = val * length + elif "offset" in key: + true_fitted_val = val * projection_data_range + np.min(projection_data) + else: + true_fitted_val = val * projection_data_range + temp = {key: true_fitted_val} + params_dict.update(temp) + return params_dict + + def model_setup(self, projection_data=np.ndarray) -> None: + """sets up the model and init_values/priors""" + self.model.profile_data = projection_data + + def fit_model(self) -> scipy.optimize._optimize.OptimizeResult: + """ + Fits model params to distribution data and plots the fitted params + as a function of the model. + Returns optimizeResult object + """ + x = np.linspace(0, 1, len(self.model.profile_data)) + y = self.model.profile_data + + init_values = np.array( + [self.model.init_values[name] for name in self.model.param_names] + ) + + res = scipy.optimize.minimize( + self.model.loss, + init_values, + args=(x, y, self.use_priors), + bounds=self.model.param_bounds, + method="Powell" + ) + return res # TODO:optional argument to return fig,ax + + def fit_projection(self, projection_data: np.ndarray) -> dict: + """ + type is dict[str, float] + Wrapper function that does all necessary steps to fit 1d array. + Returns a dictionary where the keys are the model params and their + values are the params fitted to the data + """ + assert len(projection_data.shape) == 1 + fitted_params_dict = {} + normalized_data = self.normalize(projection_data) + self.model_setup(projection_data=normalized_data) + res = self.fit_model() + for i, param in enumerate(self.model.param_names): + fitted_params_dict[param] = (res.x)[i] + self.model.fitted_params_dict = fitted_params_dict.copy() + params_dict = self.unnormalize_model_params(fitted_params_dict, projection_data) + return params_dict + + def plot_fit(self, **kwargs): + """ + Plots the normalized fitted forward function against the normed data. + To plot unnormalized projection data, pass the unnormalized forward + function params and data as kwargs. + """ + fitted_params_dict = {} + projection_data = [] + if kwargs: + for key, val in kwargs.items(): + if key in self.model.fitted_params_dict: + fitted_params_dict[key] = val + else: + projection_data = val + x = np.linspace(0, len(projection_data), len(projection_data)) + else: + fitted_params_dict.update(self.model.fitted_params_dict) + projection_data = self.model.profile_data + x = np.linspace(0, 1, len(self.model.profile_data)) + + y = projection_data + fig, ax = plt.subplots() + y_fit = self.model.forward(x, fitted_params_dict) + ax.plot(x, y, label="data") + ax.plot(x, y_fit, label="fit") + + textstr = "\n".join( + [ + r"$\mathrm{%s}=%.2f$" % (key, val) + for key, val in fitted_params_dict.items() + ] + ) + ax.text( + 0.05, + 0.95, + textstr, + transform=ax.transAxes, + verticalalignment="top", + ) + return fig, ax diff --git a/lcls_tools/common/devices/screen.py b/lcls_tools/common/devices/screen.py index 868f00d6..121cc979 100644 --- a/lcls_tools/common/devices/screen.py +++ b/lcls_tools/common/devices/screen.py @@ -242,9 +242,11 @@ def _write_image_to_hdf5( # we update with original key if it isn't in our normal screen metadata # otherwise, prepend user_ to the key to retain all information. [ - dset.attrs.update({key: value}) - if key not in self.metadata - else dset.attrs.update({"user_" + key: value}) + ( + dset.attrs.update({key: value}) + if key not in self.metadata + else dset.attrs.update({"user_" + key: value}) + ) for key, value in extra_metadata.items() ] diff --git a/lcls_tools/common/devices/yaml/generate.py b/lcls_tools/common/devices/yaml/generate.py index 136b6327..1376a651 100644 --- a/lcls_tools/common/devices/yaml/generate.py +++ b/lcls_tools/common/devices/yaml/generate.py @@ -88,11 +88,11 @@ def _construct_information_from_element( item.strip() for item in element["Beampath"].split(",") if item ], "area": element["Area"], - "sum_l_meters": float( - np.format_float_positional(sum_l_meters, precision=3) - ) - if sum_l_meters - else None, + "sum_l_meters": ( + float(np.format_float_positional(sum_l_meters, precision=3)) + if sum_l_meters + else None + ), }, } diff --git a/lcls_tools/common/image_processing/image_processing.py b/lcls_tools/common/image_processing/image_processing.py index dc5f810b..22ed8f75 100644 --- a/lcls_tools/common/image_processing/image_processing.py +++ b/lcls_tools/common/image_processing/image_processing.py @@ -1,45 +1,65 @@ import numpy as np -import scipy.ndimage as snd - - -def fliplr(image): - """Flip over vertical axis""" - return np.fliplr(image) - - -def flipud(image): - """Flip over horizontal axis""" - return np.flipud(image) - - -def center_of_mass(image, sigma=5): - """Find center of mass, sigma sets threshold""" - return snd.center_of_mass(image > image.mean() + sigma * image.std()) - - -def average_image(images): - """If we can do things with an average image, do it!""" - return sum(images) / len(images) - - -def shape_image(image, x_size, y_size): - """Shape typical returned array, rows x columns so y x x""" - return image.reshape(y_size, x_size) - - -def x_projection(image, axis=0, subtract_baseline=True): - """Expects ndarray, return x projection""" - proj = np.sum(image, axis=0) - if subtract_baseline: - return proj - min(proj) - - return proj - - -def y_projection(image, subtract_baseline=True): - """Expects ndarray, return y projection""" - proj = np.sum(image, axis=1) - if subtract_baseline: - return proj - min(proj) - - return proj +from matplotlib import pyplot as plt +from scipy.ndimage import gaussian_filter +from pydantic import BaseModel, PositiveFloat, ConfigDict +from lcls_tools.common.image_processing.roi import ROI + + +class ImageProcessor(BaseModel): + """ + Image Processing class that allows for background subtraction and roi cropping + ------------------------ + Arguments: + roi: ROI (roi object either Circular or Rectangular), + background_image: np.ndarray (optional image that will be used in + background subtraction if passed), + threshold: Positive Float (value of pixel intensity to be subtracted + if background_image is None, default value = 0.0) + visualize: bool (plots processed image) + ------------------------ + Methods: + subtract_background: takes a raw image and does pixel intensity subtraction + process: takes raw image and calls subtract_background, passes to result + to the roi object for cropping. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + roi: ROI = None + background_image: np.ndarray = None + threshold: PositiveFloat = 0.0 + + def subtract_background(self, raw_image: np.ndarray) -> np.ndarray: + """Subtract background pixel intensity from a raw image""" + if self.background_image is not None: + image = raw_image - self.background_image + else: + image = raw_image - self.threshold + return image + + def clip_image(self, image): + return np.clip(image, 0, None) + + def process(self, raw_image: np.ndarray) -> np.ndarray: + """Process image by subtracting background pixel intensity + from a raw image, crop, and filter""" + image = self.subtract_background(raw_image) + clipped_image = self.clip_image(image) + if self.roi is not None: + cropped_image = self.roi.crop_image(clipped_image) + else: + cropped_image = clipped_image + processed_image = self.filter(cropped_image) + return processed_image + + def filter(self, unfiltered_image: np.ndarray, sigma=5) -> np.ndarray: + # TODO: extend to other types of filters? Change the way we pass sigma? + filtered_data = gaussian_filter(unfiltered_image, sigma) + return filtered_data + + def plot_raw_and_processed_image(self, raw_image: np.ndarray, processed_image): + fig, ax = plt.subplots(2, 1) + c = ax[0].imshow(raw_image > 0, origin="lower") + rect = self.roi.get_patch() + ax[0].add_patch(rect) + ax[1].imshow(processed_image > 0, origin="lower") + fig.colorbar(c) diff --git a/lcls_tools/common/image_processing/roi.py b/lcls_tools/common/image_processing/roi.py new file mode 100644 index 00000000..c046f326 --- /dev/null +++ b/lcls_tools/common/image_processing/roi.py @@ -0,0 +1,109 @@ +from abc import ABC, abstractmethod +from typing import ( + List, +) +import numpy as np +from matplotlib import patches +from pydantic import BaseModel, PositiveFloat, Field + + +class ROI(BaseModel, ABC): + roi_type: str + center: List[PositiveFloat] + + @abstractmethod + def crop_image(self, img, **kwargs) -> np.ndarray: + """crop image using ROI""" + pass + + @abstractmethod + def get_patch(self): + pass + + +class CircularROI(ROI): + """ + Define a circular region of interest (ROI) for an image, cropping pixels outside a + bounding box around the ROI and setting pixels outside the boundary to a fill + value (usually zero). + """ + + roi_type: str = Field(default="circular", frozen=True) + radius: PositiveFloat + + @property + def bounding_box(self): + return [ + int(self.center[0] - int(self.radius)), + int(self.center[1] - int(self.radius)), + int(self.radius * 2), + int(self.radius * 2), + ] + + def crop_image(self, img, **kwargs) -> np.ndarray: + fill_value = kwargs.get("fill_value", 0.0) + img = self.fill_value_outside_circle(img, self.center, self.radius, fill_value) + bbox = self.bounding_box + img = img[bbox[1]: bbox[1] + bbox[3], bbox[0]: bbox[0] + bbox[2]] + return img + + def fill_value_outside_circle( + self, + img: np.ndarray, + center: List[PositiveFloat], + radius: PositiveFloat, + fill_value: float, + ): + height, width = img.shape + for y in range(height): + for x in range(width): + distance = np.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) + if distance > radius: + img[y, x] = fill_value + return img + + def get_patch(self): + return patches.Circle( + tuple(self.center), self.radius, facecolor="none", edgecolor="r" + ) + + +class RectangularROI(ROI): + """ + Define a rectangular region of interest (ROI) for an image, cropping pixels outside + the ROI. + """ + + # set roi_type : str = 'rectangular' and freeze it. + roi_type: str = Field(default="rectangular", frozen=True) + xwidth: int + ywidth: int + + @property + def bounding_box(self): + return [ + int(self.center[0] - int(self.xwidth / 2)), + int(self.center[1] - int(self.ywidth / 2)), + int(self.xwidth), + int(self.ywidth), + ] + + def crop_image(self, img, **kwargs) -> np.ndarray: + x_size, y_size = img.shape + if self.xwidth > x_size or self.ywidth > y_size: + raise ValueError( + f"must specify ROI that is smaller than the image, " + f"image size is {img.shape}" + ) + bbox = self.bounding_box + img = img[bbox[1]: bbox[1] + bbox[3], bbox[0]: bbox[0] + bbox[2]] + return img + + def get_patch(self): + return patches.Rectangle( + (self.bounding_box[0], self.bounding_box[1]), + self.bounding_box[2], + self.bounding_box[3], + facecolor="none", + edgecolor="r", + ) diff --git a/lcls_tools/common/io.py b/lcls_tools/common/io.py index 73fb61d6..a88b10af 100644 --- a/lcls_tools/common/io.py +++ b/lcls_tools/common/io.py @@ -8,20 +8,17 @@ def __init__(self): pass def write( - self, - measurement_data: dict, - measurement_obj: Measurement, - filename: Optional[str] = None + self, + measurement_data: dict, + measurement_obj: Measurement, + filename: Optional[str] = None, ): """ Write data to h5file """ raise NotImplementedError - def read( - self, - filename: Optional[str] = None - ): + def read(self, filename: Optional[str] = None): """ Read data from h5file """ diff --git a/lcls_tools/common/measurements/measurement.py b/lcls_tools/common/measurements/measurement.py index 3917c3d1..cb99a50f 100644 --- a/lcls_tools/common/measurements/measurement.py +++ b/lcls_tools/common/measurements/measurement.py @@ -1,15 +1,28 @@ from abc import ABC, abstractmethod from typing import Optional - from pydantic import BaseModel, DirectoryPath class Measurement(BaseModel, ABC): + """ + Base abstract class for all meaurements, which serves as the bare minimum skeleton code needed. + Should be used only as a parent class to all performable measurements. + --------------------------- + Arguments: + name: str (name of measurement performed) + save_data: bool = True (specifies whether or not to dump data and meta data to h5py file) + save_location: Optional[DirectoryPath] = None + (optional argument that is a path to the directory you wish to save the data) + --------------------------- + Methods: + measure: abstractmethod to be implemented in all children classes wear measurement is performed + """ + name: str save_data: bool = True save_location: Optional[DirectoryPath] = None @abstractmethod def measure(self, **kwargs) -> dict: - """ Implements a measurement and returns a dictionary with the results""" + """Implements a measurement and returns a dictionary with the results""" raise NotImplementedError diff --git a/lcls_tools/common/measurements/screen_beam_profile_measurement.py b/lcls_tools/common/measurements/screen_beam_profile_measurement.py new file mode 100644 index 00000000..22e4c44a --- /dev/null +++ b/lcls_tools/common/measurements/screen_beam_profile_measurement.py @@ -0,0 +1,99 @@ +from lcls_tools.common.devices.screen import Screen +from lcls_tools.common.image_processing.image_processing import ImageProcessor +from lcls_tools.common.data_analysis.fit.projection import ProjectionFit +from lcls_tools.common.measurements.measurement import Measurement +import numpy as np + + +class ScreenBeamProfileMeasurement(Measurement): + """ + ScreenBeamProfileMeasurement class that allows for background subtraction and roi cropping + ------------------------ + Arguments: + name: str (name of measurement default is beam_profile), + device: Screen (device that will be performing the measurement), + image_processor: ImageProcessor () + fitting_tool: ProjectionFit () + fit_profile: bool = True () + ------------------------ + Methods: + single_measure: measures device and returns raw and processed image + measure: does multiple measurements and has an option to fit the image profiles + """ + + name: str = "beam_profile" + device: Screen + image_processor: ImageProcessor + fitting_tool: ProjectionFit + fit_profile: bool = True + # charge_window: Optional[ChargeWindow] = None + + def measure(self, n_shots: int = 1) -> dict: + """ + Measurement function that takes in n_shots as argumentment, where n_shots is the number of + image profiles (is this what I want to call it?) + we would like to measure. Invokes single_measure per shot + Then if fit_profile = True, fits the x and y projections of each image using the provided + fitting_tool. Results are stored in list of dictionaries where each element of the list + is results for the image fitting process stored as a dictionary. + ---- + Currently under work, what do we want to do with the images? + Needs dump_controller + + """ + images = [] + while len(images) < n_shots: + measurement = self.single_measure() + if len(measurement): + images += [measurement] + + results = {"images": images} + if self.fit_profile: + final_results = [] + for i, ele in enumerate(images): + temp = {} + temp["raw_image"] = ele["raw_image"] + temp["processed_image"] = ele["processed_image"] + + projection_x = np.array(np.sum(ele["processed_image"], axis=0)) + projection_y = np.array(np.sum(ele["processed_image"], axis=1)) + + for key, param in self.fitting_tool.fit_projection( + projection_x + ).items(): + key_x = key + "_x" + temp[key_x] = param + + for key, param in self.fitting_tool.fit_projection( + projection_y + ).items(): + key_y = key + "_y" + temp[key_y] = param + final_results += [temp] + + # what should I do with results now? + results["fits"] = final_results + + # no attribute dump controller + if self.save_data: + pass + # self.dump_controller.dump_data_to_file(final_results, self) + + return final_results + + def single_measure(self) -> dict: + """ + Function that grabs a single image from the device class + (typically live beam images) and passes it to the + image processing class for processing (subtraction and cropping) + returns a dictionary with both the raw and processed dictionary + """ + # measure profiles + # get raw data + raw_image = self.device.image + # get ICT measurements and return None if not in window + # if self.charge_window is not None: + # if not self.charge_window.in_window(): + # return {} + processed_image = self.image_processor.process(raw_image) + return {"raw_image": raw_image, "processed_image": processed_image} diff --git a/tests/datasets/fit/make_test_guassian.py b/tests/datasets/fit/make_test_guassian.py new file mode 100644 index 00000000..7ad4a63e --- /dev/null +++ b/tests/datasets/fit/make_test_guassian.py @@ -0,0 +1,15 @@ +import numpy as np +from scipy.stats import norm + +# import matplotlib.pyplot as plt + +# Generating 1000 point gaussian distribution +data = np.random.normal(size=1000) +fit = norm.pdf(data) + +# Plotting distribution +# plt.plot(data,fit, '.') +# plt.show() + +# Saving distribution +np.save("test_gaussian.npy", data) diff --git a/tests/datasets/fit/test_gaussian.npy b/tests/datasets/fit/test_gaussian.npy new file mode 100644 index 00000000..a2026819 Binary files /dev/null and b/tests/datasets/fit/test_gaussian.npy differ diff --git a/tests/datasets/h5py/test_image.h5 b/tests/datasets/h5py/test_image.h5 new file mode 100644 index 00000000..601a01fc Binary files /dev/null and b/tests/datasets/h5py/test_image.h5 differ diff --git a/tests/datasets/images/numpy/test_roi_fill_value_image.npy b/tests/datasets/images/numpy/test_roi_fill_value_image.npy new file mode 100644 index 00000000..9a28bf7f Binary files /dev/null and b/tests/datasets/images/numpy/test_roi_fill_value_image.npy differ diff --git a/tests/datasets/images/numpy/test_roi_image.npy b/tests/datasets/images/numpy/test_roi_image.npy new file mode 100644 index 00000000..d9fe6e67 Binary files /dev/null and b/tests/datasets/images/numpy/test_roi_image.npy differ diff --git a/tests/unit_tests/lcls_tools/common/data_analysis/fit/test_methods.py b/tests/unit_tests/lcls_tools/common/data_analysis/fit/test_methods.py new file mode 100644 index 00000000..4d5b3bf2 --- /dev/null +++ b/tests/unit_tests/lcls_tools/common/data_analysis/fit/test_methods.py @@ -0,0 +1,97 @@ +from lcls_tools.common.data_analysis.fit.methods import GaussianModel +import numpy as np +import unittest +import os + + +class TestGaussianModel(unittest.TestCase): + def setUp(self) -> None: + self.data_location = "./tests/datasets/fit/" + self.data_filename = os.path.join(self.data_location, "test_gaussian.npy") + self.data = np.load(self.data_filename) + self.gaussian_model = GaussianModel() + self.gaussian_model.profile_data = self.data + # Decimal place to check calculation errors to + self.decimals = 2 + return super().setUp() + + @unittest.skip + def test_find_init_values(self): + init_dict = self.gaussian_model.find_init_values() + self.assertIsNotNone(init_dict) + # TODO: pick up test here, the init dict is not as expected + self.assertAlmostEqual(init_dict["mean"], -0.07874, places=self.decimals) + self.assertAlmostEqual(init_dict["sigma"], 1.0, places=self.decimals) + return + + @unittest.skip + def test_find_priors(self): + priors_dict = self.gaussian_model.find_priors() + self.assertIsNotNone(priors_dict) + return # TODO: flesh out this test + + @unittest.skip + def test_forward(self): + init_dict = self.gaussian_model.find_init_values() + fit = self.gaussian_model.forward(self.data, init_dict) + # TODO: better way to check this? w/o hardcode? + self.assertAlmostEqual(fit.max(), 0.4, places=self.decimals) + self.assertAlmostEqual(fit.min(), 0.0, places=self.decimals) + return + + # def super_gaussian(self, x, amp, mu, sig, P, offset): + # """Super Gaussian Function""" + # """Degree of P related to flatness of curve at peak""" + # return amp * np.exp((-abs(x - mu) ** (P)) / (2 * sig ** (P))) + offset + # def double_gaussian(self, x, amp, mu, sig, amp2, nu, rho, offset): + # return ( + # amp * np.exp(-np.power(x - mu, 2.0) / (2 * np.power(sig, 2.0))) + # + amp2 * np.exp(-np.power(x - nu, 2.0) / (2 * np.power(rho, 2.0))) + # ) + offset + # @unittest.skip("Assertion is not raised when it should be; fixing in issue #130.") + # def test_fit_tool_gaussian(self): + # # Test that the fitting tool can fit each type of gaussian distribution + # x_data = np.arange(500) + # # generated data for pure gaussian + # test_params = [3, 125, 45, 1.5] + # y_data = self.gaussian(x_data, *test_params) + # y_noise = np.random.normal(size=len(x_data), scale=0.04) + # y_test = y_data + y_noise + # fitting_tool = FittingTool(data=y_test) + # fits = fitting_tool.get_fit() + # self.assertIsInstance(fits, dict) + # for key, val in fits.items(): + # self.assertIsInstance(val, dict) + # self.assertIn("rmse", val) + # self.assertLessEqual(val["rmse"], 0.4) + # @unittest.skip("Assertion is not raised when it should be; fixing in issue #130.") + # def test_fit_tool_super_gaussian(self): + # x_data = np.arange(500) + # y_noise = np.random.normal(size=len(x_data), scale=0.04) + # test_params_super_gauss = [4, 215, 75, 4, 1] + # y_data_super_gauss = self.super_gaussian(x_data, *test_params_super_gauss) + # y_test_super_gauss = y_data_super_gauss + y_noise + # super_gauss_fitting_tool = FittingTool(data=y_test_super_gauss) + # s_fits = super_gauss_fitting_tool.get_fit() + # self.assertIsInstance(s_fits, dict) + # for key, val in s_fits.items(): + # self.assertIsInstance(val, dict) + # self.assertIn("rmse", val) + # if key == "super_gaussian": + # self.assertLessEqual(val["rmse"], 0.4) + # @unittest.skip("Assertion is not raised when it should be; fixing in issue #130.") + # def test_fit_tool_double_gaussian(self): + # # generated data for super gaussian + # x_data = np.arange(500) + # y_noise = np.random.normal(size=len(x_data), scale=0.04) + # test_params_double_gauss = [2, 100, 25, 10, 240, 25, 2] + # y_data_double_gauss = self.double_gaussian(x_data, *test_params_double_gauss) + # y_test_double_gauss = y_data_double_gauss + 3 * y_noise + # double_gauss_fitting_tool = FittingTool(data=y_test_double_gauss) + # d_fits = double_gauss_fitting_tool.get_fit() + # self.assertIsInstance(d_fits, dict) + # for key, val in d_fits.items(): + # self.assertIsInstance(val, dict) + # self.assertIn("rmse", val) + # if key == "double_gaussian": + # self.assertLessEqual(val["rmse"], 0.8) diff --git a/tests/unit_tests/lcls_tools/common/data_analysis/fit/test_projection.py b/tests/unit_tests/lcls_tools/common/data_analysis/fit/test_projection.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit_tests/lcls_tools/common/data_analysis/test_fit_gauss.py b/tests/unit_tests/lcls_tools/common/data_analysis/test_fit_gauss.py deleted file mode 100644 index 028f250b..00000000 --- a/tests/unit_tests/lcls_tools/common/data_analysis/test_fit_gauss.py +++ /dev/null @@ -1,71 +0,0 @@ -from lcls_tools.common.data_analysis.fitting_tool import FittingTool -import numpy as np -import unittest - - -class TestFitTool(unittest.TestCase): - def gaussian(self, x, amp, mu, sig, offset): - return amp * np.exp(-np.power(x - mu, 2.0) / (2 * np.power(sig, 2.0))) + offset - - def super_gaussian(self, x, amp, mu, sig, P, offset): - """Super Gaussian Function""" - """Degree of P related to flatness of curve at peak""" - return amp * np.exp((-abs(x - mu) ** (P)) / (2 * sig ** (P))) + offset - - def double_gaussian(self, x, amp, mu, sig, amp2, nu, rho, offset): - return ( - amp * np.exp(-np.power(x - mu, 2.0) / (2 * np.power(sig, 2.0))) - + amp2 * np.exp(-np.power(x - nu, 2.0) / (2 * np.power(rho, 2.0))) - ) + offset - - @unittest.skip("Assertion is not raised when it should be; fixing in issue #130.") - def test_fit_tool_gaussian(self): - # Test that the fitting tool can fit each type of gaussian distribution - x_data = np.arange(500) - - # generated data for pure gaussian - test_params = [3, 125, 45, 1.5] - y_data = self.gaussian(x_data, *test_params) - y_noise = np.random.normal(size=len(x_data), scale=0.04) - y_test = y_data + y_noise - - fitting_tool = FittingTool(data=y_test) - fits = fitting_tool.get_fit() - self.assertIsInstance(fits, dict) - for key, val in fits.items(): - self.assertIsInstance(val, dict) - self.assertIn("rmse", val) - self.assertLessEqual(val["rmse"], 0.4) - - @unittest.skip("Assertion is not raised when it should be; fixing in issue #130.") - def test_fit_tool_super_gaussian(self): - x_data = np.arange(500) - y_noise = np.random.normal(size=len(x_data), scale=0.04) - test_params_super_gauss = [4, 215, 75, 4, 1] - y_data_super_gauss = self.super_gaussian(x_data, *test_params_super_gauss) - y_test_super_gauss = y_data_super_gauss + y_noise - super_gauss_fitting_tool = FittingTool(data=y_test_super_gauss) - s_fits = super_gauss_fitting_tool.get_fit() - self.assertIsInstance(s_fits, dict) - for key, val in s_fits.items(): - self.assertIsInstance(val, dict) - self.assertIn("rmse", val) - if key == "super_gaussian": - self.assertLessEqual(val["rmse"], 0.4) - - @unittest.skip("Assertion is not raised when it should be; fixing in issue #130.") - def test_fit_tool_double_gaussian(self): - # generated data for super gaussian - x_data = np.arange(500) - y_noise = np.random.normal(size=len(x_data), scale=0.04) - test_params_double_gauss = [2, 100, 25, 10, 240, 25, 2] - y_data_double_gauss = self.double_gaussian(x_data, *test_params_double_gauss) - y_test_double_gauss = y_data_double_gauss + 3 * y_noise - double_gauss_fitting_tool = FittingTool(data=y_test_double_gauss) - d_fits = double_gauss_fitting_tool.get_fit() - self.assertIsInstance(d_fits, dict) - for key, val in d_fits.items(): - self.assertIsInstance(val, dict) - self.assertIn("rmse", val) - if key == "double_gaussian": - self.assertLessEqual(val["rmse"], 0.8) diff --git a/tests/unit_tests/lcls_tools/common/image_processing/test_image_processing.py b/tests/unit_tests/lcls_tools/common/image_processing/test_image_processing.py index 5f6b8e57..3276574e 100644 --- a/tests/unit_tests/lcls_tools/common/image_processing/test_image_processing.py +++ b/tests/unit_tests/lcls_tools/common/image_processing/test_image_processing.py @@ -1,88 +1,63 @@ import unittest -import os import numpy as np -import lcls_tools.common.image_processing as ip -from lcls_tools.common.matlab2py.mat_image import MatImage - -CAMERA = "CAMR:LGUN:210" - - -class ImageProcessingTest(unittest.TestCase): - data_location: str = "/tests/datasets/images/matlab/" - - def setUp(self): - self.file = os.path.join(self.data_location, "test_image.mat") - try: - if not os.path.isfile(self.file): - raise FileNotFoundError(f"Could not find {self.file}, aborting test.") - except FileNotFoundError: - self.skipTest("Invalid dataset location") - """Use test image""" - self.MI = MatImage() - self.MI.load_mat_image("test_image.mat") - - def test_fliplr(self): - """Test that fliplr does the right thing""" - col_init = self.MI.image[:, 0] - col_final = ip.fliplr(self.MI.image)[:, -1] - self.assertEqual(np.array_equal(col_init, col_final), True) - - def test_flipud(self): - """Test that flipud does the right thing""" - row_init = self.MI.image[0] - row_final = ip.flipud(self.MI.image)[-1] - self.assertEqual(np.array_equal(row_init, row_final), True) - - def test_center_of_mass(self): - """Test that we get correct x and y centroids""" - (x1, y1) = ip.center_of_mass(self.MI.image) - self.assertEqual((int(x1), int(y1)), (522, 669)) - (x2, y2) = ip.center_of_mass(self.MI.image, sigma=2) - self.assertEqual((int(x2), int(y2)), (540, 660)) - - def test_average_image(self): - """Test that we can average a number of images""" - images = [] - while len(images) < 10: - images.append(self.MI.image) - - ave_image = ip.average_image(images) - self.assertEqual(np.array_equal(ave_image, self.MI.image), True) - - def test_shape_image(self): - """Test that we can reshape our ndarray""" - self.assertEqual(self.MI.image.shape, (1024, 1392)) - image = ip.shape_image(self.MI.image, 16, 89088) - self.assertEqual(image.shape, (89088, 16)) - - def test_x_projection(self): - """Test we get expected value for x projection""" - x_proj = ip.x_projection(self.MI.image) - self.assertEqual(x_proj.sum(), 9792279) - self.assertEqual(int(x_proj.mean()), 7034) - self.assertEqual(int(x_proj.std()), 10005) - - def test_y_projection(self): - """Test that we get expected value for y projection""" - y_proj = ip.y_projection(self.MI.image) - self.assertEqual(y_proj.sum(), 9623943) - self.assertEqual(int(y_proj.mean()), 9398) - self.assertEqual(int(y_proj.std()), 10398) - - def test_gauss_func(self): - """Test we get correct value for a gaussian evaluation""" - ans = ip.gauss_func(1.0, 2.0, 3.0, 4.0) - self.assertEqual(round(ans, 2), 1.76) - - def test_gauss_fit(self): - """Test that we get the correct gaussian fit parameters""" - x_proj = ip.x_projection(self.MI.image) - y_proj = ip.y_projection(self.MI.image) - _, a_x, x0_x, sigma_x = ip.gauss_fit(x_proj) - _, a_y, y0_y, sigma_y = ip.gauss_fit(y_proj) - self.assertEqual(int(a_x), 30373) - self.assertEqual(int(x0_x), 660) - self.assertEqual(int(sigma_x), 124) - self.assertEqual(int(a_y), 29528) - self.assertEqual(int(y0_y), 541) - self.assertEqual(int(sigma_y), 127) +from lcls_tools.common.image_processing.image_processing import ImageProcessor +from lcls_tools.common.image_processing.roi import RectangularROI + + +class TestImageProcessing(unittest.TestCase): + data_location: str = "tests/datasets/images/numpy/" + + def __init__(self, methodName: str = "runTest") -> None: + super().__init__(methodName) + self.center = [400, 400] + self.size = (800, 800) + self.widths = (300, 300) + self.radius = 50 + self.image = np.load(self.data_location + "test_roi_image.npy") + + def test_process(self): + """ + Given an np.ndarray and roi process + and assert the return in an np.ndarray + """ + image_processor = ImageProcessor() + image = image_processor.process(self.image) + assert isinstance(image, np.ndarray) + roi = RectangularROI( + center=self.center, xwidth=self.widths[0], ywidth=self.widths[1] + ) + image_processor = ImageProcessor(roi=roi) + image = image_processor.process(self.image) + assert isinstance(image, np.ndarray) + # TODO:run coverage + + def test_subtract_background(self): + """ + Given an np.ndarray, check that when the image_processor + is passed a background_image. the subtract_background function + call subtracts the returns an np.ndarray + that is the difference between the two np.ndarrays + """ + background_image = np.ones(self.size) + image_processor = ImageProcessor(background_image=background_image) + image = image_processor.subtract_background(self.image) + assert image.all() == (self.image - 1).all() + + """ + Given an np.ndarray check that when the image_processor + is passed a threshold check that subtraction occurs correctly + """ + image_processor = ImageProcessor(threshold=1) + image = image_processor.subtract_background(self.image) + assert image.all() == (self.image - 1).all() + + def test_clip(self): + """ + Given an np.ndarray check that when the image_processor + is passed a threshold check that the np.ndarray elements + are clipped at to not drop below zero + """ + image_processor = ImageProcessor(threshold=100) + image = image_processor.subtract_background(self.image) + clipped_image = image_processor.clip_image(image) + assert clipped_image.all() >= np.zeros(self.size).all() diff --git a/tests/unit_tests/lcls_tools/common/image_processing/test_roi.py b/tests/unit_tests/lcls_tools/common/image_processing/test_roi.py new file mode 100644 index 00000000..9acc01e2 --- /dev/null +++ b/tests/unit_tests/lcls_tools/common/image_processing/test_roi.py @@ -0,0 +1,54 @@ +import unittest +import numpy as np +from lcls_tools.common.image_processing.roi import RectangularROI, CircularROI + + +class TestROI(unittest.TestCase): + data_location: str = "tests/datasets/images/numpy/" + + def __init__(self, methodName: str = "runTest") -> None: + super().__init__(methodName) + self.center = [400, 400] + self.size = (800, 800) + self.widths = (300, 300) + self.radius = 50 + + self.image = np.load(self.data_location + "test_roi_image.npy") + self.filled_value_test_image = np.load( + self.data_location + "test_roi_fill_value_image.npy" + ) + + def test_circular_roi_crop_image(self): + """ + Given a circular ROI and image, + test that image has correct size after cropping + (size of roi) + """ + circular = CircularROI(center=self.center, radius=self.radius) + cropped_image = circular.crop_image(self.image) + assert cropped_image.shape[0] == 2 * self.radius + assert cropped_image.shape[1] == 2 * self.radius + + def test_fill_value_outside_circle(self): + """ + Given a test image change the pixel values + of all elements of the image outside a radius of 10 to zero + Test that this image is now exactly equal to the test filled value image + """ + circular = CircularROI(center=self.center, radius=self.radius) + image = circular.fill_value_outside_circle( + img=self.image, center=self.center, radius=10, fill_value=0 + ) + assert image.all() == self.filled_value_test_image.all() + + def test_rectangular_roi_crop_image(self): + """ + Given a rectangular ROI and image, + test that image has correct size after cropping + (size of roi) + """ + rectangular = RectangularROI( + center=self.center, xwidth=self.widths[0], ywidth=self.widths[1] + ) + cropped_image = rectangular.crop_image(self.image) + assert cropped_image.shape == self.widths diff --git a/tests/unit_tests/lcls_tools/common/measurements/__init__.py b/tests/unit_tests/lcls_tools/common/measurements/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit_tests/lcls_tools/common/measurements/test_screen_beam_profile.py b/tests/unit_tests/lcls_tools/common/measurements/test_screen_beam_profile.py new file mode 100644 index 00000000..6820d7ab --- /dev/null +++ b/tests/unit_tests/lcls_tools/common/measurements/test_screen_beam_profile.py @@ -0,0 +1,120 @@ +from lcls_tools.common.measurements.screen_beam_profile_measurement import ( + ScreenBeamProfileMeasurement, +) +from lcls_tools.common.data_analysis.fit.projection import ProjectionFit +from lcls_tools.common.data_analysis.fit.methods import GaussianModel +from lcls_tools.common.image_processing.image_processing import ImageProcessor +from lcls_tools.common.image_processing.roi import RectangularROI +from lcls_tools.common.devices.device import Metadata +from lcls_tools.common.devices.screen import ( + ScreenControlInformation, + ScreenPVSet, + Screen, +) +import numpy as np +import unittest + + +class ScreenTest(Screen): + @property + def image(self) -> np.ndarray: + return self._image + + @image.setter + def image(self, image): + self._image = image + + +class TestScreenBeamProfileMeasurement(unittest.TestCase): + def __init__(self, methodName: str = "runTest") -> None: + super().__init__(methodName) + self.test_instantiate_pydantic_objects() + + def create_test_image(self, size: tuple, center: list, radius: int): + # make img that is a circle in the center of the image with known + # standard dev and mean. no imports, no calls to external or + # internal files. + image = np.zeros(size) + for y in range(image.shape[0]): + for x in range(image.shape[1]): + distance = np.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) + if distance < radius: + image[y, x] = 1 + return image + + def test_instantiate_pydantic_objects(self): + # creating image processing class + self.radius = 50 + self.size = (800, 800) + self.center = [400, 400] + self.xwidth = 300 + self.ywidth = 300 + self.means = [150, 150] + self.sigmas = [30, 30] + self.amplitude = [99, 99] + self.offsets = [1, 1] + self.roi = RectangularROI(center=[400, 400], xwidth=300, ywidth=300) + self.image_processor = ImageProcessor(roi=self.roi) + + self.pvs = { + "image": "ArrayData", + "n_bits": "N_OF_BITS", + "n_col": "Image:ArraySize1_RBV", + "n_row": "Image:ArraySize0_RBV", + "resolution": ":RESOLUTION", + } + self.metadata = { + "area": "TEST", + "beam_path": ["SC_TEST"], + "sum_l_meters": 99.99, + } + self.control_name = "OTRS:TEST:650:" + + self.screen_pvs = ScreenPVSet(**self.pvs) + self.meta_data = Metadata(**self.metadata) + self.controls_information = ScreenControlInformation( + control_name=self.control_name, PVs=self.screen_pvs + ) + self.screen_test = ScreenTest( + controls_information=self.controls_information, metadata=self.metadata + ) + self.screen_test.image = self.create_test_image( + size=self.size, center=self.center, radius=self.radius + ) + self.gauss_model = GaussianModel() + self.projection = ProjectionFit(model=self.gauss_model) + + self.screen_beam_profile_measurement = ScreenBeamProfileMeasurement( + name=self.control_name, + device=self.screen_test, + image_processor=self.image_processor, + fitting_tool=self.projection, + ) + + def test_single_measure(self): + perform_single_measure = self.screen_beam_profile_measurement.single_measure() + self.assertIsInstance(perform_single_measure, dict) + assert len(perform_single_measure) == 2 + + def test_measure(self): + perform_measure = self.screen_beam_profile_measurement.measure() + self.assertIsInstance(perform_measure, list) + self.assertIsInstance(perform_measure[0], dict) + if isinstance(self.gauss_model, GaussianModel): + for key, val in perform_measure[0].items(): + if key == "amplitude_x": + self.assertTrue(90 <= val <= 100) + elif key == "amplitude_y": + self.assertTrue(90 <= val <= 100) + elif key == "mean_x": + self.assertTrue(140 <= val <= 160) + elif key == "mean_y": + self.assertTrue(140 <= val <= 160) + elif key == "sigma_x": + self.assertTrue(20 <= val <= 40) + elif key == "sigma_y": + self.assertTrue(20 <= val <= 40) + elif key == "offset_x": + self.assertTrue(0 <= val <= 1) + elif key == "offset_y": + self.assertTrue(0 <= val <= 1)