diff --git a/hera_cal/frf.py b/hera_cal/frf.py
index 7a1e65ac5..ea44598ce 100644
--- a/hera_cal/frf.py
+++ b/hera_cal/frf.py
@@ -4,6 +4,7 @@
 
 import numpy as np
 from hera_filters import dspec
+from hera_cal.noise import predict_noise_variance_from_autos
 
 from . import utils
 from scipy.interpolate import interp1d
@@ -1998,3 +1999,233 @@ def time_average_argparser():
     ap.add_argument("--equalize_interleave_ntimes", action="store_true", default=False, help=desc)
     return ap
+
+
+def get_frop_for_noise(times, filter_cent_use, filter_half_wid_use, freqs,
+                       t_avg=300., cutoff=1e-9, weights=None, coherent_avg=True,
+                       wgt_tavg_by_nsample=True, nsamples=None, rephase=True,
+                       bl_vec=None, lat=-30.721526120689507, dlst=None):
+    """
+    Get the FRF + coherent time average operator for the purposes of noise
+    covariance calculation. Composes time averaging and the main lobe FRF
+    into a single operation.
+
+    Parameters:
+        times (array):
+            (Interleaved) Julian Dates on the HERAData object (hd), converted
+            to seconds.
+        filter_cent_use (float):
+            Filter center for the main lobe filter, in Hz.
+        filter_half_wid_use (float):
+            Filter half-width for the main lobe filter, in Hz.
+        freqs (array):
+            Frequencies in the data, in Hz.
+        t_avg (float):
+            Desired coherent averaging length, in seconds.
+        cutoff (float):
+            Eigenvalue cutoff for DPSS modes.
+        weights (array):
+            Array of weights to use for the main lobe filter.
+            None (default) uses uniform weights.
+        coherent_avg (bool):
+            Whether to coherently average on the t_avg cadence or not.
+        wgt_tavg_by_nsample (bool):
+            Whether to weight the time average by nsamples.
+            Otherwise do uniform weighting.
+        nsamples (array):
+            Array proportional to how many nights contributed to each sample.
+        rephase (bool):
+            Whether to rephase before averaging.
+        bl_vec (array):
+            Baseline vector in ENU coordinates, in meters.
+        lat (float):
+            Array latitude in degrees North.
+        dlst (float or array_like):
+            Amount of LST to rephase by, in radians. If float, rephases all
+            times by the same amount.
+
+    Returns:
+        frop (array):
+            Filter operator. Shape (Ntimes_coarse, Ntimes_fine, Nfreqs).
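+
+    Notes:
+        The fine-time operation implemented below is the weighted DPSS
+        least-squares smoother d (d^dagger W d)^-1 d^dagger W, where d holds
+        the DPSS modes and W the per-frequency weights; the coherent average
+        is a weighted (and optionally rephased) sum over chunks of roughly
+        t_avg seconds. frop composes the two, so that downstream the noise
+        covariance can be formed in one step as
+        frop @ diag(var) @ frop^dagger (see get_FRF_cov).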
+ """ + + # Time handling is a slightly modified port from hera_filters/hera_cal + + Ntimes = len(times) + Nfreqs = len(freqs) + + dmatr, _ = dspec.dpss_operator(times, np.array([filter_cent_use, ]), + np.array([filter_half_wid_use, ]), + eigenval_cutoff=np.array([cutoff, ])) + Nmodes = dmatr.shape[-1] + + if weights is None: + weights = np.ones([Ntimes, Nfreqs]) + + #####Index Legend##### + # a = DPSS mode # + # f = frequency # + # t = fine time # + # T = coarse time # + #####Index Legend##### + + ddagW = dmatr.T.conj()[:, np.newaxis] * weights.T # aft + ddagWd = ddagW @ dmatr # afa + lsq = np.linalg.solve(ddagWd.swapaxes(0,1), ddagW.swapaxes(0,1)) # fat + + if coherent_avg: + dtime = np.median(np.abs(np.diff(times))) + chunk_size = int(np.round((t_avg / dtime))) + Nchunk = int(np.ceil(Ntimes / chunk_size)) + chunk_remainder = Ntimes % chunk_size + + tavg_weights = nsamples if wgt_tavg_by_nsample else np.where(weights, np.ones([Ntimes, Nfreqs]), 0) + if chunk_remainder > 0: # Stack some 0s that get 0 weight so we can do the reshaping below without incident + + dmatr_stack_shape = [chunk_size - chunk_remainder, Nmodes] + weights_stack_shape = [chunk_size - chunk_remainder, Nfreqs] + dmatr = np.vstack((dmatr, np.zeros(dmatr_stack_shape, dtype=complex))) + tavg_weights = np.vstack((tavg_weights, np.zeros(weights_stack_shape, dtype=complex))) + dlst = np.append(dlst, np.zeros(chunk_size - chunk_remainder, dtype=float)) + + dres = dmatr.reshape(Nchunk, chunk_size, Nmodes) + wres = tavg_weights.reshape(Nchunk, chunk_size, Nfreqs) + wnorm = wres.sum(axis=1)[:, np.newaxis] + # normalize for an average + wres = np.where(wnorm > 0, wres / wnorm, 0) + + # Apply the rephase to the weights matrix since it's mathematically equivalent and convenient + if rephase: + wres = lst_rephase(wres.reshape(Nchunk * chunk_size, 1, Nfreqs), + bl_vec, freqs, dlst, lat=lat, inplace=False) + wres = wres.reshape(Nchunk, chunk_size, Nfreqs) + + # does "Ttf,Tta->Tfa" much faster than einsum and fancy indexing + dchunk = np.zeros([Nchunk, Nfreqs, Nmodes], dtype=complex) + for coarse_time_ind in range(Nchunk): + dchunk[coarse_time_ind] = np.tensordot(wres[coarse_time_ind].T, + dres[coarse_time_ind], + axes=1) + + # does "Tfa,fat->Ttf" faster than einsum and fancy indexing + frop = np.zeros([Nchunk, Ntimes, Nfreqs], dtype=complex) + for freq_ind in range(Nfreqs): + frop[:, :, freq_ind] = np.tensordot(dchunk[:, freq_ind], + lsq[freq_ind], + axes=1) + else: + # ta,fat->ttf + frop = np.tensordot(dmatr, lsq.transpose(1, 2, 0), axes=1) + + return frop + +def prep_var_for_frop(data, nsamples, weights, cross_antpairpol, freq_slice, + auto_ant=None, default_value=0.): + """ + Wrapper around hera_cal.noise.predict_noise_variance_from_autos that preps + the noise variance calculation for FRF + coherent average. + + Parameters: + data: DataContainer + Must contain the cross_antpairpol in question as well as some + corresponding autos. + nsamples: DataContainer + Contains the nsamples associated with the cross_antpairpol in question. + weights: DataContainer + Contains the weights associated with the cross_antpairpol in question. + cross_antpairpol: tuple + Tuple whose first two entries are antennas and last entry is a + polarization. + freq_slice: slice + A slice into the frequency axis of the data that all gets filtered + identically (a PS analysis band). 
+
+
+def prep_var_for_frop(data, nsamples, weights, cross_antpairpol, freq_slice,
+                      auto_ant=None, default_value=0.):
+    """
+    Wrapper around hera_cal.noise.predict_noise_variance_from_autos that preps
+    the noise variance calculation for the FRF + coherent average.
+
+    Parameters:
+        data: DataContainer
+            Must contain the cross_antpairpol in question as well as some
+            corresponding autos.
+        nsamples: DataContainer
+            Contains the nsamples associated with the cross_antpairpol in
+            question.
+        weights: DataContainer
+            Contains the weights associated with the cross_antpairpol in
+            question.
+        cross_antpairpol: tuple
+            Tuple whose first two entries are antennas and whose last entry
+            is a polarization.
+        freq_slice: slice
+            A slice into the frequency axis of the data that all gets
+            filtered identically (a PS analysis band).
+        auto_ant: int
+            If not None, a single integer specifying the antenna whose autos
+            should be used (the redundantly averaged data have a single auto
+            file for all antennas).
+        default_value: float
+            The default variance to use in locations with 0 nsamples, to
+            avoid NaNs.
+
+    Returns:
+        var: array
+            Noise variance. Shape (Ntimes, Nfreqs).
+    """
+    var = predict_noise_variance_from_autos(cross_antpairpol, data,
+                                            nsamples=nsamples,
+                                            auto_ant=auto_ant)
+    var = var[:, freq_slice]
+
+    # Check that all infinite-variance locations carry zero weight, then
+    # replace them with the default value
+    all_infs_zero = np.all(weights[cross_antpairpol][:, freq_slice][np.isinf(var)] == 0)
+    if not all_infs_zero:
+        print("Not all infinite variance locations are of zero weight!")
+
+    var[np.isinf(var)] = default_value
+
+    return var
+
+
+def get_FRF_cov(frop, var):
+    """
+    Get the noise covariance from the noise variance and a given
+    FRF + coherent average operator.
+
+    Parameters:
+        frop: array
+            A linear operator that performs the FRF and coherent averaging
+            operations in one step.
+        var: array
+            Noise variance for a single antpairpol.
+
+    Returns:
+        cov: array
+            The desired covariance. Shape (Nfreqs, Ntimes_coarse,
+            Ntimes_coarse).
+    """
+    Nfreqs = frop.shape[2]
+    Ncoarse = frop.shape[0]
+    cov = np.zeros([Nfreqs, Ncoarse, Ncoarse], dtype=complex)
+    for freq_ind in range(Nfreqs):
+        cov[freq_ind] = np.tensordot(frop[:, :, freq_ind] * var[:, freq_ind],
+                                     frop[:, :, freq_ind].T.conj(), axes=1)
+
+    return cov
+
+
+def get_corr(cov):
+    """
+    Get correlation matrices from a sequence of covariance matrices.
+
+    Parameters:
+        cov: array
+            The covariance matrices. Shape (Nfreqs, Ntimes, Ntimes).
+
+    Returns:
+        corr: array
+            The corresponding correlation matrices.
+    """
+    Ntimes = cov.shape[1]
+    diags = cov[:, np.arange(Ntimes), np.arange(Ntimes)]
+    corr = cov / np.sqrt(diags[:, None] * diags[:, :, None])
+
+    return corr
+
+
+def get_correction_factor_from_cov(cov, tslc=None):
+    """
+    Get a correction factor for PS noise variance prediction in the absence
+    of propagating the noise covariances all the way to delay space.
+
+    Parameters:
+        cov: array
+            The time-time covariance matrix at every frequency.
+        tslc: slice
+            A time slice to use in case some times should be excluded from
+            the calculation of this factor.
+
+    Returns:
+        correction_factor: float
+            The correction factor.
+    """
+    corr = get_corr(cov)
+
+    if tslc is None:
+        Ncotimes = corr.shape[1]
+        Neff = Ncotimes**2 / np.sum(np.abs(corr)**2, axis=(1, 2))
+    else:
+        Ncotimes = tslc.stop - tslc.start
+        Neff = Ncotimes**2 / np.sum(np.abs(corr[:, tslc, tslc])**2, axis=(1, 2))
+    # Average over frequency since the frequencies are independent
+    correction_factor = np.mean(Ncotimes / Neff)
+
+    return correction_factor
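+
+
+# End-to-end sketch (hypothetical names; nothing here is called by the module
+# itself): propagate single-baseline noise through the FRF + coherent average
+# for one analysis band `band` and antpairpol `appol`.
+#
+#     frop = get_frop_for_noise(times_s, cent_hz, half_wid_hz, freqs[band],
+#                               nsamples=nsamples[appol][:, band],
+#                               bl_vec=bl_vec, dlst=dlst)
+#     var = prep_var_for_frop(data, nsamples, weights, appol, band)
+#     cov = get_FRF_cov(frop, var)  # (Nfreqs, Ntimes_coarse, Ntimes_coarse)
+#     factor = get_correction_factor_from_cov(cov)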
diff --git a/hera_cal/noise.py b/hera_cal/noise.py
index f73db8752..69ba8259a 100644
--- a/hera_cal/noise.py
+++ b/hera_cal/noise.py
@@ -68,7 +68,8 @@ def infer_dt(times_by_bl, bl, default_dt=None):
     raise ValueError('Cannot infer dt when all len(times_by_bl) == 1 or fewer.')
 
 
-def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None):
+def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None,
+                                      auto_ant=None):
     '''Predict the noise variance on a baseline using autocorrelation data
     using the formula sigma^2 = Vii * Vjj / Delta t / Delta nu.
 
@@ -81,6 +82,11 @@ def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None)
             from the frequencies stored in the DataContainer
         nsamples: DataContainer mapping bl tuples to numpy arrays of the number
             integrations for that given baseline. Must include nsamples[bl].
+        auto_ant: int
+            For some cases, like redundant averaging, the autos corresponding
+            to the individual antennas are not available. In this case, use
+            this keyword to specify a single antenna whose autos should be
+            used to derive this statistic.
 
     Returns:
         Noise variance predicted on baseline bl in units of data squared.
@@ -92,7 +98,11 @@ def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None)
         df = np.median(np.ediff1d(data.freqs))
 
     ap1, ap2 = split_pol(bl[2])
-    auto_bl1, auto_bl2 = (bl[0], bl[0], join_pol(ap1, ap1)), (bl[1], bl[1], join_pol(ap2, ap2))
+    if auto_ant is None:
+        auto_bl1, auto_bl2 = (bl[0], bl[0], join_pol(ap1, ap1)), (bl[1], bl[1], join_pol(ap2, ap2))
+    else:
+        auto_bl1, auto_bl2 = (auto_ant, auto_ant, join_pol(ap1, ap1)), (auto_ant, auto_ant, join_pol(ap2, ap2))
+
     var = np.abs(data[auto_bl1] * data[auto_bl2] / dt / df)
     if nsamples is not None:
         return var / nsamples[bl]
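
Usage sketch for the noise.py change (illustrative; `red_data`, `red_nsamples`,
and the baseline key are hypothetical): with redundantly averaged data that
carry only a single auto, the new `auto_ant` keyword points both auto lookups
at that one antenna.

    from hera_cal.noise import predict_noise_variance_from_autos

    # (1, 2, 'ee') has no per-antenna autos in red_data; antenna 0 holds the
    # single redundantly averaged auto, so both Vii and Vjj come from it.
    var = predict_noise_variance_from_autos((1, 2, 'ee'), red_data,
                                            nsamples=red_nsamples, auto_ant=0)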