Source code for pancake.sequence

from __future__ import absolute_import, print_function

import os
import json
import numpy as np
from pandeia.engine.calc_utils import build_default_calc
from copy import deepcopy
from collections import OrderedDict

from .utilities import optimise_readout, compute_magnitude, equatorial_to_ecliptic
from .engine import calculate_target, get_options
from .scene import Scene, create_SGD
from .opds import OPDFile_to_HDUList, OTE_WFE_Drift_Model
from .io import determine_instrument, determine_aperture, determine_subarray, determine_aperture, determine_pixel_scale, sequence_input_checks, determine_exposure_time, determine_detector

from import fits
import astropy.units as u
from scipy.interpolate import interp1d

import webbpsf

[docs]class Sequence(): ''' Sequence class used to construct an observational sequence from invidual specified observations of user defined scenes. ''' def __init__(self, **kwargs): self.observation_sequence = []
[docs] def add_observation(self, scene, exposures, mode='coronagraphy', nircam_mask='default', nircam_subarray='default', miri_subarray='default', telescope='jwst', optimise_margin=0.05, optimize_margin=None, max_sat=0.95, rolls=None, nircam_sgd=None, miri_sgd=None, scale_exposures=None, verbose=True, min_groups=4, min_ints=1): ''' Add observation does the heavy lifting, with a wide variety of input options. Parameters ---------- scene : pancake.Scene() object The scene we want to observe exposures: list of tuples The exposures we want to perform, possible formats are - [(FILTER, PATTERN, NGROUP, NINT),...] - [(FILTER, 'optimise', t_exp),...] with t_exp in seconds mode : str Observing mode, 'coronagraphy' is working, 'imaging' a future development. nircam_mask : str The NIRCam coronagraphic mask to use, 'default' we use round masks nircam_subarray : str NIRCam subarray to use, 'default' will use the one assigned to the chosen mask miri_subaray : str MIRI subarray to use, 'default' will use the one assigned to the chosen mask telescope : str Telescope to observe with, only 'jwst' is possible at this time. optimise_margin : float Fraction of the input t_exp that we can deviate from the overall t_exp for a given exposure optimize_margin : str / None For the Americans max_sat : float Maximum fraction of saturation to reach when optimising rolls : list of floats PA angles to observe scene at nircam_sgd : str / None NIRCam small grid dither pattern to use, if any miri_sgd : str / None MIRI small grid dither pattern to use, if any scale_exposures : pancake.Scene() Scene to scale the provided t_exp in order to reach ~the same number of photons verbose : bool Boolean toggle for terminal printing. ''' #First copy scene so that the user provided scene isn't modified scene = deepcopy(scene) #If there is only one exposure, convert it to a list. if isinstance(exposures, tuple): exposures = [exposures] #Check if max roll angle requested. if rolls == 'max': # Use an initial roll of 0 (i.e. what the input as the planet PA) and then a second roll of 14 degrees rolls = [0, 14] #Perform remaining checks on input parameters sequence_input_checks(exposures, mode, nircam_mask, nircam_subarray, miri_subarray, telescope, rolls, nircam_sgd, miri_sgd) #Loop over each exposure to build independent observation dictionaries. temp_obs_dict_array = [] for exposure in exposures: #Extract filter from exposure filt = exposure[0].lower() #Identify the instrument, aperture, and subarray from the filter and mode instrument = determine_instrument(filt) aperture = determine_aperture(filt, nircam_mask, mode) subarray = determine_subarray(filt, mode, nircam_subarray, miri_subarray) detector = determine_detector(filt) # Construct observation configuration dictionary for Pandeia input obs_dict = build_default_calc(telescope, instrument, mode) obs_dict['scene'] = scene.pandeia_scene obs_dict['scene_name'] = scene.scene_name obs_dict['configuration']['detector']['subarray'] = subarray.lower() obs_dict['configuration']['instrument']['aperture'] = aperture.lower() obs_dict['configuration']['instrument']['filter'] = filt.lower() obs_dict['configuration']['instrument']['detector'] = detector.lower() #Extract readout information from exposure settings if optimize_margin != None: optimise_margin = optimize_margin #Check for US spelling pattern, groups, integrations = self._extract_readout(scene, exposure, subarray, obs_dict, optimise_margin, scale_exposures, max_sat, verbose=verbose, min_groups=min_groups, min_ints=min_ints) obs_dict['configuration']['detector']['readout_pattern'] = pattern.lower() obs_dict['configuration']['detector']['ngroup'] = groups obs_dict['configuration']['detector']['nint'] = integrations #Check if dithers are meant to be performed. if instrument == 'nircam' and nircam_sgd != None: obs_dict['dither_strategy'] = nircam_sgd elif instrument == 'miri' and miri_sgd != None: obs_dict['dither_strategy'] = miri_sgd else: obs_dict['dither_strategy'] = 'SINGLE-POINT' temp_obs_dict_array.append(obs_dict) #Check if any rolls were requested if rolls == None: #There are no rolls, just add the obs_dicts to the sequence as they are. for obs_dict in temp_obs_dict_array: obs_dict['scene_rollang'] = 0 self.observation_sequence.append(obs_dict) else: #We need to add each of the roll exposures, making sure that things are grouped by the coronagraph used. prev_inst_index = 0 for i, obs_dict in enumerate(temp_obs_dict_array): if i == 0: if len(temp_obs_dict_array) > 1: #First observation, just append with first roll temp_scene = deepcopy(scene) temp_scene.rotate_scene(rolls[0]) obs_dict['scene'] = temp_scene.pandeia_scene obs_dict['scene_rollang'] = rolls[0] self.observation_sequence.append(deepcopy(obs_dict)) #Deepcopy otherwise roll will adjust all obs_dicts else: #There is only a single filter in the obs dict array, no coronagraph changes. for roll in rolls: temp_scene = deepcopy(scene) temp_scene.rotate_scene(roll) obs_dict['scene'] = temp_scene.pandeia_scene obs_dict['scene_rollang'] = roll self.observation_sequence.append(deepcopy(obs_dict)) else: #Up to but excluding the last observation. prev_aperture = temp_obs_dict_array[i-1]['configuration']['instrument']['aperture'] curr_aperture = obs_dict['configuration']['instrument']['aperture'] if prev_aperture == curr_aperture: #Coronagraphs are the same, don't want to roll yet. temp_scene = deepcopy(scene) temp_scene.rotate_scene(rolls[0]) obs_dict['scene'] = temp_scene.pandeia_scene obs_dict['scene_rollang'] = rolls[0] self.observation_sequence.append(deepcopy(obs_dict)) else: #Coronagraphs must be different, need to append the other rolls now before the switch for roll in rolls[1:]: for temp_obs_dict in temp_obs_dict_array[prev_inst_index:i]: temp_scene = deepcopy(scene) temp_scene.rotate_scene(roll) temp_obs_dict['scene'] = temp_scene.pandeia_scene temp_obs_dict['scene_rollang'] = roll self.observation_sequence.append(deepcopy(temp_obs_dict)) #Set an index for the first observation with the next instrument. prev_inst_index = i #Now append the new coronagraphs first roll temp_scene = deepcopy(scene) temp_scene.rotate_scene(roll) obs_dict['scene'] = temp_scene.pandeia_scene obs_dict['scene_rollang'] = rolls[0] self.observation_sequence.append(deepcopy(obs_dict)) #Check if this is the last observation if i == (len(temp_obs_dict_array)-1): #Append all the rolls for the current working coronagraph for roll in rolls[1:]: for temp_obs_dict in temp_obs_dict_array[prev_inst_index:]: temp_scene = deepcopy(scene) temp_scene.rotate_scene(roll) temp_obs_dict['scene'] = temp_scene.pandeia_scene temp_obs_dict['scene_rollang'] = roll self.observation_sequence.append(deepcopy(temp_obs_dict))
# #We need to add each of the roll exposures, making sure that things are grouped by the coronagraph used. # prev_inst_index = 0 # for i, obs_dict in enumerate(temp_obs_dict_array): # if i == 0: # if len(temp_obs_dict_array) > 1: # #First observation, just append with first roll # obs_dict['strategy']['scene_rotation'] = rolls[0] # self.observation_sequence.append(deepcopy(obs_dict)) #Deepcopy otherwise roll will adjust all obs_dicts # else: # #There is only a single filter in the obs dict array, no coronagraph changes. # for roll in rolls: # obs_dict['strategy']['scene_rotation'] = roll # self.observation_sequence.append(deepcopy(obs_dict)) # else: # #Up to but excluding the last observation. # prev_aperture = temp_obs_dict_array[i-1]['configuration']['instrument']['aperture'] # curr_aperture = obs_dict['configuration']['instrument']['aperture'] # if prev_aperture == curr_aperture: # #Coronagraphs are the same, don't want to roll yet. # obs_dict['strategy']['scene_rotation'] = rolls[0] # self.observation_sequence.append(deepcopy(obs_dict)) # else: # #Coronagraphs must be different, need to append the other rolls now before the switch # for roll in rolls[1:]: # for temp_obs_dict in temp_obs_dict_array[prev_inst_index:i]: # temp_obs_dict['strategy']['scene_rotation'] = roll # self.observation_sequence.append(deepcopy(temp_obs_dict)) # #Set an index for the first observation with the next instrument. # prev_inst_index = i # #Now append the new coronagraphs first roll # obs_dict['strategy']['scene_rotation'] = rolls[0] # self.observation_sequence.append(deepcopy(obs_dict)) # #Check if this is the last observation # if i == (len(temp_obs_dict_array)-1): # #Append all the rolls for the current working coronagraph # for roll in rolls[1:]: # for temp_obs_dict in temp_obs_dict_array[prev_inst_index:]: # temp_obs_dict['strategy']['scene_rotation'] = roll # self.observation_sequence.append(deepcopy(temp_obs_dict))
[docs] def run(self, ta_error='saved', wavefront_evolution=True, on_the_fly_PSFs=False, wave_sampling=3, save_file=False, resume=False, verbose=True, cache='none', cache_path='default' ,offaxis_nircam=[1,1], offaxis_miri=[1,1], debug_verbose=False, initial_wavefront_realisation=4, wavefront_pa_range='median', background='default', roll_ta_error='default'): ''' Perform all simulations for the exposures defined within this sequence. Parameters ---------- ta_error : str / float Target acquisition errors - 'saved' to use a saved random seed of offsets - 'random' for a random error - int or float for random error with set amplitude in mas - 'none' for no error wavefront_evolution : bool Whether to evolve the wavefront error throughout the observation, requires RA and Dec for at least one source within the scenes being observed. on_the_fly_PSFs : bool Whehter to calculate PSF's on the fly using WebbPSF (more accurate) or use the precomputed library (faster). wave_sampling : int If on_the_fly_PSFs == True, sets the number of wavelengths to be sampled in each filter save_file : str File name to save simulations to resume : bool Toggle to resume an interrupted run under the same save_file path verbose : bool Toggle for printing statments to terminal cache : str Type of cache to use when generating PSFs, options are - 'none' for no caching - 'ram' for LRU ram caching - 'disk' for hard drive caching cache_path : str Directory path to save / load cached PSFs under 'disk' caching offaxis_nircam : list x, y coordinates in arcseconds for NIRCam offaxis PSF calculations offaxis_miri : list x, y coordinates in arcseconds for MIRI offaxis PSF calculations debug_verbose : bool Toggle printing of debug statements, toggled off by default as they flood the terminal and do no impact anything. initial_wavefront_realisation : int, 1-10 10 possible wavefronts to use to start a simulation, doesn't greatly impact results wavefront_pa_range : str What range of PA's to aim for when calculating movement of telescope between scenes - options are, 'median', 'minimum', 'maximum' Returns ------- hdulist : FITS HDUList() All simulations as a FITS HDUlist dataset ''' #PanCAKE adjustable options options = get_options() options.verbose = debug_verbose options.wave_sampling = wave_sampling options.on_the_fly_PSFs = on_the_fly_PSFs options.cache = cache if cache != 'none': if cache_path == 'default': cache_path = os.getcwd() + '/PSF_CACHE/' if not os.path.exists(cache_path): os.makedirs(cache_path) else: cache_path = None options.cache_path = cache_path #Create the HDUList for saving our results to. primary_header = fits.Header() hdu_array = [fits.PrimaryHDU(header=primary_header)] hdulist = fits.HDUList(hdu_array) #Set the index of the first observation to simulate. start_index = 0 # If we are saving the results to a file, set this up. if save_file != False: if resume != True: #If we pass the checks, overwrite any existing file with the same name. #Check provided directory exists and is the right format before running all the simulations save_dir = '/'.join(save_file.split('/')[0:-1]) if not os.path.exists(save_dir): os.makedirs(save_dir) elif save_file.split('.')[-1] != 'fits': raise IOError('Provided save file "{}" not of "*.fits" format.'.format(save_file)) hdulist.writeto(save_file, overwrite=True) else: #Need to carry on from where a saved simulation last ended. Figure out where that is... with as save_hdul: if len(save_hdul)-1 == len(self.observation_sequence): #All simulations already completed. print('WARNING: All observation simulations have been completed, nothing to resume.') return else: print('Resuming previous observation sequence...') #Change the start index to the correct value. start_index = len(save_hdul)-1 #Calculate OPD's throughout the observation if requested and on_the_fly_PSFs are being used. if wavefront_evolution == True and on_the_fly_PSFs == True and len(self.observation_sequence) > 1: if verbose: print('Computing OPD Maps...') opd_res = self._calculate_opds(opd_realisation=initial_wavefront_realisation, pa_range=wavefront_pa_range) if opd_res == None: wavefront_evolution = False else: nircam_opds, miri_opds = opd_res else: #Don't use an OPD drift if we aren't using on_the_fly_PSFs or only a single observation. wavefront_evolution = False #Next, we loop over all the observations within the sequence and simulate each one individually. if verbose: print('Running Simulations...') observation_counter = 0 unocculted_images = [] for i_raw, base_obs_dict in enumerate(self.observation_sequence[start_index:]): i = i_raw + start_index scene_name = base_obs_dict['scene_name'] filt = base_obs_dict['configuration']['instrument']['filter'] instrument = base_obs_dict['configuration']['instrument']['instrument'] if instrument == 'miri': offaxis_x, offaxis_y = offaxis_miri elif instrument == 'nircam': offaxis_x, offaxis_y = offaxis_nircam if verbose: print('--> Observation {}/{} // {}_{}'.format(i+1, len(self.observation_sequence), scene_name, filt.upper())) ##### First let's calculate an unocculted image to compute the contrast curve with offaxis_dict = deepcopy(base_obs_dict) for j in range(len(offaxis_dict['scene'])): #Offset all the sources offaxis_dict['scene'][j]['position']['x_offset'] += offaxis_x offaxis_dict['scene'][j]['position']['y_offset'] += offaxis_y # Ignore any saturation, allowing counts above full well, and turn off on_the_fly_PSFs for speed. options.set_saturation(False) options.on_the_fly_PSFs = False # Ensure the scene isn't rotated to make extracting the PSF center easier offaxis_dict['strategy']['scene_rotation'] = 0 if background != 'default': offaxis_dict['background_level'] = background else: if filt in ['f1065c', 'f1140c', 'f1550c']: offaxis_dict['background_level'] = 'none' else: pass # Calculate off-axis image. offaxis_result = calculate_target(offaxis_dict) # Return settings to original values options.set_saturation(True) options.on_the_fly_PSFs = on_the_fly_PSFs ##### Now we will compute the actual observations # Assemble small grid dither array. Even without any SGD this will happen, it will just use a single dither point. # This is also where the random target acquisition error is added. sgds = create_SGD(ta_error=ta_error, pattern_name=base_obs_dict['dither_strategy'], sim_num=i, instrument=instrument) for j, sgd in enumerate(sgds): if len(sgds) > 1: #Print the small grid dither step we are on if verbose: print('----> Small Grid Dither {}/{}'.format(j+1,len(sgds))) #Begin by making a new copy of the obs_dict so that shifts in source offsets only happen once. obs_dict = deepcopy(base_obs_dict) #Offset all the sources in the scene by the necessary dither + target acquisition amounts. xoff, yoff = sgd for k in range(len(obs_dict['scene'])): obs_dict['scene'][k]['position']['x_offset'] += xoff obs_dict['scene'][k]['position']['y_offset'] += yoff if wavefront_evolution == True: #Use the calculated OPD for this specific observation. if instrument == 'nircam': options.on_the_fly_webbpsf_opd = nircam_opds[observation_counter] elif instrument == 'miri': options.on_the_fly_webbpsf_opd = miri_opds[observation_counter] # Set background level if background != 'default': # Options are "benchmark", "low", 'medium', 'high' obs_dict['background_level'] = background else: if filt in ['f1065c', 'f1140c', 'f1550c']: obs_dict['background_level'] = 'none' else: pass ######## RUN OBSERVATION ######## data = calculate_target(obs_dict) ################################# if j == 0: #We are on the first simulation, so lets make the right size array now. first_detector_image = data['2d']['detector'] # Shape is SGDs + 1 as we also need to include the unocculted image from earlier. detector_images = np.empty([len(sgds)+1, first_detector_image.shape[0], first_detector_image.shape[1]]) detector_images[0] = first_detector_image else: #We are on a simulation due to dithering, add it to the already defined array detector_images[j] = data['2d']['detector'] observation_counter += 1 ##### Append the unocculted image to the array detector_images[-1] = offaxis_result['2d']['detector'] #We need to create a HDU for this observation, first make the header image_header = fits.Header() image_header['EXTNAME'] = str(i+1)+ ':' + base_obs_dict['scene_name'] + '_' + base_obs_dict['configuration']['instrument']['aperture'].upper() + '_' + base_obs_dict['configuration']['instrument']['filter'].upper() image_header['TARGET'] = base_obs_dict['scene_name'] image_header['INSTRMNT'] = base_obs_dict['configuration']['instrument']['instrument'] image_header['FILTER'] = base_obs_dict['configuration']['instrument']['filter'] image_header['APERTURE'] = base_obs_dict['configuration']['instrument']['aperture'] image_header['SUBARRAY'] = base_obs_dict['configuration']['detector']['subarray'] image_header['PATTERN'] = base_obs_dict['configuration']['detector']['readout_pattern'] image_header['NGROUP'] = base_obs_dict['configuration']['detector']['ngroup'] image_header['NINT'] = base_obs_dict['configuration']['detector']['nint'] image_header['TEXP'] = (determine_exposure_time(image_header['SUBARRAY'], image_header['PATTERN'], image_header['NGROUP'], image_header['NINT']), 'seconds') image_header['PIXSCALE'] = (determine_pixel_scale(image_header['FILTER']), 'arcsec/pixel') #image_header['ROLLANG'] = (base_obs_dict['strategy']['scene_rotation'], 'degrees') image_header['ROLLANG'] = (base_obs_dict['scene_rollang'], 'degrees') image_header['DITHER'] = base_obs_dict['dither_strategy'] image_header['NSOURCES'] = len(base_obs_dict['scene']) for source in base_obs_dict['scene']: for j, sgd in enumerate(sgds): image_header['SOURCE{}'.format(source['id'])] = source['pancake_parameters']['name'] image_header['S{}XOFF{}'.format(source['id'],j+1)] = source['position']['x_offset'] + sgd[0] image_header['S{}YOFF{}'.format(source['id'],j+1)] = source['position']['y_offset'] + sgd[1] image_header['S{}OFFAX'.format(source['id'])] = source['position']['x_offset'] + offaxis_x image_header['S{}OFFAY'.format(source['id'])] = source['position']['y_offset'] + offaxis_y image_header['S{}VGAMG'.format(source['id'])] = compute_magnitude(source['spectrum']['sed']['spectrum'][0], source['spectrum']['sed']['spectrum'][1], base_obs_dict['configuration']['instrument']['filter']) image_header.add_blank('', before='SOURCE{}'.format(source['id'])) image_header['PCOTFPSF'] = on_the_fly_PSFs image_header['PCWAVSAM'] = (wave_sampling, 'Only valid if PCOTFPSF==True') image_header['PCOPDDRF'] = wavefront_evolution image_header.add_blank('', before='PCOTFPSF') # Append the images and header information to the initial HDUList. image_data = detector_images hdulist.append(fits.ImageHDU(image_data, header=image_header)) if save_file != False: # Then we also want to append the data to our saved file, doing it this way means if there is a crash # halfway through then things are still saved. fits.append(save_file, image_data, image_header) return hdulist
def _relative_exposure_time(self, scene, filt, master_scene, master_exposure_time=3600): ''' #Figure out how much longer (or shorter) an observation should be with reference to the relative flux of the brightest sources between two scenes. Inaccurate if the flux of the brightest source isn't >> than other sources. Parameters ---------- scene : pancake.Scene() Pancake scene to scale filt : str Filter we are comparing fluxes in master_scene : str Pancake scene to scale the flux relative to master_exposure_time : float Exposure time in seconds of the master scene observation Returns ------- exposure_time : float New exposure time for the scene ''' #Figure out how much longer (or shorter) an observation should be with reference to the relative flux of the #brightest sources between two scenes. Inaccurate if the flux of the brightest source isn't >> than other sources. magnitude=50 for i in range(len(scene.source_list)): source = scene.pandeia_scene[i] spec_wave, spec_flux = source['spectrum']['sed']['spectrum'] temp_magnitude = compute_magnitude(spec_wave, spec_flux, filt) if temp_magnitude < magnitude: magnitude = temp_magnitude master_magnitude=50 for i in range(len(master_scene.source_list)): source = master_scene.pandeia_scene[i] spec_wave, spec_flux = source['spectrum']['sed']['spectrum'] temp_magnitude = compute_magnitude(spec_wave, spec_flux, filt) if temp_magnitude < master_magnitude: master_magnitude = temp_magnitude flux_ratio = 100**((magnitude-master_magnitude)/5) exposure_time = flux_ratio * master_exposure_time return exposure_time def _extract_readout(self, scene, exposure, subarray, obs_dict, optimise_margin, scale_exposures, max_sat, verbose=True, min_groups=4, min_ints=1): ''' Wrapper function to determine readout parameters for a given observation Parameters ---------- scene : pancake.Scene() Pancake scene being obesrved exposure : tuple The exposure we want to perform, possible formats are - (FILTER, PATTERN, NGROUP, NINT) - (FILTER, 'optimise', t_exp) subarray : str Subarray we are observing in obs_dict : dict pandeia formatted observation dictionary optimise_margin : float Fraction of the input t_exp that we can deviate from the overall t_exp for a given exposure scale_exposures : pancake.Scene() Scene to scale the provided t_exp in order to reach ~the same number of photons max_sat : float Maximum fraction of saturation to reach. verbose : bool Boolean toggle for terminal printing. ''' filt = exposure[0].lower() if exposure[1] in ['optimise', 'optimize']: #Need to optimise the exposure readouts if verbose: print('Optimising Readout // {} // Exposure: {}, {} seconds'.format(obs_dict['scene_name'], exposure[0].upper(), str(exposure[2]))) exposure_time = exposure[2] if scale_exposures != None: if isinstance(scale_exposures, (int, float)): #Scale exposure time by a numeric value. if verbose: print('--> Scaling provided exposure times by {}'.format(scale_exposures)) exposure_time *= scale_exposures elif isinstance(scale_exposures, Scene): #Scale exposure time to match flux of another scene. if verbose: print('--> Scaling provided exposure times by relative flux of: "{}"'.format(scale_exposures.scene_name)) master_scene = scale_exposures exposure_time = self._relative_exposure_time(scene, filt, master_scene, master_exposure_time=exposure_time) else: raise ValueError('Chosen "scale_exposures" setting not recognised. Select int/float scaling factor or defined Scene.') pattern, groups, integrations = optimise_readout(obs_dict, exposure_time, optimise_margin, max_sat=max_sat, min_groups=min_groups, min_ints=min_ints) exposure_time = determine_exposure_time(subarray, pattern, groups, integrations) #Notify user of the optimised readout parameters if verbose: print('--> Pattern: {}, Number of Groups: {}, Number of Integrations: {} = {}s'.format(pattern.upper(), groups, integrations, int(exposure_time+0.5))) else: #Parameters have been specified explicitly if scale_exposures != None: exposure_time = determine_exposure_time(subarray, exposure[1].lower(), exposure[2], exposure[3]) if isinstance(scale_exposures, (int, float)): #Scale readout parameters by a numeric value if verbose: print('--> Scaling provided exposure times by {}'.format(scale_exposures)) exposure_time *= scale_exposures elif isinstance(scale_exposures, Scene): #Scale readout to match the flux of another scene. if verbose: print('--> Scaling provided exposure times by relative flux of: "{}"'.format(scale_exposures.scene_name)) master_scene = scale_exposures exposure_time = self._relative_exposure_time(scene, filt, master_scene, master_exposure_time=exposure_time) else: raise ValueError('Chosen "scale_exposures" setting not recognised. Select int/float scaling factor or defined Scene.') #"Re"-optimise readout patterns using the new exposure time pattern, groups, integrations = optimise_readout(obs_dict, exposure_time, optimise_margin, max_sat=max_sat, min_groups=min_groups, min_ints=min_ints) exposure_time = determine_exposure_time(subarray, pattern, groups, integrations) #Notify user of the optimised readout parameters if verbose: print('---> Pattern: {}, Number of Groups: {}, Number of Integrations: {} = {}s'.format(pattern.upper(), groups, integrations, int(exposure_time+0.5))) else: pattern, groups, integrations = exposure[1:] return pattern, groups, integrations def _calculate_opds(self, opd_estimate='requirements', opd_realisation=1, pa_range='median'): ''' We want to calculate a range of optical path difference (OPD) maps dependent on how the observatory moves throughout the sequenced observations. Parameters ---------- opd_estimate : str 'requirements' or 'predicted' initial OPD wavefront opd_realisation : int, 1-10 10 possible wavefronts to use to start a simulation, doesn't greatly impact results pa_range : str What range of PA's to aim for when calculating movement of telescope between scenes - options are, 'median', 'minimum', 'maximum' Returns ------- nircam_opd_lhdul : HDUList() HDUList for OPD wavefronts at the start, middle, and end of all observations for NIRCam miri_opd_lhdul HDUList for OPD wavefronts at the start, middle, and end of all observations for MIRI ''' # First thing we need to do is gather the geometric properties of the scenes in the sequence so that we # know what the ra, dec, lambda, beta, and pitch angle should be for each observation. geom_props = self._get_geom_props(pa_range=pa_range) # Check that the properties were received correctly, if not we can't perform the OPD drift if geom_props == None: return None obs_times = [0] pitch_angles = [geom_props[self.observation_sequence[0]['scene_name']]['pitch_angle']] skip_indices = [0] #OPDs that we don't need for the sequence, but are needed to more accurately estimate the OPD changes overall #Loops over observations to identify changes in observation time and slews for i, obs_dict in enumerate(self.observation_sequence): if i != 0: ###########This is the second observation onwards, potential for a slew #####First, find out if we have slewed to a different scene. previous_scene = self.observation_sequence[i-1]['scene_name'] current_scene = self.observation_sequence[i]['scene_name'] if previous_scene != current_scene: #Need to determine how large a slew #Calculate the angular offset between the two scenes #Let's work in an ecliptic coordinate system rather than equatorial, where lamb and beta are the #ecliptic longitudes and latitudes respectively. previous_lamb, previous_beta = geom_props[previous_scene]['lamb'], geom_props[previous_scene]['beta'] current_lamb, current_beta = geom_props[current_scene]['lamb'], geom_props[current_scene]['beta'] previous_lamb_rad, previous_beta_rad = np.deg2rad(previous_lamb), np.deg2rad(previous_beta) current_lamb_rad, current_beta_rad = np.deg2rad(current_lamb), np.deg2rad(current_beta) #Absolute slew is just the distance between the two points scene_slew_angdist_rad = np.arccos(np.sin(previous_beta_rad)*np.sin(current_beta_rad) + \ np.cos(previous_beta_rad)*np.cos(current_beta_rad)*np.cos(previous_lamb_rad-current_lamb_rad)) scene_slew_angdist = scene_slew_angdist_rad * (180/np.pi) scene_pitch_angle = geom_props[current_scene]['pitch_angle'] else: #There is no slew between scenes scene_slew_angdist = 0 scene_pitch_angle = geom_props[previous_scene]['pitch_angle'] ##### Now find out if the instrument has changed. previous_inst = self.observation_sequence[i-1]['configuration']['instrument']['instrument'] current_inst = self.observation_sequence[i]['configuration']['instrument']['instrument'] if previous_inst != current_inst: ''' The instrument has changed, so some time will have passed to perform the slew. In reality this depends on the exact subarray / aperture being used. Here we approximate to 500" based on the overall offset between NIRCam and MIRI from: This is roughly equivalent to 10 minutes of slew time. ''' if sorted([previous_inst, current_inst]) == ['miri', 'nircam']: inst_slew_angdist = 500 / 3600 #Potential for this to be negative, but assume positive for now. else: print('WARNING: Unable to process angular distance change between instruments.') #From the same link above, change in pitch angle (V3) is ~100", also could be negative. if current_inst == 'miri': inst_delta_pitch_angle = -100 / 3600 elif current_inst == 'nircam': inst_delta_pitch_angle = 100 / 3600 else: inst_slew_angdist = 0 inst_delta_pitch_angle = 0 if inst_slew_angdist != 0 or scene_slew_angdist !=0 : #There has been a slew #Now turn a slew distance into a slew time. slew_angdist = scene_slew_angdist + inst_slew_angdist pitch_angle = scene_pitch_angle + inst_delta_pitch_angle slew_time = self._slew_angdist_to_time(slew_angdist) obs_times.append(obs_times[-1] + slew_time) pitch_angles.append(pitch_angle) #Log the index so that we don't use its OPD for later calculations skip_indices.append(len(pitch_angles)-1) #Now that slews are accounted for, append observations as normal if obs_dict['dither_strategy'] == 'SINGLE-POINT': dither_points = 1 else: dither_points = int(obs_dict['dither_strategy'][0]) for j in range(dither_points): subarray = obs_dict['configuration']['detector']['subarray'] pattern = obs_dict['configuration']['detector']['readout_pattern'] groups = obs_dict['configuration']['detector']['ngroup'] integrations = obs_dict['configuration']['detector']['nint'] exposure_time = determine_exposure_time(subarray, pattern, groups, integrations) #Append the centre time of the observation, these are the OPD's we actually want. pitch_angles.append(pitch_angles[-1]) obs_times.append(obs_times[-1] + (exposure_time / 2)) #Also add the end observation time, but make sure it is skipped. pitch_angles.append(pitch_angles[-1]) obs_times.append(obs_times[-1] + (exposure_time / 2)) skip_indices.append(len(pitch_angles)-1) # Now that we know the pitch angle at specific times throughout our observation, we can load in the base OPD # files from WebbPSF, evolve them, delete the ones we don't need, and return the specific OPD for each observation. all_opd_estimates = ['predicted', 'requirements'] if opd_estimate not in all_opd_estimates: raise ValueError('Chosen OPD estimate "{}" not recognised. Compatible options are : {}'.format(opd_estimate, ', '.join(all_opd_estimates))) #base_opd = 'OPD_RevW_ote_for_{}_'+opd_estimate+'.fits.gz' base_opd = 'JWST_OTE_OPD_cycle1_example_2022-07-30.fits' #Get Base OPD for NIRCam and MIRI (or just one if only one in sequence). nircam_opd_file = os.path.join(webbpsf.utils.get_webbpsf_data_path(), base_opd.format('NIRCam')) miri_opd_file = os.path.join(webbpsf.utils.get_webbpsf_data_path(), base_opd.format('MIRI')) nircam_opd_hdu = OPDFile_to_HDUList(nircam_opd_file, slice_to_use=opd_realisation) miri_opd_hdu = OPDFile_to_HDUList(miri_opd_file, slice_to_use=opd_realisation) BaseNIRCamOPD = OTE_WFE_Drift_Model(opd=nircam_opd_hdu) BaseMIRIOPD = OTE_WFE_Drift_Model(opd=miri_opd_hdu) nircam_opd_hdul = BaseNIRCamOPD.opds_as_hdul(obs_times*u.second, np.array(pitch_angles), slew_averages=False) miri_opd_hdul = BaseMIRIOPD.opds_as_hdul(obs_times*u.second, np.array(pitch_angles), slew_averages=False) #Remove OPD's that we no longer need, leaving just those at the centre of each of our observations. for index in sorted(skip_indices, reverse=True): del nircam_opd_hdul[index] del miri_opd_hdul[index] #Now need to restructure as a list of HDULists for integration with WebbPSF nircam_opd_lhdul = [fits.HDUList([hdu]) for hdu in nircam_opd_hdul] miri_opd_lhdul = [fits.HDUList([hdu]) for hdu in miri_opd_hdul] return nircam_opd_lhdul, miri_opd_lhdul def _get_geom_props(self, pa_range='median'): ''' Get geometric properties of the scenes in an observational sequence, estimate the possible pitch angles for the scenes throughout the year, and assign which pitch angles to observe at. Parameters ---------- pa_range : str The range of PA's between scenes that we want to adopt for the simulation - 'minimum', the smallest possible range in PA between scenes (lowest wavefront drift) - 'median', the median possible range in PA between scenes - 'maximum', the maximum possible range in PA between scenes (highest wavefront drift) Returns ------- geom_props : dict Geometric properties for each scene within the sequence, including RA, Dec, Ecliptic Longitude/Latitude, and observatory PA. ''' ##### First get the RA, Dec geom_props = {} for obs_dict in self.observation_sequence[:]: scene_name = obs_dict['scene_name'] if scene_name not in geom_props.keys(): geom_props[scene_name] = {} ra, dec = self._find_ra_dec(obs_dict) if None in [ra, dec]: geom_props[scene_name]['failed_flag'] = True else: geom_props[scene_name]['failed_flag'] = False geom_props[scene_name]['ra'] = ra geom_props[scene_name]['dec'] = dec ##### Now need to check whether all RA's or Dec's have values scene_names = list(geom_props.keys()) flags = [geom_props[scene_name]['failed_flag'] for scene_name in scene_names] if True in flags: #Something failed if all(x == True for x in flags): #They all failed base_ra = 0 base_dec = 45 else: #At least one didn't fail base_ra = geom_props[scene_names[flags.index(False)]]['ra'] base_dec = geom_props[scene_names[flags.index(False)]]['dec'] for i, scene_name in enumerate(scene_names): if flags[i] == True: ra = base_ra + 5*i dec = base_ra + 5*i geom_props[scene_name]['ra'] = ra geom_props[scene_name]['dec'] = dec print('WARNING: RA and/or Dec value not present for Scene "{}", assuming values of RA={} and Dec={}'.format(scene_name, ra, dec)) #Calculate ecliptic longitude (lamb) and latitude (beta) for i, scene_name in enumerate(scene_names): lamb, beta = equatorial_to_ecliptic(geom_props[scene_name]['ra'], geom_props[scene_name]['dec'], form='degrees') geom_props[scene_name]['lamb'] = lamb geom_props[scene_name]['beta'] = beta ##### Get the pitch angles for all of the scenes. #Look at every orientation throughout the year orientations = np.arange(0, 360, 0.1) all_pitch_angles = np.empty((len(orientations), len(geom_props.keys()))) all_pitch_angle_ranges = np.empty(orientations.shape) for i, ostep in enumerate(orientations): temp_pitch_angles = [] #Loop over the scene names to preserve the order for later for scene_name in scene_names: #Get the ecliptic latitude and longitude in radians beta_rad = geom_props[scene_name]['beta'] * (np.pi / 180) lamb_rad = ((geom_props[scene_name]['lamb'] + ostep) % 360 ) * (np.pi / 180) #Identify the latitude after a rotation of the coordinate system by 90 degrees #--> This is equivalent to the pitch angle: AC has saved the derivation to DayOne on Feb 16 2021. pitch_angle = np.arcsin(np.cos(beta_rad)*np.sin(lamb_rad)) * (180/np.pi) temp_pitch_angles.append(pitch_angle) #Save the pitch angles all_pitch_angles[i] = temp_pitch_angles if all([-5 < pa < 45 for pa in temp_pitch_angles]): #Then at this orientation all scenes can be slewed to pitch_angle_range = max(temp_pitch_angles) - min(temp_pitch_angles) all_pitch_angle_ranges[i] = pitch_angle_range else: #It is not possible to observe everything at this slew all_pitch_angle_ranges[i] = np.nan #Need to check if any of the orientations were viable. if np.isnan(all_pitch_angle_ranges).all(): #There is no orientation where all scenes lie within the FOV. print('WARNING: Specified scenes always span a pitch angle range of greater than 50 degrees and cannot be scheduled in sequence. No OPD drift can be calculated for this simulation.') return None # Now we can use the pitch angle range to specify what the PA location of each Scene is for this sequence. if pa_range == 'minimum': pa_index = np.nanargmin(all_pitch_angle_ranges) elif pa_range == 'maximum': pa_index = np.nanargmax(all_pitch_angle_ranges) elif pa_range == 'median': valid_pa_ranges = all_pitch_angle_ranges[np.logical_not(np.isnan(all_pitch_angle_ranges))] median_pa_range = np.sort(valid_pa_ranges)[len(valid_pa_ranges)//2] pa_index = np.where(all_pitch_angle_ranges == median_pa_range)[0][0] #Set the pitch angles for this Sequence pitch_angles = all_pitch_angles[pa_index] for i, scene_name in enumerate(scene_names): geom_props[scene_name]['pitch_angle'] = pitch_angles[i] return geom_props def _find_ra_dec(self, obs_dict): ''' Simple helper function to grab the RA and Dec of a scene from an input Pandeia/PanCAKE dictionary. Parameters ---------- obs_dict : dict Pandeia dictionary created during Scene building Returns ------- ra : float Right ascension dec : float Declination ''' #Search for RA and DEC values ra, dec = None, None for source in obs_dict['scene']: try: ra = source['pancake_parameters']['ra'] dec = source['pancake_parameters']['dec'] except: #These values haven't been assigned. continue return ra, dec def _slew_angdist_to_time(self, slew_angdist): ''' Function to determine the time necessary to perform a slew of a given angular distance. All values taken from: Parameters ---------- slew_angdist : float Angular slew distance (in degrees) Returns ------- slew_time : float Slew time (in seconds) ''' slew_angdists = np.array([0.0000000, 0.0600000, 0.0600001, 15.0000000, 20.0000000, 20.0000001, 30.0000000, 50.0000000, 100.0000000, 150.0000000, 300.0000000, 1000.0000000, 3600.0000000, 4000.0000000, 10000.0000000, 10800.0000000, 10800.0000001, 14400.0000000, 18000.0000000, 21600.0000000, 25200.0000000, 28800.0000000, 32400.0000000, 36000.0000000, 39600.0000000, 43200.0000000, 46800.0000000, 50400.0000000, 54000.0000000, 57600.0000000, 61200.0000000, 64800.0000000, 68400.0000000, 72000.0000000, 108000.0000000, 144000.0000000, 180000.0000000, 216000.0000000, 252000.0000000, 288000.0000000, 324000.0000000, 360000.0000000, 396000.0000000, 432000.0000000, 468000.0000000, 504000.0000000, 540000.0000000, 576000.0000000, 612000.0000000, 648000.0000000]) slew_times = np.array([0.000, 0.000, 20.480, 20.480, 23.296, 101.632, 116.224, 137.728, 173.568, 198.656, 250.112, 373.504, 572.416, 592.896, 804.864, 825.600, 521.216, 578.048, 628.608, 674.560, 716.928, 756.608, 793.856, 829.184, 862.848, 894.976, 925.824, 955.648, 984.320, 1012.224, 1039.104, 1065.344, 1090.816, 1115.648, 1336.448, 1537.408, 1744.000, 1939.328, 2112.192, 2278.272, 2440.320, 2599.936, 2757.632, 2914.240, 3069.888, 3224.832, 3379.328, 3533.376, 3687.104, 3840.512]) slew_angdists_deg = slew_angdists / 3600 time_angdist_interp = interp1d(slew_angdists_deg, slew_times) slew_time = time_angdist_interp(slew_angdist) return slew_time def _get_ordered_scene_names(self, duplicates=True): ''' Helper function to get all scene names from an input Pandeia+PanCAKE dictionary. Parameters ---------- duplicates : bool Include duplicate scene names in the returned list. Returns ------- scene_names : list of strings All the scene names from the dictionary ''' scene_names = [] for obs_dict in self.observation_sequence: scene_names.append(obs_dict['scene_name']) if duplicates == False: scene_names = list(OrderedDict.fromkeys(scene_names)) return scene_names
[docs]def load_run(save_file): ''' Load a previously performed / incomplete PanCAKE simulation run. Parameters ---------- save_file : str File path for the saved simulation. Returns ------- hdul : FITS HDUList Extracted file in HDUList format. ''' #Load in a recently performed and saved run with as save_data: hdul = deepcopy(save_data) return hdul