Skip to content

Commit

Permalink
Fitting Base Class and Gaussian Model from PR 143 (#172)
Browse files Browse the repository at this point in the history
* Added MethodBase as an abstract class for all fit methods
Added GaussianModel class using MethodBase
Added ProjectionFit class
Added tests for all the above including datasets
Separated from 8c85a56 on phys-cgarnier/lcls-tools/dev

* method_base.py flake8

* changed code base to be more pydantic this cleans up some things involving params, might have to remove priors still but its working

* updated fit code to incorporate pydantic structures for method parameters

* made changes as requested in the comment (minus removing _forward)

* flake8

* committing GaussianFIt with property that returns beamsize

* checked that GaussianFit works (roughly) and set processor and fit to have a default field, also set method in projection_fit.py to have a default file. This will make using the tool much easier. All that has to be down is call gaussian fit and give it an image to use

* committing linted gaussian_fit.py

* removed empty test for projection

* more flake8

---------

Co-authored-by: Chris Garnier <cgarnier@slac.stanford.edu>
  • Loading branch information
eloiseyang and phys-cgarnier committed Aug 2, 2024
1 parent b6949b6 commit c950303
Show file tree
Hide file tree
Showing 8 changed files with 597 additions and 71 deletions.
126 changes: 126 additions & 0 deletions lcls_tools/common/data_analysis/fit/gaussian_fit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import numpy as np
from pydantic import BaseModel, ConfigDict, PositiveFloat
from typing import Optional
from lcls_tools.common.data_analysis.fit.projection import ProjectionFit
from lcls_tools.common.image.processing import ImageProcessor


class GaussianFit(BaseModel):
# should rename to BeamsizeEvaluator or something ?
"""
Gaussian Fitting class that takes in a fitting tool (with method),
ImageProcessor, and Image and returns the beamsize and position.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)
processor: ImageProcessor = ImageProcessor()
fit: ProjectionFit = ProjectionFit()
min_log_intensity: float = 4.0
bounding_box_half_width: PositiveFloat = 3.0
apply_bounding_box_constraint: bool = True
image: Optional[np.ndarray] = None

@property
def beamsize(self):
beamspot_chars = self.gaussian_fit(self.image)
beamsize = self.calculate_beamsize(beamspot_chars)
return beamsize

def gaussian_fit(self, image):
self.processor.auto_process(image)
x_proj, y_proj = self.get_projections(image)
x_parameters = self.fit.fit_projection(x_proj)
y_parameters = self.fit.fit_projection(y_proj)

return {
"centroid": np.array((x_parameters['mean'],
y_parameters['mean'])),
"rms_sizes": np.array((x_parameters['sigma'],
y_parameters['sigma'])),
"total_intensity": image.sum(),
}

def get_projections(self, image):
x_projection = np.array(np.sum(image, axis=0))
y_projection = np.array(np.sum(image, axis=1))
return x_projection, y_projection

def calculate_beamsize(self, beamspot_chars: dict):
'''
Conditional beamsize calculation:
if condition1 (total_intensity is to small): then return NULL result.
else: do beamsize calculation and check that the beamspot is
within the ROI
'''
if (np.log10(beamspot_chars['total_intensity'])
< self.min_log_intensity):
# TODO: character count is really not liking this one line
print(("log10 image intensity"
+ f"{np.log10(beamspot_chars['total_intensity'])}"
+ "below threshold"))
result = {
"Cx": np.NaN,
"Cy": np.NaN,
"Sx": np.NaN,
"Sy": np.NaN,
"bb_penalty": np.NaN,
"total_intensity": beamspot_chars['total_intensity'],
}
return result
else:
centroid = beamspot_chars['centroid']
sizes = beamspot_chars['rms_sizes']

if np.all(~np.isnan(np.stack((centroid, sizes)))):
# get beam region bounding box
n_stds = self.bounding_box_half_width

bounding_box_corner_pts = np.array(
(
centroid - n_stds * sizes,
centroid + n_stds * sizes,
centroid - n_stds * sizes * np.array((-1, 1)),
centroid + n_stds * sizes * np.array((-1, 1)),
)
)

if self.processor.roi is not None:
roi_radius = self.processor.roi.radius
else:
roi_radius = np.min(np.array(self.image.shape) / 1.5)
# TODO: maybe need to change this ^^^

# This is a check whether or not the beamspot is within
# the bounding box.
temp = bounding_box_corner_pts - np.array((roi_radius,
roi_radius))
distances = np.linalg.norm(temp, axis=1)
bounding_box_penalty = np.max(distances) - roi_radius

result = {
"Cx": centroid[0],
"Cy": centroid[1],
"Sx": sizes[0],
"Sy": sizes[1],
"bb_penalty": bounding_box_penalty,
"total_intensity": beamspot_chars["total_intensity"]
}

# set results to none if the beam extends beyond the roi
# and the bounding box constraint is active
if (bounding_box_penalty > 0
and self.apply_bounding_box_constraint):
for name in ["Cx", "Cy", "Sx", "Sy"]:
result[name] = np.NaN

else:
result = {
"Cx": np.NaN,
"Cy": np.NaN,
"Sx": np.NaN,
"Sy": np.NaN,
"bb_penalty": np.NaN,
"total_intensity": beamspot_chars["total_intensity"]
}

return result
148 changes: 148 additions & 0 deletions lcls_tools/common/data_analysis/fit/method_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from abc import ABC, abstractmethod
import numpy as np

from pydantic import BaseModel, ConfigDict
from scipy.stats import rv_continuous


class Parameter(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
bounds: list
initial_value: float = None
prior: rv_continuous = None


class ModelParameters(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str
parameters: dict[str, Parameter]

@property
def bounds(self):
return np.vstack(
[
np.array(parameter.bounds)
for parameter in self.parameters.values()
]
)

@property
def initial_values(self):
return np.array(
[parameter.initial_value
for parameter in self.parameters.values()]
)

@initial_values.setter
def initial_values(self, initial_values: dict[str, float]):
for parameter, initial_value in initial_values.items():
self.parameters[parameter].initial_value = initial_value

@property
def priors(self):
return np.array(
[self.parameters[parameter].prior for parameter in self.parameters]
)

@priors.setter
def priors(self, priors: dict[str, float]):
for parameter, prior in priors.items():
self.parameters[parameter].prior = prior


# TODO: define properties

class MethodBase(ABC):
"""
Base abstract class for all fit methods, which serves as the bare minimum
skeleton code needed. Should be used only as a parent class to all method
models.
---------------------------
Arguments:
param_names: list (list all of param names that the model will contain)
param_guesses: np.ndarray (array that contains a guess as to what
each param value is organized in the same order as param_names)
param_bounds: np.ndarray (array that contains the lower
and upper bound on for acceptable values of each parameter)
"""

parameters: ModelParameters = None

@abstractmethod
def find_init_values(self) -> list:
...

@abstractmethod
def find_priors(self, data: np.ndarray) -> dict:
...

def forward(
self, x: np.ndarray, method_parameter_dict: dict[str, float]
) -> np.ndarray:
method_parameter_list = np.array(
[
method_parameter_dict[parameter_name]
for parameter_name in self.parameters.parameters
]
)
return self._forward(x, method_parameter_list)

@staticmethod
@abstractmethod
def _forward(x: np.ndarray, params: np.ndarray) -> np.ndarray:
...

def log_prior(self, method_parameter_dict: dict[str, rv_continuous]):
method_parameter_list = np.array(
[
method_parameter_dict[parameter_name]
for parameter_name in self.parameters.parameters
]
)
return self._log_prior(method_parameter_list)

@abstractmethod
def _log_prior(self, params: np.ndarray):
...

def log_likelihood(self, x: np.ndarray, y: np.ndarray,
method_parameter_dict: dict):
method_parameter_list = np.array(
[
method_parameter_dict[parameter_name]
for parameter_name in self.parameters.parameters
]
)
return self._log_likelihood(x, y, method_parameter_list)

def _log_likelihood(
self, x: np.ndarray, y: np.ndarray, method_parameter_list: np.ndarray
):
return -np.sum((y - self._forward(x, method_parameter_list)) ** 2)

def loss(
self,
method_parameter_list: np.ndarray,
x: np.ndarray,
y: np.ndarray,
use_priors: bool = False,
):
loss_temp = -self._log_likelihood(x, y, method_parameter_list)
if use_priors:
loss_temp = loss_temp - self._log_prior(method_parameter_list)
return loss_temp

@property
def profile_data(self):
"""1D array typically projection data"""
return self._profile_data

@profile_data.setter
def profile_data(self, profile_data):
if not isinstance(profile_data, np.ndarray):
raise TypeError("Input must be ndarray")
self._profile_data = profile_data

self.find_init_values()
self.find_priors()
self.fitted_params_dict = {}
103 changes: 103 additions & 0 deletions lcls_tools/common/data_analysis/fit/methods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import numpy as np
from scipy.stats import norm, gamma
from lcls_tools.common.data_analysis.fit.method_base import (
MethodBase,
ModelParameters,
Parameter,
)


gaussian_parameters = ModelParameters(
name="Gaussian Parameters",
parameters={
"mean": Parameter(bounds=[0.01, 1.0]),
"sigma": Parameter(bounds=[0.01, 5.0]),
"amplitude": Parameter(bounds=[0.01, 1.0]),
"offset": Parameter(bounds=[0.01, 1.0]),
},
)


class GaussianModel(MethodBase):
"""
GaussianModel Class that finds initial parameter values for gaussian
distribution and builds probability density functions for the
likelyhood a parameter to be that value based on those initial
parameter values. Passing this class the variable profile_data
automatically updates the initial values and and probability
density functions to match that data.
"""

parameters = gaussian_parameters

def find_init_values(self) -> dict:
"""Fit data without optimization, return values."""

data = self._profile_data
x = np.linspace(0, 1, len(data))
offset = data.min() + 0.01
amplitude = data.max() - offset

weighted_mean = np.average(x, weights=data)
weighted_sigma = np.sqrt(np.cov(x, aweights=data))

init_values = {
"mean": weighted_mean,
"sigma": weighted_sigma,
"amplitude": amplitude,
"offset": offset,
}
self.parameters.initial_values = init_values
return init_values

def find_priors(self) -> dict:
"""
Do initial guesses based on data and make distribution from that guess.
"""
# TODO: profile data setter method cals this and find values.
# should remove call to find values in that method.
# but since priors aren't supposed to be included (i think?)
# in this PR I will not update this.
init_values = self.find_init_values()
amplitude_mean = init_values["amplitude"]
amplitude_var = 0.05
amplitude_alpha = (amplitude_mean**2) / amplitude_var
amplitude_beta = amplitude_mean / amplitude_var
amplitude_prior = gamma(amplitude_alpha, loc=0,
scale=1 / amplitude_beta)

# Creating a normal distribution of points around the inital mean.
mean_prior = norm(init_values["mean"], 0.1)
sigma_alpha = 2.5
sigma_beta = 5.0
sigma_prior = gamma(sigma_alpha, loc=0, scale=1 / sigma_beta)

# Creating a normal distribution of points around initial offset.
offset_prior = norm(init_values["offset"], 0.5)
priors = {
"mean": mean_prior,
"sigma": sigma_prior,
"amplitude": amplitude_prior,
"offset": offset_prior,
}

self.parameters.priors = priors
return priors

def _forward(self, x: np.array, method_parameter_list: np.array):
# Load distribution parameters
# needs to be array for scipy.minimize
mean = method_parameter_list[0]
sigma = method_parameter_list[1]
amplitude = method_parameter_list[2]
offset = method_parameter_list[3]
return (np.sqrt(2 * np.pi) * amplitude
* norm.pdf((x - mean) / sigma) + offset)

def _log_prior(self, method_parameter_list: np.ndarray) -> float:
return np.sum(
[
prior.logpdf(method_parameter_list[i])
for i, prior in enumerate(self.parameters.priors)
]
)
Loading

0 comments on commit c950303

Please sign in to comment.