from __future__ import absolute_import, print_function
from copy import deepcopy
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as tck
from astropy.io import fits
from .transformations import polar_to_cart, cart_to_polar, rotate
from .utilities import query_simbad, convert_spt_to_pandeia, user_spectrum, pandeia_spectrum, normalise_spectrum, compute_magnitude
from pandeia.engine.calc_utils import build_default_calc
[docs]class Scene():
'''
Scene class to mirror the construction of a typical Pandeia 'Scene', reinventing the wheel a little bit, but means that
users don't need to import pandeia in their main code and that aspects of data input can be streamlined.
'''
__NEXT_ID = 1
def __init__(self, name=None, **kwargs):
'''
Initialisation function, can pass a string name.
'''
#Load a default pandeia scene to assign properties to. NIRCam/Coroangraphy doesn't matter here, just need an empty scene dict
self.pandeia_scene = build_default_calc('jwst', 'nircam', 'coronagraphy')['scene']
self.pandeia_scene[0]['assigned'] = False #No source has been assigned to this 'default' scene yet
self.source_list = []
if name == None:
self.scene_name = 'Scene{}'.format(Scene.__NEXT_ID)
Scene.__NEXT_ID += 1
else:
self.scene_name = name
[docs] def add_source(self, name, kind='simbad', r=0.0, theta=0.0, verbose=True, **kwargs):
'''
Add a source to the Scene
Parameters
----------
name : str
Name of the source to be added
kind : str
Kind of source to add, options are
- 'simbad' for SIMBAD query of name string
- 'grid' to use Pandeia Phoenix with 'spt', 'norm_val', 'norm_unit', 'norm_bandpass' passed as kwargs
- 'file' to use input file, with 'wave_unit' and 'flux_unit' passed as kwargs
r : float
Radial separation of source from center in arcseconds
theta : float
PA of source, should be N->E counterclockwise
verbose : bool
Print update statements to terminal
'''
if verbose: print('{} // Adding Source: {}'.format(self.scene_name, name))
raw_id = len(self.source_list)
if '_' in name or ':' in name:
raise ValueError('Underscores "_" and colons ":" cannot be included in Scene names for file saving purposes')
self.source_list.append(name)
#Check if the scene dictionary needs to be extended to add another source.
if raw_id > 0: self.pandeia_scene.append(deepcopy(self.pandeia_scene[0]))
#Load in source
working_source = self.pandeia_scene[raw_id]
working_source['id'] = raw_id+1 #Each source must have an allocated source ID starting at 1.
working_source['pancake_parameters'] = {}
working_source['pancake_parameters']['name'] = name
#Apply source offset
#NOTE: polar_to_cart returns a normal conversion, in our case things are flipped as we are working from the y-axis.
yoff, xoff = polar_to_cart(r, theta)
working_source['position']['x_offset'] = -xoff #Negative as we are working N->E counterclockwise
working_source['position']['y_offset'] = yoff
# Use 'kind' of source input to read in different properties
if kind == 'simbad':
#Attempt to query data for the input 'name' string from simbad
query_results = query_simbad(name, verbose=verbose)
try:
sptval = kwargs.get('spt').lower()
print('Attempting to use Provided Spectral Type: {}'.format(sptval))
except:
sptval = query_results['spt']
print('Attempting to use SIMBAD Spectral Type: {}'.format(sptval))
approx_spt = convert_spt_to_pandeia(sptval)
working_source['pancake_parameters']['spt'] = approx_spt
for qresult in ['ra', 'dec', 'norm_bandpass', 'norm_val', 'norm_unit']:
working_source['pancake_parameters'][qresult] = query_results[qresult]
#Generate spectrum
raw_spectrum_wave, raw_spectrum_flux = pandeia_spectrum(working_source['pancake_parameters']['spt'])
spectrum_wave, spectrum_flux = normalise_spectrum(raw_spectrum_wave, raw_spectrum_flux, norm_val=working_source['pancake_parameters']['norm_val'], norm_unit=working_source['pancake_parameters']['norm_unit'], norm_bandpass=working_source['pancake_parameters']['norm_bandpass'])
elif kind == 'grid':
#User provides the spectral type, normalisation bandpass, and normalisation mag for source so a spectrum can be retrieved from a grid
for ginput in ['spt', 'norm_val', 'norm_unit', 'norm_bandpass']:
#Check variable has been provided
try:
ginput_val = kwargs.get(ginput)
except:
raise NameError("Please provide the spectral type ('spt'), normalisation flux ('norm_val'), normalisation flux unit ('norm_unit'), and normalisation bandpass ('norm_bandpass') of the source")
#Check inputs are of the correct variable types.
if ginput!='norm_val' and not isinstance(ginput_val, str):
raise TypeError("{} input must be of string type.".format(ginput))
elif ginput=='norm_val' and not isinstance(ginput_val, (int,float)):
raise TypeError("{} input must be of int or float type.".format(ginput))
# Assign input values to the scene.
if ginput == 'spt':
#Find the best approximation spectral type that Pandeia understands (could match what user provides).
approx_spt = convert_spt_to_pandeia(kwargs.get(ginput))
working_source['pancake_parameters']['spt'] = approx_spt
else:
working_source['pancake_parameters'][ginput] = kwargs.get(ginput)
#Generate spectrum
raw_spectrum_wave, raw_spectrum_flux = pandeia_spectrum(working_source['pancake_parameters']['spt'])
spectrum_wave, spectrum_flux = normalise_spectrum(raw_spectrum_wave, raw_spectrum_flux, norm_val=working_source['pancake_parameters']['norm_val'], norm_unit=working_source['pancake_parameters']['norm_unit'], norm_bandpass=working_source['pancake_parameters']['norm_bandpass'])
elif kind == 'file':
#User provides a file location for a spectrum which can then be read in and converted to microns and mJy
spectrum_wave, spectrum_flux = user_spectrum(kwargs.get('filename'), wave_unit=kwargs.get('wave_unit'), flux_unit=kwargs.get('flux_unit'))
else:
raise ValueError("Source generation kind not recognised, please use 'simbad', 'grid', or 'file'")
#We are taking the normalisation away from Pandeia so that other filters can be added like 2MASS/WISE.
working_source['spectrum']['normalization']['type'] = 'none'
#Assign spectrum of source to the Pandeia scene.
working_source['spectrum']['sed']['sed_type'] = 'input'
working_source['spectrum']['sed']['spectrum'] = [spectrum_wave, spectrum_flux]
[docs] def renormalize_source(self, *args, **kwargs):
#I assure you it's pronounced 'zed'.
return renormalise_source(self, *args, **kwargs)
[docs] def renormalise_source(self, source, norm_val=5, norm_unit='vegamag', norm_bandpass='2mass_ks'):
'''
Renormalise a source already within a scene.
Parameters
----------
source : str
Name of source to renormalise
norm_val : float
Value to renormalise to
norm_unit : str
Unit to perform renormalisation to
norm_bandpass : str
String for the bandpass we are normalising under, 2MASS, WISE or anything in synphot by default.
'''
try:
raw_id = self.source_list.index(source)
except:
raise ValueError('Source {} has not been allocated to this scene. Currently allocated sources are: {}'.format(source, ', '.join(self.source_list)))
working_source = self.pandeia_scene[raw_id]
spectrum_wave, spectrum_flux = working_source['spectrum']['sed']['spectrum']
renorm_spec_wave, renorm_spec_flux = normalise_spectrum(spectrum_wave, spectrum_flux, norm_val=norm_val, norm_unit=norm_unit, norm_bandpass=norm_bandpass)
working_source['spectrum']['sed']['spectrum'] = [renorm_spec_wave, renorm_spec_flux]
[docs] def source_magnitude(self, source, filt):
'''
Calculate the magnitude of particular source in a given filter
Parameters
----------
source : str
Name of source
filt : str
Filter to calculate magnitude in
Returns
-------
magnitude : float
Magnitude of object in apparent vegamag
'''
try:
raw_id = self.source_list.index(source)
except:
raise ValueError('Source {} has not been allocated to this scene. Currently allocated sources are: {}'.format(source, ', '.join(self.source_list)))
spectrum_wave, spectrum_flux = source['spectrum']['sed']['spectrum']
magnitude = compute_magnitude(spectrum_wave, spectrum_flux, bandpass)
return magnitude
[docs] def offset_scene(self,x,y):
'''
Offset scene in x, y space in arcseconds
Parameters
----------
x : float
x offset in arcseconds
y : float
y offset in arcseconds
'''
for source in self.pandeia_scene:
source['position']['x_offset'] += x
source['position']['y_offset'] += y
[docs] def rotate_scene(self, theta, center=[0.,0.], direction='counter_clockwise'):
'''
Rotate scene given an angle in degrees
Parameters
----------
theta : float
Angular distance to rotate
center : list
x and y coordinate to act as rotation center
direction : str
Direction to perform rotation, 'clockwise' or 'counter_clockwise'
default is counter clockwise.
'''
if direction == 'counter_clockwise':
#Subtract from 360 to convert to a counter clockwise rotation
theta = 360-theta
elif direction != 'clockwise':
raise ValueError('Invalid direction: {}, options are "clockwise" or "counter_clockwise"')
for source in self.pandeia_scene:
newxy = rotate([source['position']['x_offset'],source['position']['y_offset']],theta,center)
source['position']['x_offset'] = newxy[0]
source['position']['y_offset'] = newxy[1]
[docs] def plot_source_spectra(self, sources='all', title='', newfig=True, xlim=(0.5,30), ylim=(1e-3,None)):
'''
Produce a plot of the spectra of sources within a scene.
Parameters
----------
sources : str / list of strings
List of source names, or 'all' to plot all source spectra
title : str
Title of plot
newfig : bool
Start a new figure, default is True
'''
if newfig:
plt.figure(figsize=(8,5))
ax = plt.subplot(111)
for s in self.pandeia_scene:
if s['pancake_parameters']['name'] in sources or sources == 'all':
ax.plot(s['spectrum']['sed']['spectrum'][0], s['spectrum']['sed']['spectrum'][1], label=s['pancake_parameters']['name'])
ax.set_xscale('log')
ax.set_yscale('log')
ax.set_xlim(xlim)
ax.set_ylim(ylim)
ax.set_title(title,y=1.1,fontsize=14)
ax.tick_params(which='both', direction='in', labelsize=12, axis='both', top=True, right=True)
ax.xaxis.set_ticklabels([], minor=True)
ax.xaxis.set_major_formatter(tck.FormatStrFormatter('%g'))
ax.xaxis.set_minor_locator(tck.FixedLocator([0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30]))
ax.xaxis.set_major_locator(tck.FixedLocator([0.6, 1, 2, 3, 4, 5, 6, 8, 10, 15, 20, 28]))
ax.set_xlabel('Wavelength ($\mu$m)',fontsize=14)
ax.set_ylabel('Spectral Flux Density (mJy)',fontsize=14)
ax.legend(numpoints=1,loc='best')
plt.show()
[docs] def plot_scene(self, title='', newfig=True):
'''
Plot the scene and its sources spatially.
Parameters
----------
title : str
Title of plot
newfig : bool
Start a new figure, default is True
'''
if newfig:
plt.figure(figsize=(5,5))
ax = plt.subplot(111,projection='polar')
for s in self.pandeia_scene:
r, theta = cart_to_polar([s['position']['x_offset'],s['position']['y_offset']])
theta -= 90 # As we use the y-axis as theta=0, not x
ax.plot(np.deg2rad(theta),r,lw=0,marker='o',ms=10,label=s['pancake_parameters']['name'])
ax.set_rmin(0) #Centre the scene at 0,0
ax.set_title(title,y=1.1,fontsize=14)
ax.legend(numpoints=1, loc='best', framealpha=1)
ax.set_theta_offset(np.pi/2)
plt.show()
[docs]def create_SGD(ta_error='none', fsm_error='default', stepsize=20.e-3, pattern_name=None, sim_num=0, instrument='nircam'):
'''
Create small grid dither pointing set. There are two
ways to specify dither patterns:
ta_error - add TA error to each point in the SGD?
stepsize - floating point value for a 3x3 grid.
pattern_name - string name of a pattern corresponding to
one of the named dither patterns in APT.
If you specify pattern_name, then stepsize is ignored.
See https://jwst-docs-stage.stsci.edu/display/JTI/NIRCam+Small-Grid+Dithers
for information on the available dither patterns and their names.
Parameters
----------
ta_error : str / int / float
Target acquisition error
- 'saved' to use a saved random seed of offsets
- 'random' for a random error
- int or float for random error with set amplitude in mas
- 'none' for no error
fsm_error : str
Options of 'none' for no error, or 'default' for 2e-3 mas
stepsize : float
Manual step size for dither
pattern_name : str
Default small grid dither pattern name
sim_num : int
Specific simulation number / draw from a 'saved' ta_error
Returns
-------
sgds : list
list of x, y offsets for each step in the dither pattern.
'''
# Small grid dither patterns
if pattern_name is not None:
pattern_name = pattern_name.upper()
if pattern_name == "5-POINT-BOX":
pointings = [(0, 0),
(0.015, 0.015),
(-0.015, 0.015),
(-0.015, -0.015),
(0.015, -0.015)]
elif pattern_name == "5-POINT-DIAMOND":
pointings = [(0, 0),
(0, 0.02),
(0, -0.02),
(+0.02, 0),
(-0.02, 0)]
elif pattern_name == '9-POINT-CIRCLE':
pointings = [( 0, 0),
( 0, 0.02),
(-0.015, 0.015),
(-0.02, 0),
(-0.015, -0.015),
( 0.000, -0.02),
( 0.015, -0.015),
( 0.020, 0.0),
( 0.015, 0.015)]
elif pattern_name == "3-POINT-BAR":
pointings = [(0, 0),
(0.0, 0.015),
(0.0, -0.015)]
elif pattern_name == "5-POINT-BAR":
pointings = [(0, 0),
(0.0, 0.020),
(0.0, 0.010),
(0.0, -0.010),
(0.0, -0.020)]
elif pattern_name == "SINGLE-POINT":
pointings = [(0, 0)]
elif pattern_name == "5-POINT-SMALL-GRID":
pointings = [( 0, 0),
(-0.010, 0.010),
( 0.010, 0.010),
( 0.010, -0.010),
(-0.010, -0.010)]
elif pattern_name == "9-POINT-SMALL-GRID":
pointings = [( 0, 0),
(-0.010, 0.0),
(-0.010, 0.010),
( 0.0, 0.010),
( 0.010, 0.010),
( 0.010, 0.0),
( 0.010, -0.010),
( 0.0, -0.010),
(-0.010, -0.010)]
else:
raise ValueError("Unknown pattern_name value; check your input matches exactly an allowed SGD pattern in APT.")
else:
steps = [-stepsize,0.,stepsize]
pointings = itertools.product(steps,steps)
if ta_error=='saved':
# Use a ta_error from a "saved" list of draws from a 5mas normal distribution (still randomnly generated, but seed is fixed)
rngx = np.random.RandomState(42)
rngy = np.random.RandomState(2021)
if instrument == 'nircam':
scale = 7e-3
elif instrument == 'miri':
scale = 10e-3
saved_ta_x = rngx.normal(loc=0.,scale=scale,size=100)
saved_ta_y = rngy.normal(loc=0.,scale=scale,size=100)
# saved_ta_x = [1e-3, -9e-3, 1e-3, -9e-3, 1e-3, -9e-3, 1e-3, -9e-3, 1e-3, -9e-3]
# saved_ta_y = [6e-3, 6e-3, 6e-3, 6e-3, 6e-3, 6e-3, 6e-3, 6e-3, 6e-3, 6e-3]
ta_x, ta_y = saved_ta_x[sim_num], saved_ta_y[sim_num]
elif ta_error=='random':
# Simulate the TA error from a 1mas normal distribution
ta_x, ta_y = get_ta_error(instrument, error='default')
elif isinstance(ta_error, (int, float)):
# Simulate the TA error from an X normal distribution (should provide in arcsec)
ta_x, ta_y = get_ta_error(instrument, error=ta_error)
elif ta_error=='none':
ta_x, ta_y = 0., 0.
fsm_error = 'none'
else:
raise ValueError('Target Acquisition (ta_error) string not recognised, options are "random", "saved", or "none", or user specified values.')
sgds = []
for i, (sx, sy) in enumerate(pointings):
if i > 0:
errx, erry = get_fsm_error(error=fsm_error)
offset_x = sx + errx + ta_x
offset_y = sy + erry + ta_y
else:
offset_x = sx + ta_x
offset_y = sy + ta_y
sgds.append([offset_x, offset_y])
return sgds
[docs]def get_ta_error(instrument, error='default'):
'''
7 mas 1-sigma/axis error
Parameters
----------
error : str / float
String of 'default' for 1e-3, or input float
Returns
-------
x,y random error
'''
if error == 'default':
if instrument == 'nircam':
error = 7e-3
elif instrument == 'miri':
error = 7e-3
return np.random.normal(loc=0.,scale=error,size=2)
[docs]def get_fsm_error(error='default'):
'''2mas 1/sigma/axis error from the fine steering mirror
Parameters
----------
error : str / float
String of 'default' for 2e-3, 'none' for 0, or input float
Returns
-------
x,y random error
'''
if error == 'default':
error = 3e-3
elif error == 'none':
error = 0.
return np.random.normal(loc=0.,scale=error,size=2)