from __future__ import absolute_import, print_function
# Subclasses of the Pandeia engine / JWST classes needed for on-the-fly coronagraphic calculations
from copy import deepcopy
from glob import glob
import json
import logging
import multiprocessing as mp
import os
import pkg_resources
import sys
import warnings
import astropy.units as units
import astropy.io.fits as fits
import scipy.integrate as integrate
import scipy.interpolate as sci_int
import scipy.signal as sg
from scipy.ndimage import shift
import webbpsf
from poppy import poppy_core
from functools import wraps
if sys.version_info[0] >= 3:
from functools import lru_cache
from io import StringIO
else:
from functools32 import lru_cache
from cStringIO import StringIO
import numpy as np
import pandeia.engine
from pandeia.engine import observation
from pandeia.engine import astro_spectrum as astro
from pandeia.engine import background as bg
from pandeia.engine import coords
from pandeia.engine.config import DefaultConfig
from pandeia.engine.report import Report
from pandeia.engine.scene import Scene
from pandeia.engine.calc_utils import build_empty_scene
from pandeia.engine.custom_exceptions import EngineInputError, EngineOutputError, RangeError, DataError, UnsupportedError
from pandeia.engine.instrument_factory import InstrumentFactory
from pandeia.engine.strategy import StrategyFactory
from pandeia.engine.pandeia_warnings import signal_warning_messages as warning_messages
from pandeia.engine import debug_utils
from pandeia.engine.psf_library import PSFLibrary
from pandeia.engine.strategy import Coronagraphy
from pandeia.engine.constants import SPECTRAL_MAX_SAMPLES
default_SPECTRAL_MAX_SAMPLES = SPECTRAL_MAX_SAMPLES
from pandeia.engine.etc3D import CalculationConfig, DetectorSignal
PandeiaDetectorSignal = DetectorSignal
from pandeia.engine.signal import DetectorBinning
from .config import EngineConfiguration
cache_maxsize = 256 # Number of monochromatic PSFs stored in an LRU cache
# Should speed up calculations that involve modifying things
# like exposure time and don't actually require calculating new PSFs.
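# A minimal sketch of the caching pattern used below (assumptions: every argument
# must be hashable, so webbpsf option dicts are first converted to e.g. a tuple of
# sorted items; 'compute_expensive_psf' is a hypothetical helper):
#
#     @lru_cache(maxsize=cache_maxsize)
#     def example_psf(wave, aperture, otf_options):
#         return compute_expensive_psf(wave, aperture, otf_options)
#
# Repeated calls with identical arguments then return the cached PSF instead of
# recomputing it.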
class CoronagraphyPSFLibrary(PSFLibrary, object):
'''
Subclass of the Pandeia PSFLibrary class, intended to allow PSFs to be generated on-the-fly
via webbpsf rather than using cached PSFs
'''
def __init__(self, wavelength_range, path=None, psf_key='all'):
from .engine import options
self._options = options
self._log("debug", "CUSTOM PSF LIBRARY ACTIVATE!")
if path is None:
if "pandeia_refdata" in os.environ:
tel = 'jwst'
ins = options.current_config['configuration']['instrument']['instrument'].lower()
path = os.path.join(os.environ['pandeia_refdata'], tel, ins, 'psfs')
super(CoronagraphyPSFLibrary, self).__init__(wavelength_range, path, psf_key)
self.latest_on_the_fly_PSF = None
self._cache_path = options.cache_path
if self._cache_path is None:
self._cache_path = os.getcwd()
    def associate_offset_to_source(self, sources, instrument, aperture_name):
        '''
        Adds azimuth information for use with webbpsf. Pandeia does not calculate the
        position angle and assumes azimuthal symmetry, which produces incorrect results
        when using the bar coronagraphs.
        '''
psf_offsets = self.get_offsets(instrument, aperture_name)
psf_associations = []
for source in sources:
source_offset_radius = np.sqrt(source.position['x_offset']**2. + source.position['y_offset']**2.)
source_offset_azimuth = 360*(np.pi+np.arctan2(source.position['x_offset'],source.position['y_offset']))/2/np.pi
psf_associations.append((source_offset_radius,source_offset_azimuth))
return psf_associations
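    # Worked example (hypothetical offsets): a source at x_offset=1.0", y_offset=1.0"
    # gives radius sqrt(2) ~= 1.414" and azimuth 360*(pi + arctan2(1, 1))/(2*pi)
    # = 180 + 45 = 225 degrees; i.e. arctan2(x, y) measures the angle from the +y
    # axis toward +x, and the pi term adds a 180-degree offset.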
    def get_pupil_throughput(self, wave, instrument, aperture_name):
"""
Intended for pandeia 1.2 compatibility.
"""
if hasattr(super(CoronagraphyPSFLibrary, self), "get_pupil_throughput"):
return super(CoronagraphyPSFLibrary, self).get_pupil_throughput(wave, instrument, aperture_name)
ins = CoronagraphyPSFLibrary._get_instrument(instrument, aperture_name)
return CoronagraphyPSFLibrary._pupil_throughput(ins)
    @staticmethod
    @lru_cache(maxsize=cache_maxsize)
    def get_cached_psf(wave, instrument, aperture_name, oversample=None, source_offset=(0, 0), otf_options=None, full_aperture=None):
from .engine import options
#Make the instrument and determine the mode
ins = CoronagraphyPSFLibrary._get_instrument(instrument, aperture_name, source_offset)
pix_scl = ins.pixelscale
fov_pixels = CoronagraphyPSFLibrary.fov_pixels[aperture_name]
trim_fov_pixels = CoronagraphyPSFLibrary.trim_fov_pixels[aperture_name]
psf_result = CoronagraphyPSFLibrary.calc_psf(ins, wave, source_offset, oversample, pix_scl,
fov_pixels, trim_fov_pixels=trim_fov_pixels)
pupil_throughput = CoronagraphyPSFLibrary._pupil_throughput(ins)
pix_scl = psf_result[0].header['PIXELSCL']
upsamp = psf_result[0].header['OVERSAMP']
diff_limit = psf_result[0].header['DIFFLMT']
psf = psf_result[0].data
if len(psf) == 0:
psf = np.ones((1,1))
psf = {
'int': psf,
'wave': wave,
'pix_sclx': pix_scl,
'pix_scly': pix_scl,
'diff_limit': diff_limit,
'upsamp': upsamp,
'instrument': instrument,
'aperture_name': aperture_name,
'source_offset': source_offset,
'pupil_throughput': pupil_throughput
}
return psf
    def get_psf(self, wave, instrument, aperture_name, oversample=None, source_offset=(0, 0), otf_options=None, full_aperture=None):
cache = self._options.cache
if oversample is None:
oversample = self._options.on_the_fly_oversample
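        # Sources offset by more than 50" lie far outside any coronagraphic field of
        # view, so return a trivial 1x1 unit PSF rather than computing one.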
if source_offset[0] > 50.:
ins = CoronagraphyPSFLibrary._get_instrument(instrument, aperture_name, source_offset)
diff_limit = ((((wave*units.micron).to(units.meter).value)/6.5)*units.radian).to(units.arcsec).value
psf = {
'int': np.ones((1,1)),
'wave': wave,
'pix_sclx': ins.pixelscale/oversample,
'pix_scly': ins.pixelscale/oversample,
'diff_limit': diff_limit,
'upsamp': oversample,
'instrument': instrument,
'aperture_name': aperture_name,
'source_offset': source_offset,
'pupil_throughput': self._pupil_throughput(ins)
}
return psf
self._log("info", "Getting {} {} {}... with caching {}".format(instrument, aperture_name, wave, cache))
if cache == 'disk':
psf_name = 'cached_{:.5f}_{}_{}_{:.5f}_{:.5f}_{}.fits'.format(wave, instrument, aperture_name, source_offset[0], source_offset[1], oversample)
if self._have_psf(psf_name):
self._log("info", " Found in cache")
psf_flux, pix_scl, diff_limit, pupil_throughput = self._get_psf(psf_name)
psf = {
'int': psf_flux,
'wave': wave,
'pix_sclx': pix_scl,
'pix_scly': pix_scl,
'diff_limit': diff_limit,
'upsamp': oversample,
'instrument': instrument,
'aperture_name': aperture_name,
'source_offset': source_offset,
'pupil_throughput': pupil_throughput
}
return psf
elif cache == 'ram':
            # Route the request through the lru_cache-wrapped static method; the cache
            # lives outside the instance, so every argument must be hashable.
            # Fold the on-the-fly override options into the hash key for the lru_cache.
if isinstance(self._options.on_the_fly_webbpsf_opd, fits.HDUList):
opd = self._options.on_the_fly_webbpsf_opd[0]
else:
opd = self._options.on_the_fly_webbpsf_opd
otf_options = tuple(sorted(self._options.on_the_fly_webbpsf_options.items()) + [opd,])
# this may be needed in get_psf; extract it so we can avoid
# passing in 'self', which isn't hashable for the cache lookup
full_aperture = self._psfs[0]['aperture_name']
tmp = self.get_cached_psf(wave, instrument, aperture_name, oversample, source_offset, otf_options=otf_options, full_aperture=full_aperture)
self._log("info", " Cache Stats: {}".format(self.get_cached_psf.cache_info()))
return tmp
# Either disk cache miss or no caching
#Make the instrument and determine the mode
ins = CoronagraphyPSFLibrary._get_instrument(instrument, aperture_name, source_offset)
pix_scl = ins.pixelscale
fov_pixels = CoronagraphyPSFLibrary.fov_pixels[aperture_name]
trim_fov_pixels = CoronagraphyPSFLibrary.trim_fov_pixels[aperture_name]
psf_result = self.calc_psf(ins, wave, source_offset, oversample, pix_scl, fov_pixels, trim_fov_pixels=trim_fov_pixels)
pupil_throughput = self._pupil_throughput(ins)
pix_scl_x = psf_result[0].header['PIXELSCL']
pix_scl_y = psf_result[0].header['PIXELSCL']
upsamp = psf_result[0].header['OVERSAMP']
diff_limit = psf_result[0].header['DIFFLMT']
psf = psf_result[0].data
if len(psf) == 0:
psf = np.ones((1,1))
psf = {
'int': psf,
'wave': wave,
'pix_sclx': pix_scl_x,
'pix_scly': pix_scl_y,
'diff_limit': diff_limit,
'upsamp': upsamp,
'instrument': instrument,
'aperture_name': aperture_name,
'source_offset': source_offset,
'pupil_throughput': pupil_throughput
}
if cache == 'disk':
psf_result[0].header['PUPTHR'] = pupil_throughput
psf_result.writeto(os.path.join(self._cache_path, psf_name))
self._log("info", " Created and saved to cache.")
return psf
    def get_pix_scale(self, instrument, aperture_name):
        """
        Get PSF pixel scale for given instrument/aperture.
        Overrides the Pandeia implementation to ensure the pixel scale is computed correctly.
        """
aperture_dict = self.parse_aperture(aperture_name)
upsample = self.get_upsamp(instrument, aperture_name)
        pix_scl = aperture_dict[4] / upsample  # the same scale is used for both x and y
pix_sclx, pix_scly = pix_scl, pix_scl
return pix_sclx, pix_scly
def _have_psf(self, psf_name):
'''
Determine whether a cached PSF exists for a given combination
'''
return os.path.exists(os.path.join(self._cache_path, psf_name))
def _get_psf(self, psf_name):
'''
        Return a cached PSF for a given combination
'''
with fits.open(os.path.join(self._cache_path, psf_name)) as inf:
pix_scl = inf[0].header['PIXELSCL']
diff_limit = inf[0].header['DIFFLMT']
pupil_throughput = inf[0].header['PUPTHR']
psf = inf[0].data
return psf, pix_scl, diff_limit, pupil_throughput
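    # Example of the on-disk cache naming used by get_psf (hypothetical values):
    # wave=3.35, instrument 'nircam', aperture 'mask335r', offset (0.5, 0.0) and
    # oversample 4 map to 'cached_3.35000_nircam_mask335r_0.50000_0.00000_4.fits'
    # inside self._cache_path.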
@staticmethod
def _pupil_throughput(ins):
"""
Determines pupil throughput given a webbpsf instrument object
"""
optsys = ins.get_optical_system()
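        # Plane [0] of the optical system is the unobstructed OTE pupil; plane [-2]
        # is the final pupil plane containing the coronagraphic Lyot stop. The
        # throughput is the ratio of their integrated amplitude transmissions.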
ote_pupil = optsys[0].amplitude
coron_pupil = optsys[-2].amplitude
pupil_throughput = coron_pupil.sum() / ote_pupil.sum()
return pupil_throughput
@staticmethod
def _get_instrument(instrument, aperture_name, source_offset=None):
from .engine import options as pancake_options
instrument_config = pancake_options.current_config['configuration']['instrument']
scene_config = pancake_options.current_config['scene']
ref_config = pancake_options.current_config['strategy']['psf_subtraction_source']
if source_offset is None:
offset_x = max([x['position']['x_offset'] for x in scene_config] + [ref_config['position']['x_offset']])
offset_y = max([x['position']['y_offset'] for x in scene_config] + [ref_config['position']['y_offset']])
source_offset_radius = np.sqrt(offset_x**2. + offset_y**2.)
source_offset_azimuth = 360*(np.pi+np.arctan2(offset_x, offset_y))/2/np.pi
source_offset = [source_offset_radius, source_offset_azimuth]
if instrument.upper() == 'NIRCAM':
ins = webbpsf.NIRCam()
ins.filter = instrument_config['filter']
if CoronagraphyPSFLibrary.nircam_mode[aperture_name] == 'lw_imaging':
ins.detector = 'NRCA5'
ins.pixelscale = ins._pixelscale_long
elif instrument.upper() == 'MIRI':
ins = webbpsf.MIRI()
ins.filter = instrument_config['filter']
else:
raise ValueError('Only NIRCam and MIRI are supported instruments!')
ins.image_mask = CoronagraphyPSFLibrary.image_mask[aperture_name]
ins.pupil_mask = CoronagraphyPSFLibrary.pupil_mask[aperture_name]
for key in pancake_options.on_the_fly_webbpsf_options:
ins.options[key] = pancake_options.on_the_fly_webbpsf_options[key]
if pancake_options.on_the_fly_webbpsf_opd is not None:
ins.pupilopd = pancake_options.on_the_fly_webbpsf_opd
#get offset
ins.options['source_offset_r'] = source_offset[0]
ins.options['source_offset_theta'] = source_offset[1]
ins.options['output_mode'] = 'oversampled'
ins.options['parity'] = 'odd'
return ins
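    # Usage sketch (assuming a NIRCam coronagraphic configuration is active in the
    # pancake engine options; wavelength in microns):
    #
    #     ins = CoronagraphyPSFLibrary._get_instrument('NIRCAM', 'mask335r')
    #     psf_fits = CoronagraphyPSFLibrary.calc_psf(ins, 3.35, (0., 0.), 4,
    #                                                ins.pixelscale, 101)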
    @staticmethod
def parse_aperture(aperture_name):
'''
        Return [image mask, pupil mask, fov_pixels, trim_fov_pixels, pixelscale, mode]
'''
aperture_keys = ['mask210r','mask335r','mask430r','masklwb','maskswb','fqpm1065','fqpm1140','fqpm1550','lyot2300', 'mask210rsw','mask335rsw','mask430rsw','masklwbsw','maskswbsw', 'mask210rlw','mask335rlw','mask430rlw','masklwblw','maskswblw']
assert aperture_name in aperture_keys, 'Aperture {} not recognized! Must be one of {}'.format(aperture_name, aperture_keys)
nc = webbpsf.NIRCam()
miri = webbpsf.MIRI()
aperture_dict = {
'mask210r' : ['MASK210R','CIRCLYOT', 101, None, nc._pixelscale_short, 'sw_imaging'],
'mask335r' : ['MASK335R','CIRCLYOT', 101, None, nc._pixelscale_long, 'lw_imaging'],
'mask430r' : ['MASK430R','CIRCLYOT', 101, None, nc._pixelscale_long, 'lw_imaging'],
'masklwb' : ['MASKLWB','WEDGELYOT', 351, 101, nc._pixelscale_long, 'lw_imaging'],
'maskswb' : ['MASKSWB','WEDGELYOT', 351, 101, nc._pixelscale_short, 'sw_imaging'],
'fqpm1065' : ['FQPM1065','MASKFQPM', 81, None, miri.pixelscale, 'imaging'],
'fqpm1140' : ['FQPM1140','MASKFQPM', 81, None, miri.pixelscale, 'imaging'],
'fqpm1550' : ['FQPM1550','MASKFQPM', 81, None, miri.pixelscale, 'imaging'],
'lyot2300' : ['LYOT2300','MASKLYOT', 81, None, miri.pixelscale, 'imaging'],
'mask210rsw' : ['MASK210R','CIRCLYOT', 101, None, nc._pixelscale_short, 'sw_imaging'],
'mask335rsw' : ['MASK335R','CIRCLYOT', 101, None, nc._pixelscale_short, 'sw_imaging'],
'mask430rsw' : ['MASK430R','CIRCLYOT', 101, None, nc._pixelscale_short, 'sw_imaging'],
'masklwbsw' : ['MASKLWB','WEDGELYOT', 351, 101, nc._pixelscale_short, 'sw_imaging'],
'maskswbsw' : ['MASKSWB','WEDGELYOT', 351, 101, nc._pixelscale_short, 'sw_imaging'],
'mask210rlw' : ['MASK210R','CIRCLYOT', 101, None, nc._pixelscale_long, 'lw_imaging'],
'mask335rlw' : ['MASK335R','CIRCLYOT', 101, None, nc._pixelscale_long, 'lw_imaging'],
'mask430rlw' : ['MASK430R','CIRCLYOT', 101, None, nc._pixelscale_long, 'lw_imaging'],
'masklwblw' : ['MASKLWB','WEDGELYOT', 351, 101, nc._pixelscale_long, 'lw_imaging'],
'maskswblw' : ['MASKSWB','WEDGELYOT', 351, 101, nc._pixelscale_long, 'lw_imaging'],
}
return aperture_dict[aperture_name]
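    # Example: parse_aperture('mask335r') returns
    #     ['MASK335R', 'CIRCLYOT', 101, None, nc._pixelscale_long, 'lw_imaging']
    # i.e. [image mask, pupil mask, fov_pixels, trim_fov_pixels, pixelscale, mode].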
    @staticmethod
def calc_psf(ins, wave, offset, oversample, pix_scale, fov_pixels, trim_fov_pixels=None):
'''
Following the treatment in pandeia_data/dev/make_psf.py to handle
off-center PSFs for use as a kernel in later convolutions.
'''
# Split out offset
offset_r, offset_theta = offset
        # Create the optical system model early. It is needed to determine the critical
        # angle and would otherwise not be generated until the PSF itself is computed;
        # we want to verify that the observation is *not* beyond the critical angle
        # *before* generating the PSF.
optsys = ins.get_optical_system(fft_oversample=3, detector_oversample=3, fov_arcsec=None, fov_pixels=fov_pixels)
# determine the spatial frequency which is Nyquist sampled by the input pupil.
# convert this to units of cycles per meter and make it not a Quantity
sf = (1./(optsys.planes[0].pixelscale * 2 * units.pixel)).to(1./units.meter).value
critical_angle_arcsec = wave*1.e-6*sf*poppy_core._RADIANStoARCSEC
critical_angle_pixels = int(np.floor(0.5 * critical_angle_arcsec / pix_scale))
if offset_r > 0.:
#roll back to center
dx = int(np.rint( offset_r * np.sin(np.deg2rad(offset_theta)) / pix_scale ))
dy = int(np.rint( offset_r * np.cos(np.deg2rad(offset_theta)) / pix_scale ))
dmax = np.max([np.abs(dx), np.abs(dy)])
psf_result = ins.calc_psf(monochromatic=wave*1e-6, oversample=oversample, fov_pixels=min(critical_angle_pixels, fov_pixels + 2*dmax))
image = psf_result[0].data
image = np.roll(image, dx * oversample, axis=1)
image = np.roll(image, -dy * oversample, axis=0)
image = image[dmax * oversample:(fov_pixels + dmax) * oversample, dmax * oversample:(fov_pixels + dmax) * oversample]
#trim if requested
if trim_fov_pixels is not None:
trim_amount = int(oversample * (fov_pixels - trim_fov_pixels) / 2)
image = image[trim_amount:-trim_amount, trim_amount:-trim_amount]
psf_result[0].data = image
else:
psf_result = ins.calc_psf(monochromatic=wave*1e-6, oversample=oversample, fov_pixels=min(critical_angle_pixels, fov_pixels))
return psf_result
def _log(self, level, message):
"""
        Logging helper. Pandeia cannot serialize a class that stores a logger, so the
        logger is created locally here rather than kept as an attribute.
"""
logger = logging.getLogger(__name__)
if not len(logger.handlers):
logger.addHandler(logging.StreamHandler(sys.stderr))
logger.setLevel(logging.WARNING)
if self._options.verbose:
logger.setLevel(logging.DEBUG)
        if not hasattr(logger, level.lower()):
            print("Logger has no function {}".format(level.lower()))
            print("Message is: {}".format(message))
            return
        logging_fn = getattr(logger, level.lower())
        logging_fn(message)
nircam_mode = {
'mask210r': 'sw_imaging', 'mask335r': 'lw_imaging', 'mask430r': 'lw_imaging',
'masklwb': 'lw_imaging', 'maskswb': 'sw_imaging', 'fqpm1065': 'imaging',
'fqpm1140': 'imaging', 'fqpm1550': 'imaging', 'lyot2300': 'imaging',
'mask210rsw': 'sw_imaging', 'mask335rsw': 'sw_imaging', 'mask430rsw': 'sw_imaging',
'masklwbsw': 'sw_imaging', 'maskswbsw': 'sw_imaging','mask210rlw': 'lw_imaging',
'mask335rlw': 'lw_imaging', 'mask430rlw': 'lw_imaging',
'masklwblw': 'lw_imaging', 'maskswblw': 'lw_imaging',
}
image_mask = {
'mask210r': 'MASK210R', 'mask335r': 'MASK335R', 'mask430r': 'MASK430R',
'masklwb': 'MASKLWB', 'maskswb': 'MASKSWB', 'fqpm1065': 'FQPM1065',
'fqpm1140': 'FQPM1140', 'fqpm1550': 'FQPM1550', 'lyot2300': 'LYOT2300',
'mask210rsw': 'MASK210R', 'mask335rsw': 'MASK335R', 'mask430rsw': 'MASK430R',
'masklwbsw': 'MASKLWB', 'maskswbsw': 'MASKSWB', 'mask210rlw': 'MASK210R',
'mask335rlw': 'MASK335R', 'mask430rlw': 'MASK430R',
'masklwblw': 'MASKLWB', 'maskswblw': 'MASKSWB'
}
pupil_mask = {
'mask210r': 'CIRCLYOT', 'mask335r': 'CIRCLYOT', 'mask430r': 'CIRCLYOT',
'masklwb': 'WEDGELYOT', 'maskswb': 'WEDGELYOT', 'fqpm1065': 'MASKFQPM',
'fqpm1140': 'MASKFQPM', 'fqpm1550': 'MASKFQPM', 'lyot2300': 'MASKLYOT',
'mask210rsw': 'CIRCLYOT', 'mask335rsw': 'CIRCLYOT', 'mask430rsw': 'CIRCLYOT',
'masklwbsw': 'WEDGELYOT', 'maskswbsw': 'WEDGELYOT', 'mask210rlw': 'CIRCLYOT',
'mask335rlw': 'CIRCLYOT', 'mask430rlw': 'CIRCLYOT',
'masklwblw': 'WEDGELYOT', 'maskswblw': 'WEDGELYOT'
}
fov_pixels = {
'mask210r': 101, 'mask335r': 101, 'mask430r': 101, 'masklwb': 351,
'maskswb': 351, 'fqpm1065': 81, 'fqpm1140': 81, 'fqpm1550': 81,
'lyot2300': 81, 'mask210rsw': 101, 'mask335rsw': 101, 'mask430rsw': 101,
'masklwbsw': 351, 'maskswbsw': 351, 'mask210rlw': 101, 'mask335rlw': 101,
'mask430rlw': 101, 'masklwblw': 351, 'maskswblw': 351
}
trim_fov_pixels = {
'mask210r': None, 'mask335r': None, 'mask430r': None, 'masklwb': 101,
'maskswb': 101, 'fqpm1065': None, 'fqpm1140': None, 'fqpm1550': None,
'lyot2300': None,'mask210rsw': None, 'mask335rsw': None, 'mask430rsw': None,
'masklwbsw': 101, 'maskswbsw': 101, 'mask210rlw': None, 'mask335rlw': None,
'mask430rlw': None, 'masklwblw': 101, 'maskswblw': 101
}
class CoronagraphyConvolvedSceneCube(pandeia.engine.astro_spectrum.ConvolvedSceneCube):
    '''
    Overrides the ConvolvedSceneCube class so that, instead of the fixed
    SPECTRAL_MAX_SAMPLES constant, the wavelength sampling is taken from the pancake
    engine options (additional keyword arguments: background=None, psf_library=None,
    webapp=False, empty_scene=False).
    '''
def __init__(self, scene, instrument, **kwargs):
from .engine import options
self.coronagraphy_options = options
self._options = options
self._log("debug", "CORONAGRAPHY SCENE CUBE ACTIVATE!")
pandeia.engine.astro_spectrum.SPECTRAL_MAX_SAMPLES = self._max_samples
if 'psf_library' in kwargs and not isinstance(kwargs['psf_library'], CoronagraphyPSFLibrary):
self.instrument = instrument
wrange = self.instrument.get_wave_range()
kwargs['psf_library'] = CoronagraphyPSFLibrary(wrange)
super(CoronagraphyConvolvedSceneCube, self).__init__(scene, instrument, **kwargs)
@property
def _max_samples(self):
        '''
        Replace the SPECTRAL_MAX_SAMPLES constant with a value configurable through
        the pancake engine options.
        '''
if self.coronagraphy_options.wave_sampling is None:
return default_SPECTRAL_MAX_SAMPLES
return self.coronagraphy_options.wave_sampling
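    # Usage sketch (hypothetical option value): setting
    #
    #     options.wave_sampling = 21
    #
    # before constructing the cube makes _max_samples return 21, so the scene is
    # convolved at 21 wavelengths instead of default_SPECTRAL_MAX_SAMPLES.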
def _log(self, level, message):
"""
        Logging helper. Pandeia cannot serialize a class that stores a logger, so the
        logger is created locally here rather than kept as an attribute.
"""
logger = logging.getLogger(__name__)
if not len(logger.handlers):
logger.addHandler(logging.StreamHandler(sys.stderr))
logger.setLevel(logging.WARNING)
if self._options.verbose:
logger.setLevel(logging.DEBUG)
        logging_fn = getattr(logger, level.lower())
logging_fn(message)
class CoronagraphyDetectorSignal(CoronagraphyConvolvedSceneCube):
    '''
    Override of DetectorSignal that avoids inheritance issues; unfortunately this
    currently means copying its methods wholesale, changed only where a different
    class must be used (additional keyword arguments: webapp=False, order=None,
    empty_scene=False).
    '''
def __init__(self, observation, calc_config=CalculationConfig(), **kwargs):
# Get calculation configuration
self.calculation_config = calc_config
# Link to the passed observation
self.observation = observation
# Load the instrument we're using
self.current_instrument = observation.instrument
# save order to the DetectorSignal instance, for convenience purposes
self.order = None
if 'order' in kwargs:
self.order = kwargs['order']
del kwargs['order']
# and configure the instrument for that order
self.current_instrument.order = self.order
# Get optional arguments
webapp = kwargs.get('webapp', False)
empty_scene = kwargs.get('empty_scene', False)
# Get wavelength range
wrange = self.current_instrument.get_wave_range()
# Add coronagraphy-specific PSF library for on-the-fly PSF generation
kwargs['psf_library'] = CoronagraphyPSFLibrary(wrange)
# how are we projecting the signal onto the detector plane?
self.projection_type = self.current_instrument.projection_type
# If we're in a dispersed mode, we need to know which axis the signal is dispersed along
self.dispersion_axis = self.current_instrument.dispersion_axis()
# Get the detector parameters (read noise, etc.)
self.det_pars = self.current_instrument.get_detector_pars()
        # Get the detector object itself
self.the_detector = self.current_instrument.the_detector
# Initialize detector mask
self.det_mask = 1.0
# Get the background
if self.calculation_config.effects['background']:
self.background = bg.Background(self.observation, webapp=webapp)
else:
self.background = None
kwargs['background'] = self.background
# Then initialize the flux and wavelength grid
CoronagraphyConvolvedSceneCube.__init__(
self,
self.observation.scene,
self.current_instrument,
**kwargs
)
        if self.background is not None:
            self.warnings.update(self.background.warnings)
# We have to propagate the background through the system transmission
# to get the background in e-/s/pixel/micron. The background rate is a 1D spectrum.
self.bg_fp_rate = self.get_bg_fp_rate()
# Initialize slice lists
self.rate_list = []
self.rate_plus_bg_list = []
self.saturation_list = []
self.groups_list = []
self.pixgrid_list = []
# Loop over all slices and calculate the photon and electron rates through the
# observatory for each one. Note that many modes (imaging, etc.) will have just
# a single slice.
for flux_cube, flux_plus_bg in zip(self.flux_cube_list, self.flux_plus_bg_list):
# Rates for the slice without the background
slice_rate = self.all_rates(flux_cube, add_extended_background=False)
# Rates for the slice with the background added
slice_rate_plus_bg = self.all_rates(flux_plus_bg, add_extended_background=True)
# Saturation map for the slice
slice_saturation = self.get_saturation_mask(rate=slice_rate_plus_bg['fp_pix'])
exposure_spec = self.current_instrument.the_detector.exposure_spec
if hasattr(exposure_spec, 'get_groups_before_sat'):
slice_group = exposure_spec.get_groups_before_sat(slice_rate_plus_bg['fp_pix'],
self.det_pars.detector_config['fullwell'])
else:
slice_group = self._groups_before_sat(slice_rate_plus_bg['fp_pix'], self.det_pars.detector_config['fullwell'])
# The grid in the slice
slice_pixgrid = self.get_pix_grid(slice_rate)
# Append all slices to the master lists
self.rate_list.append(slice_rate)
self.rate_plus_bg_list.append(slice_rate_plus_bg)
self.saturation_list.append(slice_saturation)
self.groups_list.append(slice_group)
self.pixgrid_list.append(slice_pixgrid)
# Get the mapping of wavelength to pixels on the detector plane. This is grabbed from the
# first entry in self.rate_list and is currently defined to be the same for all slices.
self.wave_pix = self.get_wave_pix()
# This is also grabbed from the first slice as a diagnostic
self.fp_rate = self.get_fp_rate()
# Note that the 2D image due to background alone may have spatial structure due to instrumental effects.
# Therefore it is calculated here.
self.bg_pix_rate = self.get_bg_pix_rate()
# Check to see if the background is saturating
bgsat = self.get_saturation_mask(rate=self.bg_pix_rate)
if (np.sum(bgsat) > 0) or (np.isnan(np.sum(bgsat))):
key = "background_saturated"
self.warnings[key] = warning_messages[key]
# Reassemble rates of multiple slices on the detector
self.rate = self.on_detector(self.rate_list)
self.rate_plus_bg = self.on_detector(self.rate_plus_bg_list)
exposure_spec = self.current_instrument.the_detector.exposure_spec
        # compute the group map from the reassembled full-detector rate
        if hasattr(exposure_spec, 'get_groups_before_sat'):
            self.ngroup_map = exposure_spec.get_groups_before_sat(self.rate_plus_bg,
                                                                  self.det_pars.detector_config['fullwell'])
        else:
            self.ngroup_map = self._groups_before_sat(self.rate_plus_bg, self.det_pars.detector_config['fullwell'])
if hasattr(exposure_spec, 'get_saturation_fraction'):
saturation_fraction = exposure_spec.get_saturation_fraction(self.rate_plus_bg, self.det_pars.detector_config['fullwell'])
else:
saturation_fraction = exposure_spec.saturation_time / (self.det_pars.detector_config['fullwell'] / self.rate_plus_bg)
self.fraction_saturation = np.max(saturation_fraction)
self.detector_pixels = self.current_instrument.get_detector_pixels(self.wave_pix)
self.brightest_pixel = np.max(self.rate_plus_bg)
# Get the read noise correlation matrix and store it as an attribute.
if self.det_pars.detector_config['rn_correlation']:
self.read_noise_correlation_matrix = self.current_instrument.get_readnoise_correlation_matrix(self.rate.shape)
    def wcs_info(self):
"""
Get detector coordinate transform as a dict of WCS keyword/value pairs.
Returns
-------
header: dict
WCS header keys defining coordinate transform in the detector plane
"""
if self.projection_type == 'image':
# if we're in imaging mode, the detector sampling is the same as the model
header = self.grid.wcs_info()
elif self.projection_type in ('spec', 'slitless', 'multiorder'):
# if we're in a dispersed mode, dispersion can be either along the X or Y axis. the image outputs in
# the engine Report are rotated so that dispersion will always appear to be along the X axis with
# wavelength increasing with increasing X (i. e. dispersion angle of 0). currently, the only other
# supported dispersion angle is 90 which is what we get when dispersion_axis == 'y'.
t = self.grid.as_dict()
t.update(self.spectral_detector_transform())
header = {
'ctype1': 'Wavelength',
'crpix1': 1,
'crval1': t['wave_det_min'] - 0.5 * t['wave_det_step'],
'cdelt1': t['wave_det_step'],
'cunit1': 'um',
'cname1': 'Wavelength',
'ctype2': 'Y offset',
'crpix2': 1,
'crval2': t['y_min'] - 0.5 * t['y_step'],
'cdelt2': -t['y_step'],
'cunit2': 'arcsec',
'cname2': 'Detector Offset',
}
if self.dispersion_axis == 'y':
header['ctype2'] = 'X offset'
                header['crval2'] = t['x_min'] - 0.5 * t['x_step']
header['cdelt2'] = t['x_step']
else:
message = "Unsupported projection_type: %s" % self.projection_type
raise EngineOutputError(value=message)
return header
    def get_wave_pix(self):
"""
Return the mapping of wavelengths to pixels on the detector plane
"""
return self.rate_list[0]['wave_pix']
    def get_fp_rate(self):
"""
Return scene flux at the focal plane in e-/s/pixel/micron (excludes background)
"""
return self.rate_list[0]['fp']
    def get_bg_fp_rate(self):
"""
Calculate background in e-/s/pixel/micron at the focal plane. Also correct for any excess in predicted background
if there are pupil losses in the PSF. (#2529)
"""
bg_fp_rate = self.focal_plane_rate(self.ote_rate(self.background.mjy_pix), self.wave)
wave_range = self.current_instrument.get_wave_range()
        pupil_thru = self.current_instrument.psf_library.get_pupil_throughput(
            wave_range['wmin'],
            self.current_instrument.instrument['instrument'],
            self.current_instrument.instrument['aperture'])
return bg_fp_rate * pupil_thru
    def get_bg_pix_rate(self):
"""
Calculate the background on the detector in e-/s/pixel
"""
bg_pix_rate = self.rate_plus_bg_list[0]['fp_pix'] - self.rate_list[0]['fp_pix']
return bg_pix_rate
    def on_detector(self, rate_list):
        """
        Take the list of per-slice (pixel) rates and assemble them into a single
        detector frame. A single image will only have one rate in the list, but the
        IFUs will have n_slices. There may be other examples, such as different
        spectral orders for NIRISS. It is not yet clear how many different flavors
        there are, so this step may get refactored if it gets too complicated.
        Observing modes that only have one set of rates (imaging and single-slit
        spectroscopy, for instance) will still go through this, but the operation is trivial.
        """
aperture_sh = rate_list[0]['fp_pix'].shape
n_apertures = len(rate_list)
detector_shape = (aperture_sh[0] * n_apertures, aperture_sh[1])
detector = np.zeros(detector_shape)
i = 0
for rate in rate_list:
detector[i * aperture_sh[0]:(i + 1) * aperture_sh[0], :] = rate['fp_pix']
i += 1
return detector
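    # Example (hypothetical rates): two slices with 3x4 'fp_pix' arrays,
    #
    #     rate_list = [{'fp_pix': np.ones((3, 4))}, {'fp_pix': 2 * np.ones((3, 4))}]
    #
    # stack into a (6, 4) detector image whose top half is 1.0 and bottom half 2.0.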
    def get_pix_grid(self, rate):
        """
        Generate the coordinate grid of the detector plane
        """
        if self.projection_type == 'image':
            return self.grid
        elif self.projection_type in ('spec', 'slitless', 'multiorder'):
            nw = rate['wave_pix'].shape[0]
            if self.dispersion_axis == 'x':
                # for slitless calculations, the dispersion axis is longer than the spectrum being
                # dispersed because the whole field of view is dispersed. 'excess' is the size of
                # the FOV; half lies to the left of the blue end of the spectrum and half to the
                # right of the red end. it is used to create the new spatial coordinate transform
                # for the pixel image on the detector.
                excess = rate['fp_pix'].shape[1] - nw
                pix_grid = coords.IrregularGrid(
                    self.grid.col,
                    (np.arange(nw + excess) - (nw + excess) / 2.0) * self.grid.xsamp
                )
            else:
                excess = rate['fp_pix'].shape[0] - nw
                pix_grid = coords.IrregularGrid(
                    (np.arange(nw + excess) - (nw + excess) / 2.0) * self.grid.ysamp,
                    self.grid.row
                )
            return pix_grid
        else:
            raise EngineOutputError(value="Unsupported projection_type: %s" % self.projection_type)
    def all_rates(self, flux, add_extended_background=False):
"""
Calculate rates in e-/s/pixel/micron or e-/s/pixel given a flux cube in mJy
Parameters
----------
flux: ConvolvedSceneCube instance
Convolved source flux cube with flux units in mJy
add_extended_background: bool (default=False)
Toggle for including extended background not contained within the flux cube
Returns
-------
products: dict
Dict of products produced by rate calculation.
'wave_pix' - Mapping of wavelength to detector pixels
'ote' - Source rate at the telescope aperture
'fp' - Source rate at the focal plane in e-/s/pixel/micron
'fp_pix' - Source rate per pixel
        'fp_pix_no_ipc' - Source rate per pixel, excluding effects of inter-pixel capacitance
"""
# The source rate at the telescope aperture
ote_rate = self.ote_rate(flux)
# The source rate at the focal plane in interacting photons/s/pixel/micron
fp_rate = self.focal_plane_rate(ote_rate, self.wave)
# the fp_pix_variance is the variance of the per-pixel electron rate and includes the chromatic effects
# of quantum yield.
if self.projection_type == 'image':
# The wavelength-integrated rate in e-/s/pixel, relevant for imagers
fp_pix_rate, fp_pix_variance = self.image_rate(fp_rate)
wave_pix = self.wave_eff(fp_rate)
elif self.projection_type == 'spec':
# The wavelength-integrated rate in e-/s/pixel, relevant for spectroscopy
wave_pix, fp_pix_rate, fp_pix_variance = self.spec_rate(fp_rate)
elif self.projection_type in ('slitless', 'multiorder'):
# The wavelength-integrated rate in e-/s/pixel, relevant for slitless spectroscopy
wave_pix, fp_pix_rate, fp_pix_variance = self.slitless_rate(
fp_rate,
add_extended_background=add_extended_background
)
else:
raise EngineOutputError(value="Unsupported projection_type: %s" % self.projection_type)
# Include IPC effects, if available and requested
if self.det_pars.detector_config['ipc'] and self.calculation_config.effects['ipc']:
kernel = self.current_instrument.get_ipc_kernel()
fp_pix_rate_ipc = self.ipc_convolve(fp_pix_rate, kernel)
else:
fp_pix_rate_ipc = fp_pix_rate
# fp_pix is the final product. Since there is no reason to
# carry around the ipc label everywhere, we rename it here.
products = {
'wave_pix': wave_pix,
'ote': ote_rate,
'fp': fp_rate,
'fp_pix': fp_pix_rate_ipc,
'fp_pix_no_ipc': fp_pix_rate, # this is for calculating saturation
'fp_pix_variance': fp_pix_variance # this is for calculating the detector noise
}
return products
    def ote_rate(self, flux):
"""
Calculate source rate in e-/s/pixel/micron at the telescope entrance aperture given
a flux cube in mJy/pixel.
"""
# spectrum in mJy/pixel, wave in micron, f_lambda in photons/cm^2/s/micron
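        # (1.5091905 = 1e-26 erg/s/cm^2/Hz per mJy divided by Planck's constant in
        # CGS units, 6.62607e-27 erg s; dividing by the wavelength in microns then
        # yields photons/cm^2/s/micron.)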
f_lambda = 1.5091905 * (flux / self.wave)
ote_int = self.current_instrument.telescope.get_ote_eff(self.wave)
coll_area = self.current_instrument.telescope.coll_area
a_lambda = coll_area * ote_int
# e-/s/pixel/micron
ote_rate = f_lambda * a_lambda
return ote_rate
    def focal_plane_rate(self, rate, wave):
"""
Takes the output from self.ote_rate() and multiplies it by the components
of efficiency within the system and returns the source rate at the focal plane in
e-/s/pixel/micron.
"""
filter_eff = self.current_instrument.get_filter_eff(wave)
disperser_eff = self.current_instrument.get_disperser_eff(wave)
internal_eff = self.current_instrument.get_internal_eff(wave)
qe = self.current_instrument.get_detector_qe(wave)
fp_rate = rate * filter_eff * disperser_eff * internal_eff * qe
return fp_rate
    def spec_rate(self, rate):
        '''
        For slitted spectrographs, calculate the detector signal by integrating
        along the dispersion direction of the cube (which, by assumption, is masked
        by a narrow slit). For slitless systems or slits wider than the PSF, the
        slitless_rate method should be used to preserve spatial information within the slit.
        Parameters
        ----------
rate: numpy.ndarray
Rate of photons interacting with detector as a function of model wavelength set
Returns
-------
products: 3-element tuple of numpy.ndarrays
first element - map of pixel to wavelength
second element - electron rate per pixel
third element - variance of electron rate per pixel
'''
dispersion = self.current_instrument.get_dispersion(self.wave)
wave_pix = self.current_instrument.get_wave_pix()
wave_pix_trunc = wave_pix[np.where(np.logical_and(wave_pix >= self.wave.min(),
wave_pix <= self.wave.max()))]
# Check that the source spectrum is actually inside the instrumental wavelength
# coverage.
if len(wave_pix_trunc) == 0:
raise RangeError(value='wave and wave_pix do not overlap')
# Check the dispersion axis to determine which axis to sum and interpolate over
if self.dispersion_axis == 'x':
axis = 1
else:
axis = 0
# We can simply sum over the dispersion direction. This is where we lose the spatial information within the aperture.
spec_rate = np.sum(rate, axis=axis)
# And then scale to the dispersion function (pixel/micron) to transform
# from e-/s/micron to e-/s/pixel.
spec_rate_pix = spec_rate * dispersion
# but we are still sampled on the internal grid, so we have to interpolate to the pixel grid.
# use kind='slinear' since it's ~2x more memory efficient than 'linear'. 'slinear' uses different code path to
# calculate the slopes.
int_spec_rate = sci_int.interp1d(self.wave, spec_rate_pix, axis=axis, kind='slinear', assume_sorted=True,
copy=False)
spec_rate_pix_sampled = int_spec_rate(wave_pix_trunc)
# Handle a detector gap here by constructing a mask. If the current_instrument implements it,
# it'll be a real mask array. Otherwise it will simply be 1.0.
self.det_mask = self.current_instrument.create_gap_mask(wave_pix_trunc)
# this is the interacting photon rate in the detector with mask applied.
spec_rate_pix_sampled *= self.det_mask
# Add effects of non-unity quantum yields. For the spec projection, we assume that the quantum yield does not
# change over a spectral element. Then we can just multiply the products by the relevant factors.
q_yield, fano_factor = self.current_instrument.get_quantum_yield(wave_pix_trunc)
# convert the photon rate to electron rate by multiplying by the quantum yield which is a function of wavelength
spec_electron_rate_pix = spec_rate_pix_sampled * q_yield
# to meet IDT expectations, some instruments require a possibly chromatic fudge factor to be applied
# to the per-pixel electron rate variance.
var_fudge = self.current_instrument.get_variance_fudge(wave_pix_trunc)
# the variance in the electron rate, Ve, is also scaled by the quantum yield plus a fano factor which is
# analytic in the simple 1 or 2 electron case: Ve = (qy + fano) * Re. since Re is the photon rate
# scaled by the quantum yield, Re = qy * Rp, we get: Ve = qy * (qy + fano) * Rp
spec_electron_variance_pix = spec_rate_pix_sampled * q_yield * (q_yield + fano_factor) * var_fudge
products = wave_pix_trunc, spec_electron_rate_pix, spec_electron_variance_pix
# bin the instrument wavelength grid in pixels
if self.dispersion_axis == 'x':
bin_x = self.current_instrument.get_dispersion_binning()
bin_y = self.current_instrument.get_spatial_binning()
else:
bin_x = self.current_instrument.get_spatial_binning()
bin_y = self.current_instrument.get_dispersion_binning()
self.binning = DetectorBinning(spec_electron_rate_pix, bin_x, bin_y, self.grid, dispersion_axis=self.dispersion_axis, wave=wave_pix_trunc)
return products
    def image_rate(self, rate):
'''
Calculate the electron rate for imaging modes by integrating along
the wavelength direction of the cube.
        Parameters
        ----------
rate: numpy.ndarray
Rate of photons interacting with detector as a function of model wavelength set
Returns
-------
products: 2-element tuple of numpy.ndarrays
first element - electron rate per pixel
second element - variance of electron rate per pixel
'''
q_yield, fano_factor = self.current_instrument.get_quantum_yield(self.wave)
# convert the photon rate to electron rate by multiplying by the quantum yield which is a function of wavelength
electron_rate_pix = integrate.simps(rate * q_yield, self.wave)
# to meet IDT expectations, some instruments require a possibly chromatic fudge factor to be applied
# to the per-pixel electron rate variance.
var_fudge = self.current_instrument.get_variance_fudge(self.wave)
# the variance in the electron rate, Ve, is also scaled by the quantum yield plus a fano factor which is
# analytic in the simple 1 or 2 electron case: Ve = (qy + fano) * Re. since Re is the photon rate
# scaled by the quantum yield, Re = qy * Rp, we get: Ve = qy * (qy + fano) * Rp
electron_variance_pix = integrate.simps(rate * q_yield * (q_yield + fano_factor) * var_fudge, self.wave)
# bin the instrument wavelength grid in pixels
bin_x = self.current_instrument.get_spatial_binning()
bin_y = self.current_instrument.get_spatial_binning()
self.binning = DetectorBinning(electron_rate_pix, bin_x, bin_y, self.grid)
products = electron_rate_pix, electron_variance_pix
return products
    def slitless_rate(self, rate, add_extended_background=True):
        '''
        Calculate the detector rates for slitless modes. Here we retain all spatial
        information and build up the detector plane by shifting and coadding the frames
        from the convolved flux cube. Background originating outside the flux cube must
        also be handled and added.
Parameters
----------
rate: 3D numpy.ndarray
Cube containing the flux rate at the focal plane
add_extended_background: bool (default: True)
Toggle for including extended background not contained within the flux cube
Returns
-------
products: 2 entry tuple
wave_pix: 1D numpy.ndarray containing wavelength to pixel mapping on the detector plane
spec_rate: 2D numpy.ndarray of detector count rates
'''
wave_pix = self.current_instrument.get_wave_pix()
wave_subs = np.where(
np.logical_and(
wave_pix >= self.wave.min(),
wave_pix <= self.wave.max()
)
)
wave_pix_trunc = wave_pix[wave_subs]
if len(wave_pix_trunc) == 0:
raise RangeError(value='wave and wave_pix do not overlap')
dispersion = self.current_instrument.get_dispersion(wave_pix_trunc)
trace = self.current_instrument.get_trace(wave_pix_trunc)
q_yield, fano_factor = self.current_instrument.get_quantum_yield(wave_pix_trunc)
        # kind='slinear' would be ~2x more memory efficient than 'linear' ('slinear' uses a
        # different code path to calculate the slopes), but it is *much* slower, so this is a
        # tradeoff. We also lower the rate dtype to float32 to conserve memory.
int_rate_pix = sci_int.interp1d(self.wave, rate.astype(np.float32, casting='same_kind'),
kind='linear', axis=2, assume_sorted=True, copy=False)
rate_pix = int_rate_pix(wave_pix_trunc)
# convert the photon rate to electron rate by multiplying by the quantum yield which is a function of wavelength
electron_rate_pix = rate_pix * q_yield
# to meet IDT expectations, some instruments require a possibly chromatic fudge factor to be applied
# to the per-pixel electron rate variance.
var_fudge = self.current_instrument.get_variance_fudge(wave_pix_trunc)
# the variance in the electron rate, Ve, is also scaled by the quantum yield plus a fano factor which is
# analytic in the simple 1 or 2 electron case: Ve = (qy + fano) * Re. since Re is the photon rate
# scaled by the quantum yield, Re = qy * Rp, we get: Ve = qy * (qy + fano) * Rp
electron_variance_pix = rate_pix * q_yield * (q_yield + fano_factor) * var_fudge
# interpolate the background onto the pixel spacing
int_bg_fp_rate = sci_int.interp1d(self.wave, self.bg_fp_rate.astype(np.float32, casting='same_kind'),
kind='linear', assume_sorted=True, copy=False)
bg_fp_rate_pix = int_bg_fp_rate(wave_pix_trunc)
# calculate electron rate and variance due to background
bg_electron_rate = bg_fp_rate_pix * q_yield
bg_electron_variance = bg_fp_rate_pix * q_yield * (q_yield + fano_factor) * var_fudge
# The first part of this code is meant to add the PSF images from the convolved scene cube along either the x
# or y axis depending on the dispersion axis, optionally following the path of a spectral trace (currently used
# only for SOSS mode). The psfs will be added to all locations within the resolution element.
#
# Because, in slitless modes, the disperser is dispersing light coming in from everywhere in the pupil plane,
# every part of the detector should have a contribution from every wavelength of light (from both orders, for
# SOSS mode). The add_extended_background statement does that - it fills every pixel up to i, and after
# i+rate_pix.shape[1], with the same background that comes baked into the rate_pix images thanks to the
# AdvancedPSF functions that create the convolved scene cube.
if self.empty_scene:
# if we have an explicitly empty scene, we're doing a background-only order calculation and don't need to
# even pretend to disperse the spectrum - CombinedSignal will handle padding it to match the interesting
# order(s) and this way there will be no need to trim.
spec_shape = (rate_pix.shape[0], rate_pix.shape[1])
spec_rate = np.zeros(spec_shape)
spec_variance = np.zeros(spec_shape)
if add_extended_background:
for i in np.arange(dispersion.shape[0]):
spec_rate += bg_electron_rate[i] * dispersion[i]
spec_variance += bg_electron_variance[i] * dispersion[i]
else: # if the scene is data
# dispersion_axis tells us whether we need to sum the planes of the cube horizontally
# or vertically on the detector plane.
if self.dispersion_axis == 'x':
spec_shape = (rate_pix.shape[0], rate_pix.shape[2] + rate_pix.shape[1])
spec_rate = np.zeros(spec_shape)
spec_variance = np.zeros(spec_shape)
for i in np.arange(dispersion.shape[0]):
# Background not yet completely added. Make sure there is a trace shift to be done so that we
# don't make an expensive call to shift() if we don't have to. Use mode='nearest' to fill in new
# pixels with background when image is shifted.
if trace[i] != 0.0:
spec_rate[:, i:i + rate_pix.shape[1]] += shift(
electron_rate_pix[:, :, i],
shift=(trace[i], 0),
mode='nearest',
order=1
) * dispersion[i]
spec_variance[:, i:i + rate_pix.shape[1]] += shift(
electron_variance_pix[:, :, i],
shift=(trace[i], 0),
mode='nearest',
order=1
) * dispersion[i]
else:
spec_rate[:, i:i + rate_pix.shape[1]] += electron_rate_pix[:, :, i] * dispersion[i]
spec_variance[:, i:i + rate_pix.shape[1]] += electron_variance_pix[:, :, i] * dispersion[i]
# Adding background to all other pixels, unless we are asked not to.
if add_extended_background:
spec_rate[:, :i] += bg_electron_rate[i] * dispersion[i]
spec_rate[:, i + rate_pix.shape[1]:] += bg_electron_rate[i] * dispersion[i]
spec_variance[:, :i] += bg_electron_variance[i] * dispersion[i]
spec_variance[:, i + rate_pix.shape[1]:] += bg_electron_variance[i] * dispersion[i]
else: # if the dispersion is on the y axis
spec_shape = (rate_pix.shape[2] + rate_pix.shape[0], rate_pix.shape[1])
spec_rate = np.zeros(spec_shape)
spec_variance = np.zeros(spec_shape)
for i in np.arange(dispersion.shape[0]):
# Background not yet completely added. Make sure there is a trace shift to be done so that we
# don't make an expensive call to shift() if we don't have to. Use mode='nearest' to fill in new
# pixels with background when image is shifted.
if trace[i] != 0.0:
spec_rate[i:i + rate_pix.shape[0], :] += shift(
electron_rate_pix[:, :, i],
shift=(0, trace[i]),
mode='nearest',
order=1
) * dispersion[i]
spec_variance[i:i + rate_pix.shape[0], :] += shift(
electron_variance_pix[:, :, i],
shift=(0, trace[i]),
mode='nearest',
order=1
) * dispersion[i]
else:
spec_rate[i:i + rate_pix.shape[0], :] += electron_rate_pix[:, :, i] * dispersion[i]
spec_variance[i:i + rate_pix.shape[0], :] += electron_variance_pix[:, :, i] * dispersion[i]
# Adding background to all other pixels, unless we are asked not to.
if add_extended_background:
spec_rate[:i, :] += bg_electron_rate[i] * dispersion[i]
spec_rate[i + rate_pix.shape[0]:, :] += bg_electron_rate[i] * dispersion[i]
spec_variance[:i, :] += bg_electron_variance[i] * dispersion[i]
spec_variance[i + rate_pix.shape[0]:, :] += bg_electron_variance[i] * dispersion[i]
# bin the instrument wavelength grid in pixels
if self.dispersion_axis == 'x':
bin_x = self.current_instrument.get_dispersion_binning()
bin_y = self.current_instrument.get_spatial_binning()
else:
bin_x = self.current_instrument.get_spatial_binning()
bin_y = self.current_instrument.get_dispersion_binning()
self.binning = DetectorBinning(spec_rate, bin_x, bin_y, self.grid, dispersion_axis=self.dispersion_axis, wave=wave_pix_trunc, projection_type=self.projection_type)
# dispersion_axis determines whether wavelength is the first or second axis
if self.dispersion_axis == 'x' or self.projection_type == 'multiorder':
products = wave_pix_trunc, spec_rate, spec_variance
else:
# if dispersion is along Y, wavelength increases bottom to top, but Y index increases top to bottom.
# flip the Y axis to account for this.
products = wave_pix_trunc, np.flipud(spec_rate), np.flipud(spec_variance)
return products
    def wave_eff(self, rate):
rate_tot = np.nansum(rate, axis=0)
a = np.sum(rate_tot * self.wave)
b = np.sum(rate_tot)
if b > 0.0:
wave_eff = a / b
else:
wave_eff = self.wave.mean()
wave_eff_arr = np.array([wave_eff])
return wave_eff_arr
    def get_projection_type(self):
return self.projection_type
    def ipc_convolve(self, rate, kernel):
fp_pix_ipc = sg.fftconvolve(rate, kernel, mode='same')
debug_utils.debugarrays.store('etc3D', 'ipc_convolve',
{
'rate': rate,
'kernel': kernel,
'fp_pix_ipc': fp_pix_ipc,
'description': 'This is just a short, unnecessary description.'
})
return fp_pix_ipc
    def get_saturation_mask(self, rate=None):
"""
Compute a numpy array indicating pixels with full saturation (2), partial saturation (1) and no saturation (0).
Parameters
----------
rate: None or 2D np.ndarray
Detector plane rate image used to build saturation map from
Returns
-------
mask: 2D np.ndarray
Saturation mask image
"""
if rate is None:
rate = self.rate_plus_bg
saturation_mask = np.zeros(rate.shape)
if self.calculation_config.effects['saturation']:
fullwell = self.det_pars.detector_config['fullwell']
exp_pars = self.current_instrument.the_detector
unsat_ngroups = exp_pars.get_unsaturated(rate, fullwell)
ngroup = exp_pars.exposure_spec.ngroup
saturation_mask[(unsat_ngroups < ngroup)] = 1
saturation_mask[(unsat_ngroups < 2)] = 2
return saturation_mask
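    # Example of the mask semantics: with ngroup=5, a pixel whose rate leaves only
    # 3 unsaturated groups gets mask value 1 (partial saturation); a pixel with
    # fewer than 2 unsaturated groups gets mask value 2 (full saturation); all
    # other pixels stay 0.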
def _groups_before_sat(self, slope, fullwell):
"""
Fix for Pandeia 1.2/1.3, since exposure spec doesn't have this in 1.2
"""
exposure_spec = self.current_instrument.the_detector.exposure_spec
tfffr = exposure_spec.tfffr
nframe = exposure_spec.nframe
tframe = exposure_spec.tframe
ndrop2 = exposure_spec.ndrop2
ndrop1 = exposure_spec.ndrop1
        time_to_saturation = fullwell / slope.clip(1e-10, np.max(slope))
if self.det_pars.detector_config['det_type'] == 'sias':
groups_before_sat = (time_to_saturation - tfffr) / (nframe * tframe)
elif self.det_pars.detector_config['det_type'] == 'h2rg':
groups_before_sat = (((time_to_saturation - tfffr) / tframe) - nframe - ndrop1) / (nframe + ndrop2 + 1.)
else:
            raise ValueError("Unknown detector type {}".format(self.det_pars.detector_config['det_type']))
slice_group = np.floor(groups_before_sat)
return slice_group
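    # Worked example (hypothetical h2rg-style values): with time_to_saturation=100 s,
    # tfffr=10 s, tframe=10 s, nframe=1, ndrop1=0, ndrop2=0, the h2rg branch gives
    # (((100 - 10) / 10) - 1 - 0) / (1 + 0 + 1) = 4 groups before saturation.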
class SeparateTargetReferenceCoronagraphy(Coronagraphy):
    '''
    This class is intended to override the coronagraphy requirement that the reference
    source be included in the same calculation template as the observation source.
    '''
def _create_weight_matrix(self, my_detector_signal_list, my_detector_noise_list):
"""
This private method creates the weight matrix, a_ij, used for the strategy sum. It gets overridden
in each strategy. In this case, it applies all weight to the first (and only) target. As such, it
deliberately uses the weight matrix creation from the ImagingApPhot class
"""
if len(my_detector_signal_list) > 1:
message = 'This Strategy Configuration is intended for separate Target, Reference, and Unocculted Source observations, so only one signal is supported.'
raise UnsupportedError(value=message)
my_detector_signal = my_detector_signal_list[0]
aperture = self.aperture_size
annulus = self.sky_annulus
# pass target_xy to Signal.grid.dist() to offset the target position
dist = my_detector_signal.grid.dist(xcen=self.target_xy[0], ycen=self.target_xy[1])
# sky_subs only takes into account whole pixels which is sufficient for the sky estimation
# region and for the sanity checking we need to do. however, we need to be more exact for the source extraction
# region. photutils.geometry provides routines to do this either via subsampling or exact geometric
# calculation. the exact method is slower, but for the sizes of regions we deal with in the ETC it is not noticeable.
sky_subs = np.where((dist > annulus[0]) & (dist <= annulus[1]))
n_sky = len(sky_subs[0])
# generate the source extraction region mask.
src_region = my_detector_signal.grid.circular_mask(
aperture,
xoff=self.target_xy[0],
yoff=self.target_xy[1],
use_exact=self.use_exact,
subsampling=self.subsampling
)
# the src_region mask values are the fraction of the pixel subtended by the aperture so
# in the range 0.0 to 1.0 inclusive. the effective number of pixels in the aperture is
# then the sum of this mask.
n_aper = src_region.sum()
# do some more sanity checks to make sure the target and background regions are configured as expected
self._check_circular_aperture_limits(src_region, sky_subs, my_detector_signal.grid, aperture, annulus)
weight_matrix = np.matrix(src_region)
if self.background_subtraction:
weight_matrix[sky_subs] = -1. * n_aper / n_sky
# The method also returns a list of 'products': subscripts of the weight matrix that is non-zero.
# This can also be a list if the strategy returns more than one product (such a spectrum over a
# number of wavelengths).
product_subscript = weight_matrix.nonzero()
# The subscripts returned from a matrix contain a redundant dimension. This removes it.
# Note that this is not how matrix indexing is formally constructed, but it enforces a rule
# that product subscripts should always be tuples or regular ndarrays.
product_subscript = (np.array(product_subscript[0]).flatten(), np.array(product_subscript[1]).flatten())
return weight_matrix, [product_subscript]