-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fitting Base Class and Gaussian Model from PR 143 #172
Changes from all commits
37567e7
fda0b90
318397a
640010c
2a47d30
8a38fed
06444bf
a34fbf9
5d4a149
387fae4
eb32f59
59bf2ae
8b23286
8dbafca
b70e68c
cb1cec9
dfe72bc
ca54c90
6090968
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
import numpy as np | ||
from pydantic import BaseModel, ConfigDict, PositiveFloat | ||
from typing import Optional | ||
from lcls_tools.common.data_analysis.fit.projection import ProjectionFit | ||
from lcls_tools.common.image.processing import ImageProcessor | ||
|
||
|
||
class GaussianFit(BaseModel): | ||
# should rename to BeamsizeEvaluator or something ? | ||
""" | ||
Gaussian Fitting class that takes in a fitting tool (with method), | ||
ImageProcessor, and Image and returns the beamsize and position. | ||
""" | ||
|
||
model_config = ConfigDict(arbitrary_types_allowed=True) | ||
processor: ImageProcessor = ImageProcessor() | ||
fit: ProjectionFit = ProjectionFit() | ||
min_log_intensity: float = 4.0 | ||
bounding_box_half_width: PositiveFloat = 3.0 | ||
apply_bounding_box_constraint: bool = True | ||
image: Optional[np.ndarray] = None | ||
|
||
@property | ||
def beamsize(self): | ||
beamspot_chars = self.gaussian_fit(self.image) | ||
beamsize = self.calculate_beamsize(beamspot_chars) | ||
return beamsize | ||
|
||
def gaussian_fit(self, image): | ||
self.processor.auto_process(image) | ||
x_proj, y_proj = self.get_projections(image) | ||
x_parameters = self.fit.fit_projection(x_proj) | ||
y_parameters = self.fit.fit_projection(y_proj) | ||
|
||
return { | ||
"centroid": np.array((x_parameters['mean'], | ||
y_parameters['mean'])), | ||
"rms_sizes": np.array((x_parameters['sigma'], | ||
y_parameters['sigma'])), | ||
"total_intensity": image.sum(), | ||
} | ||
|
||
def get_projections(self, image): | ||
x_projection = np.array(np.sum(image, axis=0)) | ||
y_projection = np.array(np.sum(image, axis=1)) | ||
return x_projection, y_projection | ||
|
||
def calculate_beamsize(self, beamspot_chars: dict): | ||
''' | ||
Conditional beamsize calculation: | ||
if condition1 (total_intensity is to small): then return NULL result. | ||
else: do beamsize calculation and check that the beamspot is | ||
within the ROI | ||
''' | ||
if (np.log10(beamspot_chars['total_intensity']) | ||
< self.min_log_intensity): | ||
# TODO: character count is really not liking this one line | ||
print(("log10 image intensity" | ||
+ f"{np.log10(beamspot_chars['total_intensity'])}" | ||
+ "below threshold")) | ||
result = { | ||
"Cx": np.NaN, | ||
"Cy": np.NaN, | ||
"Sx": np.NaN, | ||
"Sy": np.NaN, | ||
"bb_penalty": np.NaN, | ||
"total_intensity": beamspot_chars['total_intensity'], | ||
} | ||
return result | ||
else: | ||
centroid = beamspot_chars['centroid'] | ||
sizes = beamspot_chars['rms_sizes'] | ||
|
||
if np.all(~np.isnan(np.stack((centroid, sizes)))): | ||
# get beam region bounding box | ||
n_stds = self.bounding_box_half_width | ||
|
||
bounding_box_corner_pts = np.array( | ||
( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we be more descriptive than pts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I will rename it |
||
centroid - n_stds * sizes, | ||
centroid + n_stds * sizes, | ||
centroid - n_stds * sizes * np.array((-1, 1)), | ||
centroid + n_stds * sizes * np.array((-1, 1)), | ||
) | ||
) | ||
|
||
if self.processor.roi is not None: | ||
roi_radius = self.processor.roi.radius | ||
else: | ||
roi_radius = np.min(np.array(self.image.shape) / 1.5) | ||
# TODO: maybe need to change this ^^^ | ||
|
||
# This is a check whether or not the beamspot is within | ||
# the bounding box. | ||
temp = bounding_box_corner_pts - np.array((roi_radius, | ||
roi_radius)) | ||
distances = np.linalg.norm(temp, axis=1) | ||
bounding_box_penalty = np.max(distances) - roi_radius | ||
|
||
result = { | ||
"Cx": centroid[0], | ||
"Cy": centroid[1], | ||
"Sx": sizes[0], | ||
"Sy": sizes[1], | ||
"bb_penalty": bounding_box_penalty, | ||
"total_intensity": beamspot_chars["total_intensity"] | ||
} | ||
|
||
# set results to none if the beam extends beyond the roi | ||
# and the bounding box constraint is active | ||
if (bounding_box_penalty > 0 | ||
and self.apply_bounding_box_constraint): | ||
for name in ["Cx", "Cy", "Sx", "Sy"]: | ||
result[name] = np.NaN | ||
|
||
else: | ||
result = { | ||
"Cx": np.NaN, | ||
"Cy": np.NaN, | ||
"Sx": np.NaN, | ||
"Sy": np.NaN, | ||
"bb_penalty": np.NaN, | ||
"total_intensity": beamspot_chars["total_intensity"] | ||
} | ||
|
||
return result |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file in general still has a lot of the ML terminology and not enough comments to be descriptive. Maybe it's ok to leave it in for now, but comments should be added to priors, forward, likelyhood, and loss functions to be clear on what those terms are. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
from abc import ABC, abstractmethod | ||
import numpy as np | ||
|
||
from pydantic import BaseModel, ConfigDict | ||
from scipy.stats import rv_continuous | ||
|
||
|
||
class Parameter(BaseModel): | ||
model_config = ConfigDict(arbitrary_types_allowed=True) | ||
bounds: list | ||
initial_value: float = None | ||
prior: rv_continuous = None | ||
|
||
|
||
class ModelParameters(BaseModel): | ||
model_config = ConfigDict(arbitrary_types_allowed=True) | ||
name: str | ||
parameters: dict[str, Parameter] | ||
|
||
@property | ||
def bounds(self): | ||
return np.vstack( | ||
[ | ||
np.array(parameter.bounds) | ||
for parameter in self.parameters.values() | ||
] | ||
) | ||
|
||
@property | ||
def initial_values(self): | ||
return np.array( | ||
[parameter.initial_value | ||
for parameter in self.parameters.values()] | ||
) | ||
|
||
@initial_values.setter | ||
def initial_values(self, initial_values: dict[str, float]): | ||
for parameter, initial_value in initial_values.items(): | ||
self.parameters[parameter].initial_value = initial_value | ||
|
||
@property | ||
def priors(self): | ||
return np.array( | ||
[self.parameters[parameter].prior for parameter in self.parameters] | ||
) | ||
|
||
@priors.setter | ||
def priors(self, priors: dict[str, float]): | ||
for parameter, prior in priors.items(): | ||
self.parameters[parameter].prior = prior | ||
|
||
|
||
# TODO: define properties | ||
|
||
class MethodBase(ABC): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we consider the name FitMethodBase here? To be more descriptive? |
||
Base abstract class for all fit methods, which serves as the bare minimum | ||
skeleton code needed. Should be used only as a parent class to all method | ||
models. | ||
--------------------------- | ||
Arguments: | ||
param_names: list (list all of param names that the model will contain) | ||
param_guesses: np.ndarray (array that contains a guess as to what | ||
each param value is organized in the same order as param_names) | ||
param_bounds: np.ndarray (array that contains the lower | ||
and upper bound on for acceptable values of each parameter) | ||
""" | ||
|
||
parameters: ModelParameters = None | ||
|
||
@abstractmethod | ||
def find_init_values(self) -> list: | ||
... | ||
|
||
@abstractmethod | ||
def find_priors(self, data: np.ndarray) -> dict: | ||
... | ||
|
||
def forward( | ||
self, x: np.ndarray, method_parameter_dict: dict[str, float] | ||
) -> np.ndarray: | ||
method_parameter_list = np.array( | ||
[ | ||
method_parameter_dict[parameter_name] | ||
for parameter_name in self.parameters.parameters | ||
] | ||
) | ||
return self._forward(x, method_parameter_list) | ||
|
||
@staticmethod | ||
@abstractmethod | ||
def _forward(x: np.ndarray, params: np.ndarray) -> np.ndarray: | ||
... | ||
|
||
def log_prior(self, method_parameter_dict: dict[str, rv_continuous]): | ||
method_parameter_list = np.array( | ||
[ | ||
method_parameter_dict[parameter_name] | ||
for parameter_name in self.parameters.parameters | ||
] | ||
) | ||
return self._log_prior(method_parameter_list) | ||
|
||
@abstractmethod | ||
def _log_prior(self, params: np.ndarray): | ||
... | ||
|
||
def log_likelihood(self, x: np.ndarray, y: np.ndarray, | ||
method_parameter_dict: dict): | ||
method_parameter_list = np.array( | ||
[ | ||
method_parameter_dict[parameter_name] | ||
for parameter_name in self.parameters.parameters | ||
] | ||
) | ||
return self._log_likelihood(x, y, method_parameter_list) | ||
|
||
def _log_likelihood( | ||
self, x: np.ndarray, y: np.ndarray, method_parameter_list: np.ndarray | ||
): | ||
return -np.sum((y - self._forward(x, method_parameter_list)) ** 2) | ||
|
||
def loss( | ||
self, | ||
method_parameter_list: np.ndarray, | ||
x: np.ndarray, | ||
y: np.ndarray, | ||
use_priors: bool = False, | ||
): | ||
loss_temp = -self._log_likelihood(x, y, method_parameter_list) | ||
if use_priors: | ||
loss_temp = loss_temp - self._log_prior(method_parameter_list) | ||
return loss_temp | ||
|
||
@property | ||
def profile_data(self): | ||
"""1D array typically projection data""" | ||
return self._profile_data | ||
|
||
@profile_data.setter | ||
def profile_data(self, profile_data): | ||
if not isinstance(profile_data, np.ndarray): | ||
raise TypeError("Input must be ndarray") | ||
self._profile_data = profile_data | ||
|
||
self.find_init_values() | ||
self.find_priors() | ||
self.fitted_params_dict = {} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import numpy as np | ||
from scipy.stats import norm, gamma | ||
from lcls_tools.common.data_analysis.fit.method_base import ( | ||
MethodBase, | ||
ModelParameters, | ||
Parameter, | ||
) | ||
|
||
|
||
gaussian_parameters = ModelParameters( | ||
name="Gaussian Parameters", | ||
parameters={ | ||
"mean": Parameter(bounds=[0.01, 1.0]), | ||
"sigma": Parameter(bounds=[0.01, 5.0]), | ||
"amplitude": Parameter(bounds=[0.01, 1.0]), | ||
"offset": Parameter(bounds=[0.01, 1.0]), | ||
}, | ||
) | ||
|
||
|
||
class GaussianModel(MethodBase): | ||
""" | ||
GaussianModel Class that finds initial parameter values for gaussian | ||
distribution and builds probability density functions for the | ||
likelyhood a parameter to be that value based on those initial | ||
parameter values. Passing this class the variable profile_data | ||
automatically updates the initial values and and probability | ||
density functions to match that data. | ||
""" | ||
|
||
parameters = gaussian_parameters | ||
|
||
def find_init_values(self) -> dict: | ||
"""Fit data without optimization, return values.""" | ||
|
||
data = self._profile_data | ||
x = np.linspace(0, 1, len(data)) | ||
offset = data.min() + 0.01 | ||
amplitude = data.max() - offset | ||
|
||
weighted_mean = np.average(x, weights=data) | ||
weighted_sigma = np.sqrt(np.cov(x, aweights=data)) | ||
|
||
init_values = { | ||
"mean": weighted_mean, | ||
"sigma": weighted_sigma, | ||
"amplitude": amplitude, | ||
"offset": offset, | ||
} | ||
self.parameters.initial_values = init_values | ||
return init_values | ||
|
||
def find_priors(self) -> dict: | ||
eloiseyang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Do initial guesses based on data and make distribution from that guess. | ||
""" | ||
# TODO: profile data setter method cals this and find values. | ||
# should remove call to find values in that method. | ||
# but since priors aren't supposed to be included (i think?) | ||
# in this PR I will not update this. | ||
init_values = self.find_init_values() | ||
amplitude_mean = init_values["amplitude"] | ||
amplitude_var = 0.05 | ||
amplitude_alpha = (amplitude_mean**2) / amplitude_var | ||
amplitude_beta = amplitude_mean / amplitude_var | ||
amplitude_prior = gamma(amplitude_alpha, loc=0, | ||
scale=1 / amplitude_beta) | ||
|
||
# Creating a normal distribution of points around the inital mean. | ||
mean_prior = norm(init_values["mean"], 0.1) | ||
sigma_alpha = 2.5 | ||
sigma_beta = 5.0 | ||
sigma_prior = gamma(sigma_alpha, loc=0, scale=1 / sigma_beta) | ||
|
||
# Creating a normal distribution of points around initial offset. | ||
offset_prior = norm(init_values["offset"], 0.5) | ||
priors = { | ||
"mean": mean_prior, | ||
"sigma": sigma_prior, | ||
"amplitude": amplitude_prior, | ||
"offset": offset_prior, | ||
} | ||
|
||
self.parameters.priors = priors | ||
return priors | ||
|
||
def _forward(self, x: np.array, method_parameter_list: np.array): | ||
# Load distribution parameters | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the function doing the fit right? can we rename it to something with 'fit' in the name? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe a #TODO comment to revisit would be enough for now. |
||
# needs to be array for scipy.minimize | ||
mean = method_parameter_list[0] | ||
sigma = method_parameter_list[1] | ||
amplitude = method_parameter_list[2] | ||
offset = method_parameter_list[3] | ||
return (np.sqrt(2 * np.pi) * amplitude | ||
* norm.pdf((x - mean) / sigma) + offset) | ||
|
||
def _log_prior(self, method_parameter_list: np.ndarray) -> float: | ||
return np.sum( | ||
[ | ||
prior.logpdf(method_parameter_list[i]) | ||
for i, prior in enumerate(self.parameters.priors) | ||
] | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we comment the two cases here for calc beamsize, i.e. you do the first if the intensity is low and the else if intensity is high enough? and how was the default threshold picked?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will comment both on both cases. The default was selected by the ML group, I just used theirs so I am sure how the decided on it.