Fitting Base Class and Gaussian Model from PR 143 #172

Merged
merged 19 commits into from Aug 2, 2024
Commits
37567e7
Added MethodBase as an abstract class for all fit methods
phys-cgarnier Jun 12, 2024
fda0b90
method_base.py flake8
eloiseyang Jun 26, 2024
318397a
Merge pull request #1 from slaclab/fit
phys-cgarnier Jul 10, 2024
640010c
changed code base to be more pydantic this cleans up some things invo…
phys-cgarnier Jul 11, 2024
2a47d30
updated fit code to incorporate pydantic structures for method parame…
phys-cgarnier Jul 12, 2024
8a38fed
Merge commit 'b6949b679df756059f6edd29b83d8dd165d6d4bf' into fit
eloiseyang Jul 12, 2024
06444bf
Merge branch 'fit' into fit_pydantic
eloiseyang Jul 12, 2024
a34fbf9
made changes as requested in the comment (minus removing _forward)
phys-cgarnier Jul 15, 2024
5d4a149
Merge pull request #3 from phys-cgarnier/main
eloiseyang Jul 15, 2024
387fae4
Merge pull request #2 from slaclab/fit_pydantic
eloiseyang Jul 15, 2024
eb32f59
flake8
eloiseyang Jul 15, 2024
59bf2ae
committing GaussianFIt with property that returns beamsize
phys-cgarnier Jul 24, 2024
8b23286
checked that GaussianFit works (roughly) and set processor and fit to…
phys-cgarnier Jul 24, 2024
8dbafca
Merge pull request #4 from phys-cgarnier/main
phys-cgarnier Jul 29, 2024
b70e68c
committing linted gaussian_fit.py
phys-cgarnier Jul 31, 2024
cb1cec9
removed empty test for projection
phys-cgarnier Jul 31, 2024
dfe72bc
Merge branch 'phys-cgarnier:main' into fit
phys-cgarnier Jul 31, 2024
ca54c90
more flake8
phys-cgarnier Jul 31, 2024
6090968
Merge branch 'phys-cgarnier:main' into fit
phys-cgarnier Jul 31, 2024
126 changes: 126 additions & 0 deletions lcls_tools/common/data_analysis/fit/gaussian_fit.py
@@ -0,0 +1,126 @@
import numpy as np
from pydantic import BaseModel, ConfigDict, PositiveFloat
from typing import Optional
from lcls_tools.common.data_analysis.fit.projection import ProjectionFit
from lcls_tools.common.image.processing import ImageProcessor


class GaussianFit(BaseModel):
# should rename to BeamsizeEvaluator or something ?
"""
Gaussian Fitting class that takes in a fitting tool (with method),
ImageProcessor, and Image and returns the beamsize and position.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)
processor: ImageProcessor = ImageProcessor()
fit: ProjectionFit = ProjectionFit()
min_log_intensity: float = 4.0
bounding_box_half_width: PositiveFloat = 3.0
apply_bounding_box_constraint: bool = True
image: Optional[np.ndarray] = None

@property
def beamsize(self):
beamspot_chars = self.gaussian_fit(self.image)
beamsize = self.calculate_beamsize(beamspot_chars)
return beamsize

def gaussian_fit(self, image):
self.processor.auto_process(image)
x_proj, y_proj = self.get_projections(image)
x_parameters = self.fit.fit_projection(x_proj)
y_parameters = self.fit.fit_projection(y_proj)

return {
"centroid": np.array((x_parameters['mean'],
y_parameters['mean'])),
"rms_sizes": np.array((x_parameters['sigma'],
y_parameters['sigma'])),
"total_intensity": image.sum(),
}

def get_projections(self, image):
x_projection = np.array(np.sum(image, axis=0))
y_projection = np.array(np.sum(image, axis=1))
return x_projection, y_projection

def calculate_beamsize(self, beamspot_chars: dict):
        '''
        Conditional beamsize calculation:
        If total_intensity is too small, return a NaN (null) result.
        Otherwise, do the beamsize calculation and check that the beamspot
        is within the ROI.
        '''
if (np.log10(beamspot_chars['total_intensity'])
< self.min_log_intensity):
            # TODO: character count is really not liking this one line
            print(("log10 image intensity "
                   + f"{np.log10(beamspot_chars['total_intensity'])}"
                   + " below threshold"))
result = {
"Cx": np.NaN,
"Cy": np.NaN,
"Sx": np.NaN,
"Sy": np.NaN,
"bb_penalty": np.NaN,
"total_intensity": beamspot_chars['total_intensity'],
}
return result
else:
centroid = beamspot_chars['centroid']
Member: Can we comment the two cases here for calculate_beamsize, i.e. you do the first if the intensity is low and the else if the intensity is high enough? And how was the default threshold picked?

Collaborator: I will comment on both cases. The default was selected by the ML group; I just used theirs, so I am not sure how they decided on it.

sizes = beamspot_chars['rms_sizes']

if np.all(~np.isnan(np.stack((centroid, sizes)))):
# get beam region bounding box
n_stds = self.bounding_box_half_width

bounding_box_corner_pts = np.array(
(
Member: Can we be more descriptive than pts?

Collaborator: Sure, I will rename it.

centroid - n_stds * sizes,
centroid + n_stds * sizes,
centroid - n_stds * sizes * np.array((-1, 1)),
centroid + n_stds * sizes * np.array((-1, 1)),
)
)

if self.processor.roi is not None:
roi_radius = self.processor.roi.radius
else:
roi_radius = np.min(np.array(self.image.shape) / 1.5)
# TODO: maybe need to change this ^^^

                # Check whether the bounding box around the beamspot
                # lies within the circular ROI.
temp = bounding_box_corner_pts - np.array((roi_radius,
roi_radius))
distances = np.linalg.norm(temp, axis=1)
bounding_box_penalty = np.max(distances) - roi_radius

result = {
"Cx": centroid[0],
"Cy": centroid[1],
"Sx": sizes[0],
"Sy": sizes[1],
"bb_penalty": bounding_box_penalty,
"total_intensity": beamspot_chars["total_intensity"]
}

                # set results to NaN if the beam extends beyond the roi
                # and the bounding box constraint is active
if (bounding_box_penalty > 0
and self.apply_bounding_box_constraint):
for name in ["Cx", "Cy", "Sx", "Sy"]:
result[name] = np.NaN

else:
result = {
"Cx": np.NaN,
"Cy": np.NaN,
"Sx": np.NaN,
"Sy": np.NaN,
"bb_penalty": np.NaN,
"total_intensity": beamspot_chars["total_intensity"]
}

return result
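For anyone trying this class out locally, here is a minimal usage sketch, not part of the PR itself. It assumes the default ImageProcessor and ProjectionFit construct without arguments, and beam_image.npy is a hypothetical saved camera image:

import numpy as np
from lcls_tools.common.data_analysis.fit.gaussian_fit import GaussianFit

# hypothetical 2D camera image with a roughly gaussian beamspot
image = np.load("beam_image.npy")

evaluator = GaussianFit(image=image)
# beamsize returns a dict with Cx, Cy, Sx, Sy, bb_penalty, total_intensity;
# the entries are NaN when the log10 intensity is below min_log_intensity
# or the beam bounding box falls outside the ROI
result = evaluator.beamsize
print(result["Sx"], result["Sy"])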
148 changes: 148 additions & 0 deletions lcls_tools/common/data_analysis/fit/method_base.py
Member: This file in general still has a lot of the ML terminology and not enough comments to be descriptive. Maybe it's OK to leave it in for now, but comments should be added to the priors, forward, likelihood, and loss functions to be clear on what those terms are.
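For context while reading the diff below, the relationship between those terms as implemented here is roughly the usual maximum a posteriori setup (my paraphrase of the code, not wording from the PR): $\mathrm{loss}(\theta) = -\log\mathcal{L}(y \mid x, \theta) - \log p(\theta)$ when use_priors is set, where the log likelihood reduces to $\log\mathcal{L}(y \mid x, \theta) = -\sum_i \left(y_i - f(x_i, \theta)\right)^2$ up to constants, $f$ is the model's `_forward` function, and $p(\theta)$ is the product of the per-parameter prior distributions.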

@@ -0,0 +1,148 @@
from abc import ABC, abstractmethod
import numpy as np

from pydantic import BaseModel, ConfigDict
from scipy.stats import rv_continuous


class Parameter(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
bounds: list
initial_value: float = None
prior: rv_continuous = None


class ModelParameters(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str
parameters: dict[str, Parameter]

@property
def bounds(self):
return np.vstack(
[
np.array(parameter.bounds)
for parameter in self.parameters.values()
]
)

@property
def initial_values(self):
return np.array(
[parameter.initial_value
for parameter in self.parameters.values()]
)

@initial_values.setter
def initial_values(self, initial_values: dict[str, float]):
for parameter, initial_value in initial_values.items():
self.parameters[parameter].initial_value = initial_value

@property
def priors(self):
return np.array(
[self.parameters[parameter].prior for parameter in self.parameters]
)

@priors.setter
def priors(self, priors: dict[str, float]):
for parameter, prior in priors.items():
self.parameters[parameter].prior = prior


# TODO: define properties

class MethodBase(ABC):
"""
Member: Should we consider the name FitMethodBase here, to be more descriptive? The comment is also clear, so maybe that's enough.

Base abstract class for all fit methods, which serves as the bare minimum
skeleton code needed. Should be used only as a parent class to all method
models.
    ---------------------------
    Attributes:
        parameters: ModelParameters (the named Parameter objects for the
            model, each carrying bounds, an initial value, and a prior
            distribution)
"""

parameters: ModelParameters = None

@abstractmethod
def find_init_values(self) -> list:
...

@abstractmethod
def find_priors(self, data: np.ndarray) -> dict:
...

def forward(
self, x: np.ndarray, method_parameter_dict: dict[str, float]
) -> np.ndarray:
method_parameter_list = np.array(
[
method_parameter_dict[parameter_name]
for parameter_name in self.parameters.parameters
]
)
return self._forward(x, method_parameter_list)

@staticmethod
@abstractmethod
def _forward(x: np.ndarray, params: np.ndarray) -> np.ndarray:
...

def log_prior(self, method_parameter_dict: dict[str, rv_continuous]):
method_parameter_list = np.array(
[
method_parameter_dict[parameter_name]
for parameter_name in self.parameters.parameters
]
)
return self._log_prior(method_parameter_list)

@abstractmethod
def _log_prior(self, params: np.ndarray):
...

def log_likelihood(self, x: np.ndarray, y: np.ndarray,
method_parameter_dict: dict):
method_parameter_list = np.array(
[
method_parameter_dict[parameter_name]
for parameter_name in self.parameters.parameters
]
)
return self._log_likelihood(x, y, method_parameter_list)

def _log_likelihood(
self, x: np.ndarray, y: np.ndarray, method_parameter_list: np.ndarray
):
return -np.sum((y - self._forward(x, method_parameter_list)) ** 2)

def loss(
self,
method_parameter_list: np.ndarray,
x: np.ndarray,
y: np.ndarray,
use_priors: bool = False,
):
loss_temp = -self._log_likelihood(x, y, method_parameter_list)
if use_priors:
loss_temp = loss_temp - self._log_prior(method_parameter_list)
return loss_temp

@property
def profile_data(self):
"""1D array typically projection data"""
return self._profile_data

@profile_data.setter
def profile_data(self, profile_data):
if not isinstance(profile_data, np.ndarray):
raise TypeError("Input must be ndarray")
self._profile_data = profile_data

self.find_init_values()
self.find_priors()
self.fitted_params_dict = {}
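To make the abstract contract concrete, here is a rough sketch of a minimal subclass; the LinearModel name and its parameter choices are illustrative only and not part of this PR. It follows GaussianModel's pattern of taking no data argument in find_priors, since the profile_data setter calls it without one:

import numpy as np
from scipy.stats import norm
from lcls_tools.common.data_analysis.fit.method_base import (
    MethodBase,
    ModelParameters,
    Parameter,
)

linear_parameters = ModelParameters(
    name="Linear Parameters",
    parameters={
        "slope": Parameter(bounds=[-10.0, 10.0]),
        "intercept": Parameter(bounds=[-1.0, 1.0]),
    },
)


class LinearModel(MethodBase):
    parameters = linear_parameters

    def find_init_values(self) -> dict:
        # crude slope/intercept guesses from the endpoints of the profile
        data = self._profile_data
        x = np.linspace(0, 1, len(data))
        slope = (data[-1] - data[0]) / (x[-1] - x[0])
        init_values = {"slope": float(slope), "intercept": float(data[0])}
        self.parameters.initial_values = init_values
        return init_values

    def find_priors(self) -> dict:
        # normal priors centered on the initial guesses
        init_values = self.find_init_values()
        priors = {
            "slope": norm(init_values["slope"], 1.0),
            "intercept": norm(init_values["intercept"], 0.5),
        }
        self.parameters.priors = priors
        return priors

    @staticmethod
    def _forward(x: np.ndarray, params: np.ndarray) -> np.ndarray:
        slope, intercept = params
        return slope * x + intercept

    def _log_prior(self, params: np.ndarray):
        return np.sum(
            [prior.logpdf(params[i])
             for i, prior in enumerate(self.parameters.priors)]
        )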
103 changes: 103 additions & 0 deletions lcls_tools/common/data_analysis/fit/methods.py
@@ -0,0 +1,103 @@
import numpy as np
from scipy.stats import norm, gamma
from lcls_tools.common.data_analysis.fit.method_base import (
MethodBase,
ModelParameters,
Parameter,
)


gaussian_parameters = ModelParameters(
name="Gaussian Parameters",
parameters={
"mean": Parameter(bounds=[0.01, 1.0]),
"sigma": Parameter(bounds=[0.01, 5.0]),
"amplitude": Parameter(bounds=[0.01, 1.0]),
"offset": Parameter(bounds=[0.01, 1.0]),
},
)


class GaussianModel(MethodBase):
"""
    GaussianModel class that finds initial parameter values for a gaussian
    distribution and builds probability density functions for the
    likelihood of a parameter taking a given value, based on those initial
    parameter values. Passing this class the variable profile_data
    automatically updates the initial values and probability
    density functions to match that data.
"""

parameters = gaussian_parameters

def find_init_values(self) -> dict:
"""Fit data without optimization, return values."""

data = self._profile_data
x = np.linspace(0, 1, len(data))
offset = data.min() + 0.01
amplitude = data.max() - offset

weighted_mean = np.average(x, weights=data)
weighted_sigma = np.sqrt(np.cov(x, aweights=data))

init_values = {
"mean": weighted_mean,
"sigma": weighted_sigma,
"amplitude": amplitude,
"offset": offset,
}
self.parameters.initial_values = init_values
return init_values

def find_priors(self) -> dict:
"""
        Make initial guesses based on the data and build prior distributions
        from those guesses.
"""
        # TODO: the profile_data setter method calls this and find_init_values.
        #       We should remove the call to find_init_values in that method,
        #       but since priors aren't supposed to be included (I think?)
        #       in this PR, I will not update this.
init_values = self.find_init_values()
amplitude_mean = init_values["amplitude"]
amplitude_var = 0.05
amplitude_alpha = (amplitude_mean**2) / amplitude_var
amplitude_beta = amplitude_mean / amplitude_var
amplitude_prior = gamma(amplitude_alpha, loc=0,
scale=1 / amplitude_beta)

        # Creating a normal distribution of points around the initial mean.
mean_prior = norm(init_values["mean"], 0.1)
sigma_alpha = 2.5
sigma_beta = 5.0
sigma_prior = gamma(sigma_alpha, loc=0, scale=1 / sigma_beta)

# Creating a normal distribution of points around initial offset.
offset_prior = norm(init_values["offset"], 0.5)
priors = {
"mean": mean_prior,
"sigma": sigma_prior,
"amplitude": amplitude_prior,
"offset": offset_prior,
}

self.parameters.priors = priors
return priors

def _forward(self, x: np.array, method_parameter_list: np.array):
# Load distribution parameters
Member: This is the function doing the fit, right? Can we rename it to something with 'fit' in the name?

Member: Maybe a #TODO comment to revisit would be enough for now.

# needs to be array for scipy.minimize
mean = method_parameter_list[0]
sigma = method_parameter_list[1]
amplitude = method_parameter_list[2]
offset = method_parameter_list[3]
return (np.sqrt(2 * np.pi) * amplitude
* norm.pdf((x - mean) / sigma) + offset)

def _log_prior(self, method_parameter_list: np.ndarray) -> float:
return np.sum(
[
prior.logpdf(method_parameter_list[i])
for i, prior in enumerate(self.parameters.priors)
]
)
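As a quick sanity check of the pieces above, a rough sketch of running the fit by hand with scipy.optimize.minimize; the synthetic projection data is illustrative only, and ProjectionFit (used elsewhere in this PR) presumably wraps something similar:

import numpy as np
from scipy.optimize import minimize
from lcls_tools.common.data_analysis.fit.methods import GaussianModel

# synthetic, normalized 1D projection with a gaussian bump (illustrative only)
x = np.linspace(0, 1, 200)
data = 0.8 * np.exp(-0.5 * ((x - 0.45) / 0.05) ** 2) + 0.05

model = GaussianModel()
# assigning profile_data triggers find_init_values() and find_priors()
model.profile_data = data

res = minimize(
    model.loss,
    model.parameters.initial_values,
    args=(x, data),  # pass (x, data, True) to include the priors in the loss
    bounds=model.parameters.bounds,
)
fitted = dict(zip(model.parameters.parameters.keys(), res.x))
print(fitted)  # mean, sigma, amplitude, offset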