Source code for pancake.opds

'''
All code in this module is from Jarron Leisenring and the
pyNRC / WebbPSF_ext packages, repoduced with permission.

The primary reason for direct duplication is to avoid complicating
the installation of PanCAKE.

See following links for potentially more up to date implementations:

https://github.com/JarronL/pynrc/blob/develop/pynrc/opds.py
https://github.com/JarronL/webbpsf_ext/blob/main/webbpsf_ext/opds.py
'''

# Import libraries
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

import os

import scipy
import scipy.stats
from scipy.stats import arcsine
from scipy.interpolate import interp1d

from astropy.io import fits
import astropy.units as u

import webbpsf
from webbpsf.opds import OTE_Linear_Model_WSS

# Default OPD info
opd_default = ('JWST_OTE_OPD_cycle1_example_2022-07-30.fits', 0)

# The following won't work on readthedocs compilation
on_rtd = os.environ.get('READTHEDOCS') == 'True'
if not on_rtd:
    # .fits or .fits.gz?
    opd_dir = webbpsf.utils.get_webbpsf_data_path()
    opd_file = opd_default[0]
    opd_fullpath = os.path.join(opd_dir, opd_file)
    if not os.path.exists(opd_fullpath):
        opd_file_alt = opd_file + '.gz'
        opd_path_alt = os.path.join(opd_dir, opd_file_alt)
        if not os.path.exists(opd_path_alt):
            err_msg = f'Cannot find either {opd_file} or {opd_file_alt} in {opd_dir}'
            raise OSError(err_msg)
        else:
            opd_default = (opd_file_alt, 0)

        #import errno
        #raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), opd_file)


