Source code for pancake.utilities

"""
This file contains a number of pancake utilities which may be useful in running pancake.
"""
from __future__ import absolute_import, print_function

import os
import json
import numpy as np 
from pandeia.engine.psf_library import PSFLibrary
from pandeia.engine.source import Source
import pandeia.engine.sed as psed

from synphot import SourceSpectrum, SpectralElement, Observation
from synphot.models import Empirical1D
from synphot.units import VEGAMAG
import astropy.units as u

from .engine import calculate_target
from .io import read_bandpass

import requests
import requests.exceptions
import re 
import copy

import warnings
from astropy.utils.exceptions import AstropyWarning, AstropyUserWarning
warnings.simplefilter(action='ignore', category=AstropyWarning)
warnings.simplefilter(action='ignore', category=AstropyUserWarning) 

import matplotlib.pyplot as plt

[docs]def determine_pandeia_offset(config): """ Uses Pandeia's PSF library to determine which PSF offset pandeia would use for a particular configuration (for comparison with the offset that pancake would use in on-the-fly PSF generation). Parameters ---------- config : dict Pandeia dictionary for the observation Returns ------- - : dict Dictionary of 'scene' and 'reference' offsets. """ instrument = config['configuration']['instrument']['instrument'] aperture = config['configuration']['instrument']['aperture'] scene_sources, reference_sources = [], [] for source in config['scene']: scene_sources.append(Source(config=source)) if 'psf_subtraction_source' in config['strategy']: reference_sources.append(Source(config=config['strategy']['psf_subtraction_source'])) path = None if "pandeia_refdata" in os.environ: tel = 'jwst' ins = config['configuration']['instrument']['instrument'].lower() path = os.path.join(os.environ['pandeia_refdata'], tel, ins, 'psfs') library = PSFLibrary(path=path) scene_offsets = library.associate_offset_to_source(scene_sources, instrument, aperture) reference_offsets = library.associate_offset_to_source(reference_sources, instrument, aperture) return {'scene': scene_offsets, 'reference': reference_offsets}
[docs]def stellar_spectrum(stellar_type, bandpass, magnitude): """ Create a spectrum dictionary that assumes a Phoenix model with a key found in pandeia, and set the magnitude to the provided value in the provided bandpass (in ABMAG) Parameters ---------- stellar_type : str Spectral type of star bandpass : str Bandpass for normalisation magnitude : float Vega magnitude in aforementioned bandpass Returns ------- spectrum : dict Spectrum parameters for input into Pandeia """ spectrum = {'spectrum_parameters': ['normalization', 'sed']} spectrum['sed'] = {'sed_type': 'phoenix', 'key': stellar_type} if 'bessel' in bandpass or 'johnson' in bandpass: spectrum['normalization'] = {'type': 'photsys', 'bandpass': bandpass, 'norm_flux': magnitude, 'norm_fluxunit': 'abmag'} elif 'miri' in bandpass or 'nircam' in bandpass: spectrum['normalization'] = {'type': 'jwst', 'bandpass': bandpass, 'norm_flux': magnitude, 'norm_fluxunit': 'abmag'} else: spectrum['normalization'] = {'type': 'photsys', 'bandpass': bandpass, 'norm_flux': magnitude, 'norm_fluxunit': 'abmag'} return spectrum
[docs]def pandeia_spectrum(stellar_type): """ Generate a spectrum using the Pandeia Phoenix data files. Returns two arrays containing the wavelength and flux values, unormalised. Parameters ---------- stellar_type : str Spectral type of star Returns ------- spectrum.wave : pysynphot spectrum wavelength axis spectrum.flux : pysynphot spectrum wavelength axis """ pandeia_spectra = {'config':'phoenix/spectra.json'} psed_wav, psed_flux = psed.Phoenix(key=stellar_type, spectra=pandeia_spectra, webapp=True).get_spectrum() #Webapp=True so that only SpT is required PandeiaSED = SourceSpectrum(Empirical1D, points=psed_wav, lookup_table=psed_flux) #In format angstrom and flam spectrum_wave = PandeiaSED.waveset.to('micron') spectrum_flux = PandeiaSED(spectrum_wave, flux_unit='mJy') return spectrum_wave, spectrum_flux
[docs]def user_spectrum(filename, wave_unit='micron', flux_unit='mJy'): ''' Read in a user spectrum from a specified file. Parameters ---------- filename : str Filepath for user spectrum file wave_unit : str Unit for wavelength axis (column 1) flux_unit : str Unit for flux axis (column 2) Returns ------- spectrum.wave : pysynphot spectrum wavelength axis spectrum.flux : pysynphot spectrum wavelength axis ''' if not os.path.isfile(filename): raise OSError('File "{}" not located, unable to extract spectrum.'.format(filename)) with open(filename) as f: file_spectrum_data = np.genfromtxt(f).transpose() file_spectrum_wave = file_spectrum_data[0] file_spectrum_flux = file_spectrum_data[1] #Attempt to load in source spectrum and convert input units to the default 'angstrom' and 'flam' using << try: UserSED = SourceSpectrum(Empirical1D, points=file_spectrum_wave << u.Unit(wave_unit), lookup_table=file_spectrum_flux << u.Unit(flux_unit)) except: raise ValueError('Error converting input wave_unit/flux_unit to the Pandeia flux units via Synphot.') #This SED is in units 'angstrom' and 'flam', need to assign to users units and then convert to the Pandeia units of 'micron' and 'mjy' spectrum_wave = UserSED.waveset.to('micron') spectrum_flux = UserSED(spectrum_wave, flux_unit='mJy') return spectrum_wave, spectrum_flux
[docs]def normalise_spectrum(input_wave, input_flux, norm_val=5, norm_unit='vegamag', norm_bandpass='2mass_ks'): ''' Normalise a spectrum to a given bandpass Parameters ---------- input_wave : synphot wavelengths Wavelengths input_flux : synphot fluxes Flux values in mJy norm_val : float Value to normalise to norm_unit : str Unit for normalisation value norm_bandpass : str Bandpass to normalise in Returns ------- spectrum.wave : pysynphot spectrum wavelength axis spectrum.flux : pysynphot spectrum wavelength axis ''' #Get bandpass for normalisation NormBandpass = read_bandpass(norm_bandpass) #Create SED, ensuring to specify the input units are the Pandeia defaults micron and mJy SED = SourceSpectrum(Empirical1D, points=input_wave << u.Unit('micron'), lookup_table=input_flux << u.Unit('mJy')) VegaSED = SourceSpectrum.from_vega() # Normalize to input values, note that the units are retrieved from astropy, but some are explicitly defined in # the synphot package such as vegamag, abmag, flam, fnu. By virtue of running the SourceSpectrum/SpectralElement # classes these synphot units are loaded in but if things change in the future, this may break things. # # Synphot changed nomenclature for vegamag as of https://github.com/spacetelescope/synphot_refactor/pull/331 # Catch and change here if needed instead of rewriting documentation / user interface. if norm_unit == 'vegamag': SED = SED.normalize(norm_val*VEGAMAG, band=NormBandpass, vegaspec=VegaSED) else: SED = SED.normalize(norm_val*u.Unit(norm_unit), band=NormBandpass, vegaspec=VegaSED) #Return to Pandeia units spectrum_wave = SED.waveset.to('micron') spectrum_flux = SED(spectrum_wave, flux_unit='mJy') return spectrum_wave, spectrum_flux
[docs]def normalize_spectrum(*args, **kwargs): # This one's for y'all return normalise_spectrum(*args, **kwargs)
[docs]def query_simbad(query_string, query_timeout_sec=5.0, default_spt='a0v', verbose=True): """ Query simbad for details on a target object, adapted from the JWST Coronagraphic Visibility Tool Will in current state attempt to extract: RA, Dec, Spectral Type and Kmagnitude Parameters ---------- query_string : str String to query in SIMBAD, i.e. a stellar name / identifier. query_timeout_sec : float Time to allow for query to complete default_spt : str Default spectral type to adopt if SIMBAD spectral type cannot be interpreted verbose : bool Whether to print information to the terminal Returns ------- query_results : dict Extracted SIMBAD results. """ if not isinstance(query_string, str): raise TypeError('Name of source must be a string type') try: response = requests.get('http://cdsweb.u-strasbg.fr/cgi-bin/nph-sesame/-oF?' + query_string, timeout=query_timeout_sec) except (requests.exceptions.ConnectionError): raise ConnectionError('Could not access Simbad. Most likely because source name not recognised, but also check internet connection/Simbad website.') body = response.text query_results = {'ra':None, 'dec':None, 'canonical_id':None, 'norm_val':None, 'spt':None, 'norm_bandpass':None, 'norm_unit':None} for line in body.split('\n'): # RA and DEC if line[:2] == '%J' and query_results['ra'] is None: match = re.match(r'%J (\d+\.\d+) ([+\-]\d+\.\d+) .+', line) if match is None: return None query_results['ra'], query_results['dec'] = map(float, match.groups()) # Canonical ID / HD Number elif line[:4] == '%I.0' and query_results['canonical_id'] is None: match = re.match('%I.0 (.+)', line) if match is None: return None query_results['canonical_id'] = match.groups()[0] # Spectral Type elif line[:2] == '%S' and query_results['spt'] is None: match = re.match(r'%S ([^\s\n]+)', line) if match is None: return None query_results['spt'] = match.groups()[0] # K Magnitude elif line[:4] == '%M.K' and query_results['norm_val'] is None: match = re.match(r'%M.K (.?\d+\.\d+)', line) if match is None: return None query_results['norm_val'] = float(match.groups()[0]) #Check that the magnitude is from the 2MASS catalogue if '2003yCat.2246' in line: query_results['norm_bandpass'] = '2mass_ks' elif '2002yCat.2237' in line: query_results['norm_bandpass'] = 'johnson_k' else: if verbose: print('WARNING: Did not recognise K-band magnitude catalogue, approximating with 2MASS Ks') query_results['norm_bandpass'] = '2mass_ks' #Check that units are in Vegamag, if not check if in AB mag, if not assume Vega. if 'Vega' in line: query_results['norm_unit'] = 'vegamag' elif 'AB' in line: query_results['norm_unit'] = 'abmag' else: if verbose: print("WARNING: Couldn't determine magnitude system, assuming Vega magnitudes.") query_results['norm_unit'] = 'vegamag' # Check if there was no spectral type, if not just approximate if query_results['spt'] == None: if verbose: print("WARNING: Failed to obtain spectral type from SIMBAD. Using spectral type {} instead.".format(default_spt)) query_results['spt'] = default_spt # Check if any of the other values didn't get assigned... if None in query_results.values(): none_keys = ', '.join([str(key) for key in query_results if query_results[key] == None]) raise ValueError('Query to Simbad failed for {}, could not obtain: {}'.format(query_string, none_keys)) return query_results
[docs]def convert_spt_to_pandeia(raw_spectral_type): ''' Function to take a spectral type string, either from Simbad or directly from the user, and return an approximation that Pandeia can use. If the spectral type string cannot be understood, then the spectral type will be assumed as A0V. Makes use of a lot of if statements, perhaps someone smarter could streamline this at a later date. Parameters ---------- raw_spectral_type : str Spectral type to convert to Pandeia compatability. Returns ------- pandeia_spectral_type : str Pandeia compatible spectral type ''' raw_spectral_type = raw_spectral_type.lower() pandeia_spectral_type = None match_failed = False failed_spt = 'a0v' failure_message = "WARNING: Failed to approximate spectral type '{}' to Pandeia compatible spectral type. Using spectral type {} instead.".format(raw_spectral_type, failed_spt) #Read in default spectral types file to check if there is a match to the input spectral type pandeia_valid_spectral_types_file = os.path.join(os.environ.get("pandeia_refdata"), "sed/phoenix/spectra.json") try: with open(pandeia_valid_spectral_types_file, 'r') as f: pandeia_valid_spectral_types_dict = json.load(f) except IOError: raise IOError("Couldn't locate Pandeia reference files and/or pandeia_data/sed/phoenix/spectra.json file.") pandeia_valid_spectral_types = sorted(list(pandeia_valid_spectral_types_dict.keys())) pandeia_spectral_type = raw_spectral_type # If there is a match, simply return the raw spectral type if pandeia_spectral_type in pandeia_valid_spectral_types: return pandeia_spectral_type # If there is not a match, need to approximate one and notify the user. else: #Check if star is an O/B/A/F/G/K/M star compatible_temp_classes = ['o', 'b', 'a', 'f', 'g', 'k', 'm'] if pandeia_spectral_type[0] not in compatible_temp_classes: print(failure_message) return failed_spt #Check for binary+ objects, only use the primary if so. if '+' in pandeia_spectral_type: pandeia_spectral_type = pandeia_spectral_type.split('+')[0] #Weirdest objects already removed, strip out simbad 'peculiarities', won't strip out 'v' for variable spectrum #as it clashes with luminosity class. This is maybe fixable but: very small number of objects affected and tricky #to implement as it requires case sensitivity. for peculiarity in ['s', 'e', 'n', 'w', 'h', 'p', 'c', 'r', 'u', '?', '(', ')']: pandeia_spectral_type = pandeia_spectral_type.replace(peculiarity, '') #Also a bunch of 'I' subclasses that can't be used, approximate all of them to just type 'I' for isubclass in ['ia-0', 'ia-0/ia', 'ia', 'ia/iab', 'iab', 'iab-b', 'ib', 'ib-ii', '_0-ia', '_0-ia+']: pandeia_spectral_type = pandeia_spectral_type.replace(isubclass, 'i') #Check if 'm' for metallic lines present, but not if the first character or a character after '/' or '+' if 'm' in pandeia_spectral_type and pandeia_spectral_type[0] != 'm': m_index = pandeia_spectral_type.rfind('m') #Search for last 'm' in the string if not pandeia_spectral_type[m_index-1] in ['/', '+']: #Should be just a metallic line signifier pandeia_spt_list = list(pandeia_spectral_type) pandeia_spt_list[m_index] = '' pandeia_spectral_type = ''.join(pandeia_spt_list) # Next need to check length of spectra, idea is to get everything a single temperature (O3-M9) and luminosity class (I/II/III/IV/V/VI) # before further processing can be done. if len(pandeia_spectral_type) == 1: #Must be one of previuosly checked temperature classes, with no subdivision, assume it is type *5V pandeia_spectral_type = '{}5v'.format(pandeia_spectral_type[0]) elif len(pandeia_spectral_type) == 2: #Check the second string value is a number (temperature signifier) try: float(pandeia_spectral_type[1]) except: print(failure_message) return failed_spt else:#If it is a number, assume star is spectral type 'V'. pandeia_spectral_type += 'v' elif len(pandeia_spectral_type) == 3: #Couple of options, most should be simple like 'a3v', but some complicated like 'K-M' that we can still use try: #If not a number in the middle, must be something weird float(pandeia_spectral_type[1]) except: if pandeia_spectral_type[1] == '-' or pandeia_spectral_type[1] == '/': #Temperature class is uncertain with no luminosity classifer, approximate to the latter temperature class *0V pandeia_spectral_type = '{}0v'.format(pandeia_spectral_type[2]) elif pandeia_spectral_type[1] == ':': #Just don't know what the temperature class subdivision is, approximate to 5 pandeia_spectral_type = pandeia_spectral_type.replace(':', '5') else: print(failure_message) return failed_spt else: #Whole host of options, some standard and some more complex. if ':' in pandeia_spectral_type: if pandeia_spectral_type[-1] == ':': #Uncertainty in the last quantity, luminosity or temperature class try: float(pandeia_spectral_type[-2]) except: #It could be the luminosity class if pandeia_spectral_type[-2] in ['i', 'v']: pandeia_spectral_type = pandeia_spectral_type.replace(':','') else: print(failure_message) return failed_spt else: #It's the temperature class and there is no luminosity class pandeia_spectral_type = pandeia_spectral_type.replace(':','') if '/' in pandeia_spectral_type: # if 'iii/v' in pandeia_spectral_type: # pandeia_spectral_type = pandeia_spectral_type.replace('iii/v', 'v') #Must be some uncertainity in either temperature or luminosity class, or both. split_spectral_type = pandeia_spectral_type.split('/') pandeia_spt_lumclass = None pandeia_spt_tempclass = None if len(split_spectral_type) >= 4: # Too many splits, can handle at best 1 split in temperature and 1 split in luminosity print(failure_message) return failed_spt for i in range(len(split_spectral_type)-1): try: #Check if the character just before the split is a number float(split_spectral_type[i][-1]) except: #If it isn't... Should be an uncertainty in luminosity class, approximate to I, III, or V if split_spectral_type[-1] == 'i' or split_spectral_type[-1] == 'ii': pandeia_spt_lumclass = 'i' elif split_spectral_type[-1] == 'iii' or split_spectral_type[-1] == 'iv': pandeia_spt_lumclass = 'iii' elif split_spectral_type[-1] == 'v' or split_spectral_type[-1] == 'vi': pandeia_spt_lumclass = 'v' else: print(failure_message) return failed_spt else: #If it is... Should be an uncertainty in temperature class, couple more options try: #Check if the character *after* the split is a number float(split_spectral_type[i+1][0]) except: #If it isn't... temperature class is uncertain between the actual divisions, e.g. K9/M0 subdiv_one = float(''.join(c for c in split_spectral_type[i] if c.isdigit() or c == '.')) subdiv_two = float(''.join(c for c in split_spectral_type[i+1] if c.isdigit() or c == '.')) + 10 average_subdiv = int(round((subdiv_one+subdiv_two)/2)) if average_subdiv >= 10: average_subdiv -= 10 pandeia_spt_tempclass = '{}{}'.format(split_spectral_type[i+1][0],average_subdiv) else: pandeia_spt_tempclass = '{}{}'.format(split_spectral_type[0][0],average_subdiv) else: #If it is... temperature class is uncertain between subdivisions, e.g. K8/9 subdiv_one = float(''.join(c for c in split_spectral_type[i] if c.isdigit() or c == '.')) subdiv_two = float(''.join(c for c in split_spectral_type[i+1] if c.isdigit() or c == '.')) average_subdiv = int(round((subdiv_one+subdiv_two)/2)) pandeia_spt_tempclass = '{}{}'.format(split_spectral_type[0][0],average_subdiv) #Combine average classes with each other or the base spectral type. if pandeia_spt_tempclass != None and pandeia_spt_lumclass != None: pandeia_spectral_type = pandeia_spt_tempclass + pandeia_spt_lumclass elif pandeia_spt_lumclass == None: #Need to get luminosity class from the base spectral type, final character before lumclass should be a number tempclass_final_index = [x.isdigit() for x in pandeia_spectral_type[::-1]].index(True) if tempclass_final_index == 0: #There is no information on the luminosity class, use 'v' pandeia_spt_lumclass = 'v' else: #There is spectral class information. pandeia_spt_lumclass = pandeia_spectral_type[-tempclass_final_index:] pandeia_spectral_type = pandeia_spt_tempclass + pandeia_spt_lumclass elif pandeia_spt_tempclass == None: #Need to get the temperature class from the base spectral type subdiv = float(''.join(c for c in split_spectral_type[0] if c.isdigit() or c == '.')) if subdiv == 9.5: #Will be best approximated by '0' of the next temperature class down i.e. K9.5->M0 try: div = compatible_temp_classes[compatible_temp_classes.index(pandeia_spectral_type[0])+1] pandeia_spt_tempclass = div + '0' pandeia_spectral_type = pandeia_spt_tempclass + pandeia_spt_lumclass except: print(failure_message) return failed_spt else: #Simply round the temperature class to the nearest even value. pandeia_spt_tempclass = pandeia_spectral_type[0] + str(int(round(subdiv))) pandeia_spectral_type = pandeia_spt_tempclass + pandeia_spt_lumclass else: print(failure_message) return failed_spt #After that, check the luminosity class and convert to one Pandeia can use if necessary if pandeia_spectral_type[-2:] == 'ii' and pandeia_spectral_type[-3] != 'i': #Luminosity class II, approximate to III pandeia_spectral_type += 'i' elif pandeia_spectral_type[-2:] == 'iv' or pandeia_spectral_type[-2:] == 'vi': #Luminosity class IV or VI, approximate to V pandeia_spectral_type = pandeia_spectral_type[:-2] pandeia_spectral_type += 'v' elif pandeia_spectral_type[-1] == 'i' or pandeia_spectral_type[-1] == 'v': #Should only catch the remaining I, III and V, luminosity classes - just pass to avoid error pass else: print(failure_message) return failed_spt if pandeia_spectral_type not in pandeia_valid_spectral_types: #Need to convert cleaned spectral type to the nearest one that Pandeia can use. But first, to ease with the #adjustment of the strings we split up the spectral type string. pandeia_spectral_type = list(pandeia_spectral_type) if len(pandeia_spectral_type) == 3 and pandeia_spectral_type[-1] == 'i': #These are the 'i' class stars if pandeia_spectral_type[0] == 'o': # O star conversions if pandeia_spectral_type[1] in ['3', '4', '5', '7']: pandeia_spectral_type[1] = '6' elif pandeia_spectral_type[1] == '9': pandeia_spectral_type[1] = '8' else: print(failure_message) return failed_spt elif pandeia_spectral_type[0] == 'm': # M star conversions if pandeia_spectral_type[1] == '1': pandeia_spectral_type[1] = '0' elif pandeia_spectral_type[1] in ['3', '4', '5', '6', '7', '8', '9']: pandeia_spectral_type[1] = '2' else: print(failure_message) return failed_spt else: #BAFGK star convsersions if pandeia_spectral_type[1] in ['1', '2']: pandeia_spectral_type[1] = '0' elif pandeia_spectral_type[1] in ['3','4', '6', '7']: pandeia_spectral_type[1] = '5' elif pandeia_spectral_type[1] in ['8', '9']: pandeia_spectral_type[0] = compatible_temp_classes[compatible_temp_classes.index(pandeia_spectral_type[0])+1] pandeia_spectral_type[1] = '0' else: print(failure_message) return failed_spt elif len(pandeia_spectral_type) == 5: #These are 'iii' class stars if pandeia_spectral_type[0] == 'o' or (pandeia_spectral_type[0] == 'b' and pandeia_spectral_type[1] in ['1', '2']): pandeia_spectral_type = 'b0iii' elif (pandeia_spectral_type[0] == 'b' and pandeia_spectral_type[1] in ['3','4', '6', '7', '8', '9']) or pandeia_spectral_type[0] == 'a': pandeia_spectral_type = 'b5iii' elif pandeia_spectral_type[0] == 'f' or (pandeia_spectral_type[0] == 'g' and pandeia_spectral_type[1] in ['1', '2']): pandeia_spectral_type = 'g0iii' elif (pandeia_spectral_type[0] == 'g' or pandeia_spectral_type[0] == 'k') and pandeia_spectral_type[1] in ['3','4','6', '7']: pandeia_spectral_type[1] = '5' #Convert to g5iii or k5iii elif (pandeia_spectral_type[0] == 'g' and pandeia_spectral_type[1] in ['8', '9']) or (pandeia_spectral_type[0] == 'k' and pandeia_spectral_type[1] in ['1', '2']): pandeia_spectral_type = 'k0iii' elif (pandeia_spectral_type[0] == 'k' and pandeia_spectral_type[1] in ['8', '9']) or pandeia_spectral_type[0] == 'm': pandeia_spectral_type = 'm0iii' else: print(failure_message) return failed_spt elif pandeia_spectral_type[-1] == 'v': #These are the 'v' class stars, lots of possibilities: if pandeia_spectral_type[0] == 'o' and int(pandeia_spectral_type[1])%2 == 0: #O star conversions pandeia_spectral_type[1] = str(int(pandeia_spectral_type[1]) - 1) elif pandeia_spectral_type[0] == 'b': #B star conversions if pandeia_spectral_type[1] in ['2', '4', '6']: pandeia_spectral_type[1] = str(int(pandeia_spectral_type[1]) - 1) elif pandeia_spectral_type[1] in ['7', '9']: pandeia_spectral_type[1] = '8' else: print(failure_message) return failed_spt elif pandeia_spectral_type[0] == 'a': if pandeia_spectral_type[1] in ['2', '4', '6']: pandeia_spectral_type[1] = str(int(pandeia_spectral_type[1]) - 1) elif pandeia_spectral_type[1] == '7': pandeia_spectral_type[1] = '5' elif pandeia_spectral_type[1] in ['8', '9']: pandeia_spectral_type = 'f0v' else: print(failure_message) return failed_spt elif pandeia_spectral_type[0] == 'f' or pandeia_spectral_type[0] == 'g': if pandeia_spectral_type[1] in ['1', '3', '6', '9']: pandeia_spectral_type[1] = str(int(pandeia_spectral_type[1]) - 1) elif pandeia_spectral_type[1] in ['4', '7']: pandeia_spectral_type[1] = str(int(pandeia_spectral_type[1]) + 1) else: print(failure_message) return failed_spt elif pandeia_spectral_type[0] == 'k': if pandeia_spectral_type[1] in ['1', '3', '6', '8']: pandeia_spectral_type[1] = str(int(pandeia_spectral_type[1]) - 1) elif pandeia_spectral_type[1] == '4': pandeia_spectral_type[1] = '5' elif pandeia_spectral_type[1] == '9': pandeia_spectral_type = 'm0v' else: print(failure_message) return failed_spt elif pandeia_spectral_type[0] == 'm': if pandeia_spectral_type[1] in ['1', '3']: pandeia_spectral_type[1] = str(int(pandeia_spectral_type[1]) - 1) elif pandeia_spectral_type[1] in ['4', '6', '7', '8', '9']: pandeia_spectral_type[1] = '5' else: print(failure_message) return failed_spt pandeia_spectral_type = ''.join(pandeia_spectral_type) # Final check to ensure that the approximation worked, if not then set to failed spt. Unless there is a bug this shouldn't get called. if pandeia_spectral_type not in pandeia_valid_spectral_types: print(failure_message) return failed_spt # Check if pandeia spectral type equals user provided, if not notify user. if pandeia_spectral_type != raw_spectral_type: approximated_spt_message = "WARNING: Spectral type '{}' not compatible with Pandeia grid, using spectral type '{}' instead." print(approximated_spt_message.format(raw_spectral_type,pandeia_spectral_type)) return pandeia_spectral_type
[docs]def optimise_readout(obs_dict, t_exp, optimise_margin, min_sat=1e-6, max_sat=1, min_groups=4, min_ints=1): ''' Function to estimate optimal readout parameters for a given observation based on an input exposure time. Parameters ---------- obs_dict : dict Pandeia dictionary for this observation t_exp : float Desired exposure time in seconds optimise_margin : float Fraction of t_exp "wiggle-room" in estimating optimal readout min_sat : float Minimum allowable fraction of saturation max_sat : float Maximum allowable fraction of saturation Returns ------- pattern : str Optimal readout pattern groups : int Optimal number of groups integrations : int Optimal number of integrations ''' pattern, groups, integrations = None, None, None t_margin = t_exp * optimise_margin #Convert optimise margin to margin in seconds instrument = obs_dict['configuration']['instrument']['instrument'] subarray = obs_dict['configuration']['detector']['subarray'] #Need to extract subarray frame time from config files config_file = os.path.join(os.environ.get("pandeia_refdata"), 'jwst/{}/config.json'.format(instrument)) try: with open(config_file, 'r') as f: config_dict = json.load(f) except IOError: raise IOError("Couldn't locate Pandeia reference files and/or pandeia_data/jwst/{}/config.json file.".format(instrument)) subarray_frame_time = config_dict['subarray_config']['default'][subarray]['tframe'] if instrument == 'nircam': #NIRCam optimisation tricky due to the variety of readout patterns. First rule out the BRIGHT1, SHALLOW2, MEDIUM2, and DEEP2 #patterns as their counterparts have more measured frames. 5 patterns left, in this naive optimisation we will try to use the #longest possible ramp without saturating. Might be that better contrast can be obtained further out by letting the innermost #region saturate, not going to try and approximate this effect. used_patterns = ['deep8', 'medium8', 'shallow4', 'bright2', 'rapid'] #Descending order important as longest pattern checked first obs_dict['configuration']['detector']['ngroup'] = 2 #If it saturates in groups, tough to observe obs_dict['configuration']['detector']['nint'] = 1 obs_dict['configuration']['detector']['readout_pattern'] = 'rapid' data = calculate_target(obs_dict) sat_frac = data['scalar']['fraction_saturation'] sat_pix = np.count_nonzero(data['2d']['saturation']) #Can't work with groups like MIRI, need to define a maximum integration time before saturation sat_time = (max_sat/sat_frac)*2 * subarray_frame_time if sat_frac > 1: #There is saturation, use fastest possible readout pattern and warn user. print('WARNING: {:.1f}% saturation detected with the shortest possible readout settings. {} pixels affected.'.format(100*sat_frac, sat_pix)) pattern, groups, integrations = 'rapid', 2, 1 else: #No saturation, optimise readout parameters best_pattern = None for pattern in used_patterns: #Extract pattern config information from earlier loaded dict pattern_config = config_dict['readout_pattern_config'][pattern] nframe = pattern_config['nframe'] nskip = pattern_config['ndrop2'] #Time for a single integration with two groups (minimum possible) one_integration_time = subarray_frame_time * ((nframe+nskip)+nframe) if one_integration_time > (t_exp+t_margin) or one_integration_time > sat_time: #This pattern is too long to be used, go to next pattern in the list continue else: #Pattern will fit into the desired exposure time, now find optimal parameters (if possible) #First need to calculate max number of groups, which is a little tricky. Official formula is #integration time = Tframe × [(Nframes + Nskip) × (Ngroups -1) + Nframes] and we can rearrange #to find the maximum number of groups given the exposure time. max_groups_exp = int(np.floor((((t_exp + t_margin) - (nframe * subarray_frame_time))/(subarray_frame_time*(nframe+nskip))) + 1)) max_groups_sat = int(np.floor((((sat_time) - (nframe * subarray_frame_time))/(subarray_frame_time*(nframe+nskip))) + 1)) max_groups = np.min([max_groups_exp, max_groups_sat]) if 'deep' in pattern and max_groups > 20: max_groups = 20 #Maximum number of allowed groups for DEEP readout elif max_groups > 10: max_groups = 10 #Maximum number of allowed groups for all other readouts. #Now follow a similar methodology as with MIRI best_ngroup = None #Start at highest possible number of groups and work our way down until one fits within the margin. for ngroup in reversed(range(min_groups,max_groups+1)): integration_time = subarray_frame_time * ((nframe + nskip)*(ngroup-1) + nframe) min_int = int(np.ceil( (t_exp-t_margin) / integration_time)) max_int = int(np.floor( (t_exp+t_margin) / integration_time)) # May want to override minimum number of integrations if min_ints == 1: min_int = int(np.ceil( (t_exp-t_margin) / integration_time)) else: min_int = min_ints #Range of possible ints for this number of groups, if one or more match find which one is closest best_nint = None best_diff = np.inf for nint in range(min_int, max_int+1): t = nint*integration_time diff = abs(t_exp - t) if (diff <= abs(t_exp-t_margin)) and diff < best_diff: #Found a pattern that fits within our range best_pattern = pattern best_ngroup = ngroup best_nint = nint best_diff = diff if best_nint != None: #Have found optimised readout parameters for this pattern within the user defined range, exit optimisation loop. break else: #Need to keep searching pass if best_pattern != None: # Optimised pattern and readout parameters have been found, break out of pattern loop break else: #Need to move to the next pattern down pass if best_ngroup == None or best_nint == None or best_pattern == None: #Optimisation has failed raise RuntimeError('Unable to identify optimal readout parameters. Increase "optimise_margin" or manually define readout.') #Otherwise, the optimisation was succesful pattern, groups, integrations = best_pattern, best_ngroup, best_nint elif instrument == 'miri': #MIRI optimisation is relatively straightforward, always use FAST readout pattern, so we just need to find the largest number of #groups that a) doesn't saturate the detector, and b) is within th margin around the submitted time. obs_dict['configuration']['detector']['ngroup'] = 2 #If it saturates in 5 groups, tough to observe obs_dict['configuration']['detector']['nint'] = 1 obs_dict['configuration']['detector']['readout_pattern'] = 'fast' data = calculate_target(obs_dict) sat_frac = data['scalar']['fraction_saturation'] sat_pix = np.count_nonzero(data['2d']['saturation']) if sat_frac > 1: #There is saturation, use fastest possible readout pattern and warn user. print('WARNING: {:.1f}% saturation detected with the shortest possible readout settings. {} pixels affected.'.format(100*sat_frac, sat_pix)) pattern, groups, integrations = 'fast', 2, 1 else: #No saturation, optimise readout parameters. max_groups = int(np.floor((max_sat/sat_frac)*2)) cosmic_ray_groups = int(np.floor(300/subarray_frame_time)) if max_groups > cosmic_ray_groups: #Above balanced cosmic ray limit of 300s, force maximum down to the highest number of groups max_groups = cosmic_ray_groups best_ngroup = None #Start at highest possible number of groups and work our way down until one fits within the margin. for ngroup in reversed(range(1,max_groups+1)): min_int = int(np.ceil( (t_exp-t_margin) / (subarray_frame_time*ngroup))) max_int = int(np.floor( (t_exp+t_margin) / (subarray_frame_time*ngroup))) # May want to override minimum number of integrations if min_ints == 1: min_int = int(np.ceil( (t_exp-t_margin) / (subarray_frame_time*ngroup))) else: min_int = min_ints #Range of possible ints for this number of groups, if one or more match find which one is closest best_nint = None best_diff = np.inf for nint in range(min_int, max_int+1): t = subarray_frame_time*ngroup*nint diff = abs(t_exp - t) if (diff <= abs(t_exp-t_margin)) and diff < best_diff: #Found a pattern that fits within our range best_ngroup = ngroup best_nint = nint best_diff = diff if best_nint != None: #Have found an optimised readout pattern within the user defined range, exit optimisation loop. break else: #Need to keep searching pass if best_ngroup == None or best_nint == None: #Optimisation has failed raise RuntimeError('Unable to identify optimal readout parameters. Increase "optimise_margin" or manually define readout.') #Otherwise, the optimisation was succesful pattern, groups, integrations = 'fast', best_ngroup, best_nint else: raise ValueError('Instrument {} is not supported, please select NIRCam or MIRI'.format(instrument)) return pattern, groups, integrations
[docs]def optimize_readout(*args, **kwargs): return optimise_readout(*args, **kwargs)
[docs]def compute_magnitude(spectrum_wave, spectrum_flux, filt, wave_unit='micron', flux_unit='mJy'): ''' Calculate the magnitude of a spectrum in a given filter. Parameters ---------- spectrum_wave : synphot wavelengths Wavelengths spectrum_flux : synphot fluxes Flux values in mJy filt : str Filter / bandpass to use wave_unit : str Unit of spectrum wave axis flux_unit : str Unit of spectrum flux axis Returns ------- magnitude : float Vega magnitude in provided filter. ''' Bandpass = read_bandpass(filt) SED = SourceSpectrum(Empirical1D, points=spectrum_wave << u.Unit(wave_unit), lookup_table=spectrum_flux << u.Unit(flux_unit)) VegaSED = SourceSpectrum.from_vega() Obs = Observation(SED, Bandpass, binset=Bandpass.waveset) magnitude = Obs.effstim(flux_unit='vegamag', vegaspec=VegaSED).value return magnitude
[docs]def equatorial_to_ecliptic(ra, dec, form=None, verbose=False): ''' Function to covert equatorial RA and Dec into the ecliptic longitude and latitude. Parameters ---------- ra : float Right ascension dec : float Declination form : str Format of values, 'degrees' or anything else for radians. verbose : bool Print information to terminal Returns ------- lamb : float Ecliptic longtiude beta : float Ecliptic latitude ''' ecl = 23.43 * (np.pi / 180) if form == 'degrees': ra *= (np.pi / 180) dec *= (np.pi / 180) else: if verbose: print('Assuming angles are in radians') beta = np.arcsin(np.sin(dec)*np.cos(ecl) - np.cos(dec)*np.sin(ecl)*np.sin(ra)) lamb = np.arccos(np.cos(ra)*np.cos(dec) / np.cos(beta)) if form == 'degrees': beta *= 180/np.pi lamb *= 180/np.pi return lamb, beta