[docs]def OPDFile_to_HDUList(file, slice_to_use=0): """ Make a picklable HDUList for ingesting into multiproccessor WebbPSF helper function. """ try: hdul = fits.open(file) except FileNotFoundError: opd_dir = webbpsf.utils.get_webbpsf_data_path() hdul = fits.open(os.path.join(opd_dir, file)) ndim = len(hdul[0].data.shape) if ndim==3: opd_im = hdul[0].data[slice_to_use,:,:] else: opd_im = hdul[0].data hdu_new = fits.PrimaryHDU(opd_im) hdu_new.header = hdul[0].header.copy() opd_hdul = fits.HDUList([hdu_new]) hdul.close() return opd_hdul
[docs]class OTE_WFE_Drift_Model(OTE_Linear_Model_WSS): """ OPD subclass for calculating OPD drift values over time. """ def __init__(self, **kwargs): """ Parameters ---------- opdfile : str or fits.HDUList FITS file to load an OPD from. The OPD must be specified in microns. opd_index : int, optional FITS extension to load OPD from transmission : str or None FITS file for pupil mask, with throughput from 0-1. If not explicitly provided, will be inferred from wherever is nonzero in the OPD file. slice : int, optional Slice of a datacube to load OPD from, if the selected extension contains a datacube. segment_mask_file : str FITS file for pupil mask, with throughput from 0-1. If not explicitly provided, will use JWpupil_segments.fits zero : bool Set an OPD to precisely zero everywhere. rm_ptt : bool Remove piston, tip, and tilt? This is mostly for visualizing the higher order parts of the LOM. Default: False. """ # Initialize OTE_Linear_Model_WSS OTE_Linear_Model_WSS.__init__(self, **kwargs) # Initialize delta OPD normalized images self.dopd_thermal = None self.dopd_frill = None self.dopd_iec = None # Initialize normalized delta OPD images self._calc_delta_opds()
[docs] def reset(self, verbose=True): """ Reset an OPD to the state it was loaded from disk. i.e. undo all segment moves. """ self._frill_wfe_amplitude = 0 self._iec_wfe_amplitude = 0 self.opd = self._opd_original.copy() self.segment_state *= 0
def _calc_delta_opds(self, thermal=True, frill=True, iec=True): """ Calculate delta OPDs for the three components and save to class properties. Each delta OPD image will be normalized such that the nm RMS WFE is equal to 1. """ # Set everything to initial state self.reset(verbose=False) # Calculate thermal dOPD if thermal: self.thermal_slew(1*u.day) # self.opd has now been updated to drifted OPD # Calculate delta OPD and save into self.opd attribute # This is because self.rms() uses the image in self.opd self.opd -= self._opd_original # scale by RMS of delta OPD, and save self.dopd_thermal = self.opd / self.rms() # Calculate frill dOPD if frill: # Explicitly set thermal component to 0 self.thermal_slew(0*u.min, scaling=0, delay_update=True) self.apply_frill_drift(amplitude=1) # self.opd has now been updated to drifted OPD # Temporarily calculate delta and calc rms self.opd -= self._opd_original # scale by RMS of delta OPD, and save self.dopd_frill = self.opd / self.rms() # Calculate IEC dOPD if iec: # Explicitly set thermal and frill components to 0 self.thermal_slew(0*u.min, scaling=0, delay_update=True) self.apply_frill_drift(amplitude=0, delay_update=True) self.apply_iec_drift(amplitude=1) # self.opd has now been updated to drifted OPD # Temporarily calculate delta and calc rms self.opd -= self._opd_original # scale by RMS of delta OPD, and save self.dopd_iec = self.opd / self.rms() # Back to initial state self.reset(verbose=False)
[docs] def calc_rms(self, arr, segname=None): """Calculate RMS of input images""" # RMS for a single image def rms_im(im): """ Find RMS of an image by excluding pixels with 0s, NaNs, or Infs""" ind = (im != 0) & (np.isfinite(im)) res = 0 if len(im[ind]) == 0 else np.sqrt(np.mean(im[ind] ** 2)) res = 0 if np.isnan(res) else res return res # Reshape into a 3-dimension cube for consistency if len(arr.shape) == 3: nz,ny,nx = arr.shape else: ny,nx = arr.shape nz = 1 arr = arr.reshape([nz,ny,nx]) if segname is None: # RMS of whole aperture rms = np.array([rms_im(im) for im in arr]) else: # RMS of specified segment assert (segname in self.segnames) iseg = np.where(self.segnames == segname)[0][0] + 1 # segment index from 1 - 18 seg_mask = self._segment_masks == iseg arr_seg = arr[:,seg_mask] rms = np.array([rms_im(im) for im in arr_seg]) # If single image, remove first dimension if nz==1: rms = rms[0] return rms
[docs] def slew_scaling(self, start_angle, end_angle): """ WFE scaling due to slew angle Scale the WSS Hexike components based on slew pitch angles. Parameters ---------- start_angle : float The starting sun pitch angle, in degrees between -5 and +45 end_angle : float The ending sun pitch angle, in degrees between -5 and +45 """ num = np.sin(np.radians(end_angle)) - np.sin(np.radians(start_angle)) den = np.sin(np.radians(45.)) - np.sin(np.radians(-5.)) return num / den
[docs] def gen_frill_drift(self, delta_time, start_angle=-5, end_angle=45, case='BOL'): """ Frill WFE drift scaling Function to determine the factor to scale the delta OPD associated with frill tensioning. Returns the RMS WFE (nm) depending on time and slew angles. Parameters ---------- delta_time : astropy.units quantity object The time since a slew occurred. start_angle : float The starting sun pitch angle, in degrees between -5 and +45 end_angle : float The ending sun pitch angle, in degrees between -5 and +45 case : string either "BOL" for current best estimate at beginning of life, or "EOL" for more conservative prediction at end of life. The amplitude of the frill drift is roughly 2x lower for BOL (8.6 nm after 2 days) versus EOL (18.4 nm after 2 days). """ frill_hours = np.array([ 0.00, 0.55, 1.00, 1.60, 2.23, 2.85, 3.47, 4.09, 4.71, 5.33, 5.94, 6.56, 7.78, 9.00, 9.60, 11.41, 12.92, 15.02, 18.00, 21.57, 23.94, 26.90, 32.22, 35.76, 41.07, 45.20, 50.50, 100.58 ]) # Normalized frill drift amplitude frill_wfe_drift_norm = np.array([ 0.000, 0.069, 0.120, 0.176, 0.232, 0.277, 0.320, 0.362, 0.404, 0.444, 0.480, 0.514, 0.570, 0.623, 0.648, 0.709, 0.758, 0.807, 0.862, 0.906, 0.930, 0.948, 0.972, 0.981, 0.991, 0.995, 0.998, 1.000 ]) # Create interpolation function finterp = interp1d(frill_hours, frill_wfe_drift_norm, kind='cubic', fill_value=(0, 1), bounds_error=False) # Convert input time to hours and get normalized amplitude time_hour = delta_time.to(u.hour).value amp_norm = finterp(time_hour) # Scale height from either EOL or BOL (nm RMS) # Assuming slew angles from -5 to +45 deg if case=='EOL': wfe_drift_rms = 18.4 * amp_norm elif case=='BOL': wfe_drift_rms = 8.6 * amp_norm else: print(f'case={case} is not recognized') # Get scale factor based on start and end angle solar elongation angles scaling = self.slew_scaling(start_angle, end_angle) wfe_drift_rms *= scaling return wfe_drift_rms
[docs] def gen_thermal_drift(self, delta_time, start_angle=-5, end_angle=45, case='BOL'): """ Thermal WFE drift scaling Function to determine the factor to scale the delta OPD associated with OTE backplane thermal distortion. Returns the RMS WFE (nm) depending on time and slew angles. Parameters ---------- delta_time : astropy.units quantity object The time since a slew occurred. start_angle : float The starting sun pitch angle, in degrees between -5 and +45 end_angle : float The ending sun pitch angle, in degrees between -5 and +45 case : string either "BOL" for current best estimate at beginning of life, or "EOL" for more conservative prediction at end of life. The amplitude of the frill drift is roughly 3x lower for BOL (13 nm after 14 days) versus EOL (43 nm after 14 days). """ # Convert time array to minutes and get values # if isinstance(delta_time, (u.Quantity)): # time_arr_minutes = np.array(delta_time.to(u.hour).value) # else: # time_arr_minutes = delta_time thermal_hours = np.array([ 0., 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11., 12., 13., 14., 15., 16., 17., 18., 19., 20., 21., 22., 23., 24., 48., 72., 96., 120., 144., 168., 192., 216., 240., 264., 288., 312., 336., 360., 384., 408., 432., 456., 480., 800. ]) thermal_wfe_drift_norm = np.array([ 0.0000, 0.0134, 0.0259, 0.0375, 0.0484, 0.0587, 0.0685, 0.0777, 0.0865, 0.0950, 0.1031, 0.1109, 0.1185, 0.1259, 0.1330, 0.1400, 0.1468, 0.1534, 0.1600, 0.1664, 0.1727, 0.1789, 0.1850, 0.1910, 0.1970, 0.3243, 0.4315, 0.5227, 0.5999, 0.6650, 0.7197, 0.7655, 0.8038, 0.8358, 0.8625, 0.8849, 0.9035, 0.9191, 0.9322, 0.9431, 0.9522, 0.9598, 0.9662, 0.9716, 1.0000 ]) # Create interpolation function finterp = interp1d(thermal_hours, thermal_wfe_drift_norm, kind='cubic', fill_value=(0, 1), bounds_error=False) # Convert input time to hours and get normalized amplitude time_hour = delta_time.to(u.hour).value amp_norm = finterp(time_hour) # Normalize to 14 days (336 hours) amp_norm /= finterp(336) # Scale height from either EOL or BOL (nm RMS) # Assuming full slew angle from -5 to +45 deg if case=='EOL': wfe_drift_rms = 45.0 * amp_norm elif case=='BOL': wfe_drift_rms = 13.0 * amp_norm else: print(f'case={case} is not recognized') # Get scale factor based on start and end angle solar elongation angles scaling = self.slew_scaling(start_angle, end_angle) wfe_drift_rms *= scaling return wfe_drift_rms
[docs] def gen_iec_series(self, delta_time, amplitude=3.5, period=5.0, interp_kind='linear', random_seed=None): """Create a series of IEC WFE scale factors Create a series of random IEC heater state changes based on arcsine distribution. Parameters ---------- delta_time : astropy.units quantity object array Time series of atropy units to interpolate IEC amplitudes Keyword Args ------------ amplitude : float Full amplitude of arcsine distribution. Values will range from -0.5*amplitude to +0.5*amplitude. period : float Period in minutes of IEC oscillations. Usually 3-5 minutes. random_seed : int Provide a random seed value between 0 and (2**32)-1 to generate reproducible random values. interp_kind : str or int Specifies the kind of interpolation as a string ('linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'previous', 'next', where 'zero', 'slinear', 'quadratic' and 'cubic' refer to a spline interpolation of zeroth, first, second or third order; 'previous' and 'next' simply return the previous or next value of the point) or as an integer specifying the order of the spline interpolator to use. Default is 'linear'. """ # Convert time array to minutes and get values if isinstance(delta_time, (u.Quantity)): time_arr_minutes = np.array(delta_time.to(u.min).value) else: time_arr_minutes = delta_time # Create a series of random IEC heater state changes based on arcsin distribution dt = period nsamp = int(np.max(time_arr_minutes)/dt) + 2 tvals = np.arange(nsamp) * dt # Random values between 0 and 1 arcsine_rand = arcsine.rvs(size=nsamp, random_state=random_seed) # Scale by amplitude wfe_iec_all = arcsine_rand * amplitude - amplitude / 2 # res = np.interp(time_arr_minutes, tvals, wfe_iec_all) finterp = interp1d(tvals, wfe_iec_all, kind=interp_kind, fill_value=0, bounds_error=False) res = finterp(time_arr_minutes) return res
[docs] def gen_delta_opds(self, delta_time, start_angle=-5, end_angle=45, do_thermal=True, do_frill=True, do_iec=True, case='BOL', return_wfe_amps=True, return_dopd_fin=True, **kwargs): """Create series of delta OPDs Generate a series of delta OPDS, the result of which is a combination of thermal, frill, and IEC effects. The thermal and frill values are dependent on time, start/end slew angles, and case ('BOL' or 'EOL'). Delta OPD contributions from the IEC heater switching are treated as random state switches assuming an arcsine distribution. Parameters ---------- delta_time : astropy.units quantity object An array of times assuming astropy units. start_angle : float The starting sun pitch angle, in degrees between -5 and +45. end_angle : float The ending sun pitch angle, in degrees between -5 and +45. case : string Either "BOL" for current best estimate at beginning of life, or "EOL" for more conservative prediction at end of life. do_thermal : bool Include thermal slew component? Mostly for debugging purposes. do_frill : bool Include frill component? Mostly for debugging purposes. do_iec : bool Include IEC component? Good to exclude if calling this function repeatedly for evolution of multiple slews, then add IEC later. return_wfe_amps : bool Return a dictionary that provides the RMS WFE (nm) of each component at each time step. return_dopd_fin : bool Option to exclude calculating final delta OPD in case we only want the final RMS WFE dictionary. """ try: nz = len(delta_time) except TypeError: nz = 1 ny,nx = self.opd.shape # Thermal drift amplitudes if do_thermal: amp_thermal = self.gen_thermal_drift(delta_time, case=case, start_angle=start_angle, end_angle=end_angle) else: amp_thermal = np.zeros(nz) if nz>1 else 0 # Frill drift amplitudes if do_frill: amp_frill = self.gen_frill_drift(delta_time, case=case, start_angle=start_angle, end_angle=end_angle) else: amp_frill = np.zeros(nz) if nz>1 else 0 # Random IEC amplitudes if do_iec: amp_iec = self.gen_iec_series(delta_time, **kwargs) if nz>1: amp_iec[0] = 0 else: amp_iec = np.zeros(nz) if nz>1 else 0 # Add OPD deltas delta_opd_fin = np.zeros([nz,ny,nx]) if do_thermal: amp = np.reshape(amp_thermal, [-1,1,1]) delta_opd_fin += self.dopd_thermal.reshape([1,ny,nx]) * amp if do_frill: amp = np.reshape(amp_frill, [-1,1,1]) delta_opd_fin += self.dopd_frill.reshape([1,ny,nx]) * amp if do_iec: amp = np.reshape(amp_iec, [-1,1,1]) delta_opd_fin += self.dopd_iec.reshape([1,ny,nx]) * amp if nz==1: delta_opd_fin = delta_opd_fin[0] # Get final RMS in nm rms_tot = np.array(self.calc_rms(delta_opd_fin)) * 1e9 wfe_amps = { 'thermal': amp_thermal, 'frill' : amp_frill, 'iec' : amp_iec, 'total' : rms_tot } if return_wfe_amps and return_dopd_fin: return delta_opd_fin, wfe_amps elif return_wfe_amps: return wfe_amps elif return_dopd_fin: return delta_opd_fin
[docs] def evolve_dopd(self, delta_time, slew_angles, case='BOL', return_wfe_amps=True, return_dopd_fin=True, do_thermal=True, do_frill=True, do_iec=True, **kwargs): """ Evolve the delta OPD with multiple slews Input an array of `delta_time` and `slew_angles` to return the evolution of a delta_OPD image. Option to return the various WFE components, including OTE backplane (thermal), frill tensioning, and IEC heater switching. Parameters ---------- delta_time : astropy.units quantity object An array of times assuming astropy units. slew_angles : ndarray The sun pitch angles, in degrees between -5 and +45. case : string Either "BOL" for current best estimate at beginning of life, or "EOL" for more conservative prediction at end of life. do_thermal : bool Include thermal slew component? Mostly for debugging purposes. do_frill : bool Include frill component? Mostly for debugging purposes. do_iec : bool Include IEC component? Good to exclude if calling this function repeatedly for evolution of multiple slews, then add IEC later. return_wfe_amps : bool Return a dictionary that provides the RMS WFE (nm) of each component at each time step. return_dopd_fin : bool Option to exclude calculating final delta OPD in case we only want the final RMS WFE dictionary. Keyword Args ------------ amplitude : float Full amplitude of IEC arcsine distribution. Values will range from -0.5*amplitude to +0.5*amplitude. period : float Period in minutes of IEC oscillations. Usually 3-5 minutes. """ # Indices where slews occur islew = np.where(slew_angles[1:] - slew_angles[:-1] != 0)[0] + 1 islew = np.concatenate(([0], islew)) # Build delta OPDs for each slew angle kwargs['case'] = case kwargs['return_wfe_amps'] = return_wfe_amps kwargs['return_dopd_fin'] = True kwargs['do_thermal'] = do_thermal kwargs['do_frill'] = do_frill kwargs['do_iec'] = False for i in islew: ang1 = slew_angles[0] if i==0 else ang2 ang2 = slew_angles[i] tvals = delta_time[i:] tvals = tvals - tvals[0] res = self.gen_delta_opds(tvals, start_angle=ang1, end_angle=ang2, **kwargs) if return_wfe_amps: dopds, wfe_dict = res else: dopds = res # Accumulate delta OPD images if i==0: dopds_fin = dopds + 0.0 else: dopds_fin[i:] += dopds # Add in drift amplitudes for thermal and frill components if return_wfe_amps: if i==0: wfe_dict_fin = wfe_dict else: for k in wfe_dict.keys(): wfe_dict_fin[k][i:] += wfe_dict[k] del dopds # Get IEC values if do_iec: kwargs['do_thermal'] = False kwargs['do_frill'] = False kwargs['do_iec'] = True res = self.gen_delta_opds(delta_time, **kwargs) if return_wfe_amps: dopds, wfe_dict = res wfe_dict_fin['iec'] = wfe_dict['iec'] else: dopds = res # Add IEC OPDs dopds_fin += dopds del dopds # Calculate RMS values on final delta OPDs if return_wfe_amps: wfe_dict_fin['total'] = self.calc_rms(dopds_fin)*1e9 if return_wfe_amps and return_dopd_fin: return dopds_fin, wfe_dict_fin elif return_dopd_fin: return dopds_fin elif return_wfe_amps: return wfe_dict_fin
[docs] def interp_dopds(self, delta_time, dopds, dt_new, wfe_dict=None, interp_kind='linear', **kwargs): """ Interpolate an array of delta OPDs Perform a linear interpolation on a series of delta OPDS. Parameters ---------- delta_time : astropy.units quantity object An array of times assuming astropy units corresponding to each `dopd`. dopds : ndarray Array of delta OPD images associated with `delta_time`. dt_new : astropy.units quantity object New array to interpolate onto. Keyword Args ------------ wfe_dict : dict or None If specified, then must provide a dictionary where the values for each keywords are the WFE drift components associated with each `delta_time`. Will then return a dictionary interp_kind : str or int Specifies the kind of interpolation as a string ('linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'previous', 'next', where 'zero', 'slinear', 'quadratic' and 'cubic' refer to a spline interpolation of zeroth, first, second or third order; 'previous' and 'next' simply return the previous or next value of the point) or as an integer specifying the order of the spline interpolator to use. Default is 'linear'. """ dt_new_vals = dt_new.to('hour') # Create interpolation function dt_vals = delta_time.to('hour') func = interp1d(dt_vals, dopds, axis=0, kind=interp_kind, bounds_error=True) opds_new = func(dt_new_vals) if wfe_dict is not None: wfe_dict_new = {} for k in wfe_dict.keys(): vals = wfe_dict[k] func = interp1d(dt_vals, vals, kind=interp_kind, bounds_error=True) wfe_dict_new[k] = func(dt_new_vals) return opds_new, wfe_dict_new else: return opds_new
[docs] def slew_pos_averages(self, delta_time, slew_angles, opds=None, wfe_dict=None, mn_func=np.mean, interpolate=False, **kwargs): """ Get averages at each slew position Given a series of times and slew angles, calculate the average OPD and WFE RMS error within each slew angle position. Returns a tuple with new arrays of (dt_new, opds_new, wfe_dict_new). If input both `opds` and `wfe_dict` are not specified, then we call the `evolve_dopd` function and return . Parameters ---------- delta_time : astropy.units quantity object An array of times assuming astropy units. slew_angles : ndarray The sun pitch angles at each `delta_time`, in degrees between -5 and +45. opds : ndarray or None Cube of OPD images (or delta OPDs) associated with each `delta_time`. If set to None, then a new set of OPDs are not calculated. wfe_dict : dict or None If specified, then must provide a dictionary where the values for each keywords are the WFE drift components associated with each `delta_time`. New set of WFE dictionary is not calculated if set to None. mn_func : function Function to use for taking averages. Default: np.mean() interpolate : bool Instead of taking average, use the interpolation function `self.interp_dopds()`. Keyword Args ------------ case : string Either "BOL" for current best estimate at beginning of life, or "EOL" for more conservative prediction at end of life. do_thermal : bool Include thermal slew component? Mostly for debugging purposes. do_frill : bool Include frill component? Mostly for debugging purposes. do_iec : bool Include IEC component? Good to exclude if calling this function repeatedly for evolution of multiple slews, then add IEC later. amplitude : float Full amplitude of IEC arcsine distribution. Values will range from -0.5*amplitude to +0.5*amplitude. period : float Period in minutes of IEC oscillations. Usually 3-5 minutes. kind : str or int Specifies the kind of interpolation (if specified) as a string. Default: 'linear'. """ if (opds is None) and (wfe_dict is None): kwargs['return_wfe_amps'] = True kwargs['return_dopd_fin'] = True opds, wfe_dict = self.evolve_dopd(delta_time, slew_angles, **kwargs) # Indices where slews occur islew = np.where(slew_angles[1:] - slew_angles[:-1] != 0)[0] + 1 # Start and stop indices for each slew position i1_arr = np.concatenate(([0], islew)) i2_arr = np.concatenate((islew, [len(slew_angles)])) # Get average time at each position dt_new = np.array([mn_func(delta_time[i1:i2].value) for i1, i2 in zip(i1_arr, i2_arr)]) dt_new = dt_new * delta_time.unit if interpolate: res = self.interp_dopds(delta_time, opds, dt_new, wfe_dict=wfe_dict, **kwargs) if wfe_dict is None: opds_new = res wfe_dict_new = None else: opds_new, wfe_dict_new = res return dt_new, opds_new, wfe_dict_new # Averages of OPD at each position if opds is not None: opds_new = np.array([mn_func(opds[i1:i2], axis=0) for i1, i2 in zip(i1_arr, i2_arr)]) else: opds_new = None # Get average of each WFE drift component if wfe_dict is not None: wfe_dict_new = {} for k in wfe_dict.keys(): wfe_dict_new[k] = np.array([mn_func(wfe_dict[k][i1:i2]) for i1, i2 in zip(i1_arr, i2_arr)]) if opds_new is not None: wfe_dict_new['total'] = self.calc_rms(opds_new)*1e9 else: wfe_dict = None return dt_new, opds_new, wfe_dict_new
[docs] def opds_as_hdul(self, delta_time, slew_angles, delta_opds=None, wfe_dict=None, case=None, add_main_opd=True, slew_averages=False, return_ind=None, **kwargs): """Convert series of delta OPDS to HDUList""" if delta_opds is None: case = 'BOL' if case is None else case kwargs['case'] = case kwargs['return_wfe_amps'] = True kwargs['return_dopd_fin'] = True delta_opds, wfe_dict = self.evolve_dopd(delta_time, slew_angles, **kwargs) if slew_averages: res = self.slew_pos_averages(delta_time, slew_angles, opds=delta_opds, wfe_dict=wfe_dict, **kwargs) delta_time, delta_opds, wfe_dict = res # Indices where slews occur islew = np.where(slew_angles[1:] - slew_angles[:-1] != 0)[0] + 1 islew = np.concatenate(([0], islew)) slew_angles = slew_angles[islew] nz, ny, nx = delta_opds.shape # Indices where slews occur islew = np.where(slew_angles[1:] - slew_angles[:-1] != 0)[0] + 1 islew = np.concatenate(([0], islew)) hdul = fits.HDUList() for i in range(nz): if len(islew) == 1: #There is no change in slew angle ang1 = ang2 = slew_angles[0] elif i<islew[1]: ang1 = ang2 = slew_angles[i] else: if i in islew: ang1 = slew_angles[i-1] ang2 = slew_angles[i] # Skip if only returning a single OPD if (return_ind is not None) and (i != return_ind): continue # Update header dt = delta_time[i].to(u.day).to_string() opd_im = self._opd_original + delta_opds[i] if add_main_opd else delta_opds[i] hdu = fits.ImageHDU(data=opd_im, header=self.opd_header, name=f'OPD{i}') hdr = hdu.header hdr['BUNIT'] = 'meter' hdr['DELTA_T'] = (dt, "Delta time after initial slew [d]") hdr['STARTANG'] = (ang1, "Starting sun pitch angle [deg]") hdr['ENDANG'] = (ang2, "Ending sun pitch angle [deg]") hdr['THRMCASE'] = (case, "Thermal model case, beginning or end of life") #if add_main_opd: #hdr['OPDSLICE'] = (self.opd_index, 'OPD slice index') hdr['WFE_RMS'] = (self.calc_rms(hdu.data)*1e9, "RMS WFE [nm]") # Include the WFE RMS inputs from each component if wfe_dict is not None: for k in wfe_dict.keys(): hdr[k] = (wfe_dict[k][i], f"{k} RMS delta WFE [nm]") hdul.append(hdu) return hdul
[docs]def plot_im(im, fig, ax, vlim=None, add_cbar=True, return_ax=False, extent=None): """ Plot single image on some axes """ if vlim is None: vlim = np.max(np.abs(im)) img = ax.imshow(im, cmap='RdBu_r', vmin=-vlim, vmax=vlim, extent=extent) # Add colorbar if add_cbar: cbar = fig.colorbar(img, ax=ax) cbar.set_label('Amplitude [nm]') if return_ax and add_cbar: return ax, cbar elif return_ax: return ax
[docs]def plot_opd(hdul, index=1, opd0=None, vlim1=None, vlim2=None): """ Plot OPDs images (full and delta) """ def calc_rms_nm(im): ind = (im != 0) & (np.isfinite(im)) rms = np.sqrt((im[ind] ** 2).mean()) * 1e9 return rms m_to_nm = 1e9 # Define OPD to compare delta OPD image opd0 = hdul[0].data if opd0 is None else opd0 # Header and data for current image header = hdul[index].header opd = hdul[index].data opd_diff = (opd - opd0) rms_opd = calc_rms_nm(opd) rms_diff = calc_rms_nm(opd_diff) # Time since slew delta_time = header['DELTA_T'] try: pupilscale = header['PUPLSCAL'] s = opd.shape extent = [a * pupilscale for a in [-s[0] / 2, s[0] / 2, -s[1] / 2, s[1] / 2]] except KeyError: extent = None # Create figure fig, axes = plt.subplots(1,2, figsize=(12,5)) ax = axes[0] vlim = 3*rms_opd if vlim1 is None else vlim1 plot_im(opd * m_to_nm, fig, ax, vlim=vlim, extent=extent) data_val, data_units = str.split(delta_time) data_val = np.float(data_val) if 'h' in data_units: dt = data_val * u.hr elif 'm' in data_units: dt = data_val * u.min elif 'd' in data_units: dt = data_val * u.day # Convert to hours dt = dt.to('hr') ax.set_title("Delta Time = {:.1f} (RMS = {:.2f} nm)".format(dt, rms_opd)) ax = axes[1] vlim = 3*rms_diff if vlim2 is None else vlim2 plot_im(opd_diff * m_to_nm, fig, ax, vlim=vlim, extent=extent) ax.set_title("Delta OPD = {:.2f} nm RMS".format(rms_diff)) fig.tight_layout() plt.draw()