import pathlib
import copy
import uuid
import re
import simplejson
import h5py
import numpy as np
from snappl.provenance import Provenance
from snappl.diaobject import DiaObject
from snappl.image import Image
from snappl.pathedobject import PathedObject
from snappl.logger import SNLogger
from snappl.utils import asUUID, SNPITJsonEncoder
from snappl.dbclient import SNPITDBClient
[docs]
class Spectrum1d( PathedObject ):
"""A class to store and save single-epoch 1d transient spectra.
Spectrum1d schema are defined here:
https://github.com/Roman-Supernova-PIT/Roman-Supernova-PIT/wiki/spectrum_1d
Properties of a Spectrum1d object include:
* filepath : pathlib.Path ; path *relative to the base path* of the spectrum1d file
* full_filepath : pathlib.Path ; absolute path on the system to the spectrum1d file
* base_path : base path for spectra; usually will be Config value system.paths.spectra1d
* base_dir : synonym for base_path
* data_dict : the full dict described on the schema wiki page linked above
* meta : data_dict['meta']
* combined: data_dict['combined']
* combined_meta: data_dict['combined']['meta']
* combined_data: data_dict['combined']['data']
* individual: data_dict['individual']
* id : UUID, the id of the spectrum
* provenance_id : UUID, the id of the spectrum's provenance
* diaobject_id : UUID, the id of the object for which this is a spectrum
* diaobject_position_id : UUID or None, the id of the object's improved position if any
* band : str, the band
* mjd_start : float, the MJD of the earliest component image
* mjd_end : float, the MJD + exposure time (in days) of the latest component image
* epoch : integer, the average MJD in millidays (i.e. MJD * 1000) of the component image MJDs
* images : list of Image, the component images
"""
# Name of the config item that holds the default base path for spectrum1d files.
# (Presumably read by PathedObject when no base_path/base_dir is given -- confirm there.)
_base_path_config_item = 'system.paths.spectra1d'
def __init__( self,
              id=None,
              data_dict=None,
              provenance=None,
              diaobject=None,
              diaobject_position=None,
              band=None,
              mjd_start=None,
              mjd_end=None,
              epoch=None,
              no_database=False,
              dbclient=None,
              filepath=None,
              base_dir=None,
              base_path=None,
              full_filepath=None,
              no_base_path=False,
             ):
    """Instantiate a Spectrum1d.

    Parameters
    ----------
      id : UUID or str or None
        ID of this spectrum.  If None, the id is parsed from filepath
        when filepath follows the standard layout
        (.../a/b/c/<uuid>_1dspec...); otherwise a new one is
        generated.  Either way it is thereafter available in the id
        property.

      data_dict : dict
        Must follow the format on
        https://github.com/Roman-Supernova-PIT/Roman-Supernova-PIT/wiki/spectrum_1d
        You must give one of data_dict or filepath; it is bad form
        to specify both.

      provenance : Provenance or UUID or str or None
        The provenance of this spectrum.  You may also set
        data_dict['meta']['provenance_id'] to the UUID of the
        provenance instead of passing it here.

      diaobject : DiaObject or UUID or str or None
        The DiaObject this is a spectrum for.  You may also set
        data_dict['meta']['diaobject_id'] to the UUID of the
        diaobject instead of passing it here.

      diaobject_position : dict or UUID or str or None
        Either the improved position as returned from
        DiaObject.get_position(), or the value of the id from the
        dictionary returned by that call.  You may also set
        data_dict['meta']['diaobject_position_id'].

      band, mjd_start, mjd_end, epoch : str, float, float, int; default None
        Already-known values (e.g. from a database row) for the
        properties of the same names.  They are returned by those
        properties until they are recomputed from the component
        images; anything left as None is computed on first access.

      no_database : bool, default False
        If True, ids are not verified in data_dict and images are
        not looked up when the data_dict is set.

      dbclient : SNPITDBClient or None
        Passed along when data_dict is given and images have to be
        looked up.

      filepath : Path or str, default None
        File path to find the spectrum, relative to base dir.  You
        must specify either data_dict or filepath; it is bad form to
        specify both.

      base_dir : Path or str, default None
        Base directory that filepath is relative to.  If None (which
        is what you want if you're writing things to the database),
        will use the config value of "system.paths.spectra1d".

      base_path, full_filepath, no_base_path :
        Passed unchanged to PathedObject.__init__; see that class.

    Raises
    ------
      ValueError if neither data_dict nor a file path is available.

    """
    super().__init__( filepath=filepath, base_path=base_path, base_dir=base_dir,
                      full_filepath=full_filepath, no_base_path=no_base_path )

    if ( data_dict is None ) and ( self._filepath is None ):
        raise ValueError( "Must specify either data_dict or filepath" )
    if ( data_dict is not None ) and ( self._filepath is not None ):
        SNLogger.warning( "Specifying both data_dict and filepath is bad form." )

    # Try to recover the id from a standard-layout filepath.  The three
    # single-character directories must agree with the first three
    # characters of the uuid in the file name.
    parsed_id = None
    if ( id is None ) and ( self._filepath is not None ):
        match = re.search( r'([0-9a-f])/([0-9a-f])/([0-9a-f])/'
                           r'([0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}).1dspec',
                           str(self._filepath) )
        if match is None:
            SNLogger.warning( "Could not parse filepath to find spectrum1d id, assigning a new one." )
        elif any( [ match.group(1) != match.group(4)[0],
                    match.group(2) != match.group(4)[1],
                    match.group(3) != match.group(4)[2] ] ):
            SNLogger.warning( "filepath didn't have consistent directory and filename, cannot parse "
                              "spectrum1d id from it, assigning a new one" )
        else:
            parsed_id = match.group(4)

    # Precedence: explicit id, then the id parsed from the path, then a fresh one.
    # (Previously the parsed id was always overwritten by a new random uuid.)
    if id is not None:
        self.id = asUUID( id )
    elif parsed_id is not None:
        self.id = asUUID( parsed_id )
    else:
        self.id = uuid.uuid4()

    self.provenance_id = ( provenance.id if isinstance( provenance, Provenance )
                           else asUUID( provenance, oknone=True ) )
    self.diaobject_id = ( diaobject.id if isinstance( diaobject, DiaObject )
                          else asUUID( diaobject, oknone=True ) )
    self.diaobject_position_id = ( asUUID(diaobject_position['id']) if isinstance( diaobject_position, dict )
                                   else asUUID( diaobject_position, oknone=True ) )
    self.no_database = no_database

    # Keep whatever the caller already knows; the properties only fall
    # back to _fill_props() for values that are still None.
    self._band = band
    self._mjd_start = mjd_start
    self._mjd_end = mjd_end
    self._epoch = epoch
    self._images = None

    if data_dict is None:
        # Loaded lazily from the file by the data_dict property
        self._data_dict = None
    else:
        self._set_data_dict( data_dict, dbclient=dbclient )
@property
def band( self ):
    """str : the band; filled in by _fill_props() on first access if not yet known."""
    needs_fill = self._band is None
    if needs_fill:
        self._fill_props()
    return self._band
@property
def mjd_start( self ):
    """float : MJD of the earliest component image; computed by _fill_props() if not cached."""
    cached = self._mjd_start
    if cached is None:
        self._fill_props()
        cached = self._mjd_start
    return cached
@property
def mjd_end( self ):
    """float : MJD plus exposure time (in days) of the latest component image.

    Computed by _fill_props() the first time it is needed.

    """
    if self._mjd_end is not None:
        return self._mjd_end
    self._fill_props()
    return self._mjd_end
@property
def epoch( self ):
    """int : average MJD of the component images, in millidays (MJD * 1000).

    Computed by _fill_props() on first access if it is not yet cached.

    """
    if self._epoch is None:
        self._fill_props()
    # Must return the backing attribute; returning self.epoch would recurse forever.
    return self._epoch
@property
def images( self ):
    """list of Image : the component images, as loaded and sorted by _fill_props().

    Loaded on first access if not yet cached.

    """
    if self._images is None:
        self._fill_props()
    # Must return the backing attribute; returning self.images would recurse forever.
    return self._images
def _fill_props( self, dbclient=None ):
"""Fills self.images, self.band, self.mjd_start, self.mjd_end, and self.epoch based on data_dict."""
imageids = set( str(i['meta']['image_id']) for i in self.individual )
if ( self._images is None ) or ( set( str(i.id) for i in self._images ) != imageids ):
# Have to reload images:
dbclient = SNPITDBClient.get() if dbclient is None else dbclient
self._images = []
for imid in imageids:
try:
image = Image.get_image( imid, dbclient=dbclient )
if not isinstance( image, Image ):
raise TypeError( "Didn't get an Image back from Image.get_image; this should not happen." )
except Exception as ex:
SNLogger.error( f"Spectrum1d.save_to_db failed to get image {imid} from the database:\n{ex}" )
raise
self._images.append( image )
self._images.sort( key=lambda x: x.mjd )
self._mjd_start = self._images[0].mjd
self._mjd_end = self._images[-1].mjd + self._images[-1].exptime / 3600. / 24.
self._epoch = int( np.floor( sum([ i.mjd for i in self._images ]) / len(self._images) * 1000 + 0.5 ) )
if any( i.band != self._images[0].band for i in self._images ):
raise ValueError( "Images have inconsistent bands!" )
self._band = self._images[0].band
@property
def data_dict( self ):
    """dict : the full spectrum data dictionary.

    If it has not been set yet, it is loaded with read_data(); that
    requires a filepath, otherwise RuntimeError is raised.

    """
    if self._data_dict is not None:
        return self._data_dict
    if self._filepath is None:
        raise RuntimeError( "Can't find the data" )
    self.read_data()
    return self._data_dict

@data_dict.setter
def data_dict( self, val ):
    # Direct assignment: val is stored as-is, without copying or verification.
    self._data_dict = val
@property
def meta( self ):
    """dict : shortcut for data_dict['meta']."""
    full = self.data_dict
    return full['meta']
@property
def combined( self ):
    """dict : shortcut for data_dict['combined']."""
    full = self.data_dict
    return full['combined']
@property
def combined_meta( self ):
    """dict : shortcut for data_dict['combined']['meta']."""
    comb = self.data_dict['combined']
    return comb['meta']
@property
def combined_data( self ):
    """dict : shortcut for data_dict['combined']['data']."""
    comb = self.data_dict['combined']
    return comb['data']
@property
def individual( self ):
    """list : shortcut for data_dict['individual']."""
    full = self.data_dict
    return full['individual']
[docs]
def generate_filepath( self, filetype='hdf5' ):
    """Set self._filepath to the standard relative path for this spectrum.

    The path is <provenance_id>/<a>/<b>/<c>/<id>_1dspec.<suffix>, where
    a, b, c are the first three characters of the id.

    Parameters
    ----------
      filetype : str, default 'hdf5'
        Only 'hdf5' is known; anything else raises ValueError.

    """
    known_suffixes = { 'hdf5': 'hdf5' }
    try:
        suffix = known_suffixes[ filetype ]
    except KeyError:
        raise ValueError( f"Unknown filetype {filetype}" ) from None

    idstr = str( self.id )
    first, second, third = idstr[0], idstr[1], idstr[2]
    relpath = f'{self.provenance_id}/{first}/{second}/{third}/{self.id}_1dspec.{suffix}'
    self._filepath = pathlib.Path( relpath )
def _set_data_dict( self, data_dict, provenance=None, diaobject=None, diaobject_position=None, dbclient=None ):
    """Verify data_dict and store a copy of it in self._data_dict.

    A deep copy is made, so the passed object is not modified.

    Parameters
    ----------
      data_dict : dict
        Must have exactly the keys 'meta' (dict), 'combined' (dict
        with exactly the keys 'meta' and 'data') and 'individual'
        (list of dicts, each with exactly the keys 'meta' and 'data').

      provenance : Provenance or UUID or str or None
      diaobject : DiaObject or UUID or str or None
      diaobject_position : dict or UUID or str or None
        Expected ids.  Any that is None falls back to the value
        already stored on self.

      dbclient : SNPITDBClient or None
        Used to look up images unless self.no_database is set.

    Unless self.no_database is True:
      * id, provenance_id, diaobject_id and diaobject_position_id are
        filled into data_dict['meta'] when missing there, must be
        convertible to UUIDs (only diaobject_position_id may be
        None), and must match the expected value where one is known;
      * self.provenance_id, self.diaobject_id and
        self.diaobject_position_id are updated from the verified meta;
      * every individual meta must have a valid image_id;
      * the image-derived properties are filled via _fill_props().

    Raises
    ------
      TypeError or ValueError if the structure or the ids are not as
      described above, or if combined meta 'nfiles' disagrees with the
      number of individual spectra.

    """
    provenance = provenance.id if isinstance( provenance, Provenance ) else asUUID( provenance, oknone=True )
    diaobject = diaobject.id if isinstance( diaobject, DiaObject ) else asUUID( diaobject, oknone=True )
    diaobject_position = ( asUUID( diaobject_position['id'] ) if isinstance( diaobject_position, dict )
                           else asUUID( diaobject_position, oknone=True ) )
    # Fall back to what's on self only for values that were not passed
    provenance = self.provenance_id if provenance is None else provenance
    diaobject = self.diaobject_id if diaobject is None else diaobject
    diaobject_position = self.diaobject_position_id if diaobject_position is None else diaobject_position

    # Basic type checking
    if not isinstance( data_dict, dict ):
        raise TypeError( f"data_dict must be a dict, not a {type(data_dict)}" )
    data_dict = copy.deepcopy( data_dict )
    if set( data_dict.keys() ) != { 'meta', 'combined', 'individual' }:
        raise ValueError( "data_dict must have keys 'meta', 'combined', and 'individual'" )
    if not isinstance( data_dict['meta'], dict ):
        raise TypeError( f"data_dict['meta'] must be a dict, not a {type(data_dict['meta'])}" )
    if not isinstance( data_dict['combined'], dict ):
        raise TypeError( f"data_dict['combined'] must be a dict, not a {type(data_dict['combined'])}" )
    if set( data_dict['combined'].keys() ) != { 'meta', 'data' }:
        raise ValueError( "data_dict['combined'] must have keys 'meta' and 'data'" )
    if not isinstance( data_dict['individual'], list ):
        raise TypeError( f"data_dict['individual'] must be a list, not a {type(data_dict['individual'])}" )
    for indiv in data_dict['individual']:
        if not isinstance( indiv, dict ):
            raise TypeError( f"elements of the data_dict['individual'] list must be dicts, but at least one is "
                             f"a {type(indiv)}" )
        if set( indiv.keys() ) != { 'meta', 'data' }:
            raise ValueError( "Each dict in the data_dict['individual'] list must have keys 'meta' and 'data'" )

    meta = data_dict['meta']

    # Make sure the ids and provenances are all there
    if not self.no_database:
        for prop, val in zip( [ 'id', 'provenance_id', 'diaobject_id', 'diaobject_position_id' ],
                              [ self.id, provenance, diaobject, diaobject_position ] ):
            if prop not in meta:
                meta[prop] = val
            errmsg = ( f"Property {prop} in data_dict['meta'] has value {meta[prop]}, "
                       f"doesn't match expected value {val}" )
            # Same error message whether asUUID fails or the ids mismatch.
            # diaobject_position_id is the only one that can be None.
            try:
                metaval = asUUID( meta[prop], oknone=( prop == 'diaobject_position_id' ) )
                expected = asUUID( val, oknone=True )
            except Exception as ex:
                raise ValueError( errmsg ) from ex
            # An expected value of None means "not specified": take what's in meta
            if ( expected is not None ) and ( metaval != expected ):
                raise ValueError( errmsg )
            meta[prop] = metaval

        # Make sure the self attributes are set
        self.provenance_id = meta['provenance_id']
        self.diaobject_id = meta['diaobject_id']
        self.diaobject_position_id = meta['diaobject_position_id']

    # Keep a band given in meta; make sure the key exists otherwise
    meta.setdefault( 'band', None )
    meta['filepath'] = str( self.filepath )

    # Make sure that if there's an nfiles in combined meta, it is right
    combined_meta = data_dict['combined']['meta']
    if 'nfiles' in combined_meta:
        if combined_meta['nfiles'] != len(data_dict['individual']):
            raise ValueError( f"You have nfiles={combined_meta['nfiles']} in meta, but the individual list "
                              f"is length {len(data_dict['individual'])}" )
    else:
        combined_meta['nfiles'] = len( data_dict['individual'] )

    # Make sure that we have an image_id for all the individual files
    if not self.no_database:
        for indiv_dict in data_dict['individual']:
            if 'image_id' not in indiv_dict['meta']:
                raise ValueError( "All 'individual' dictionaries must have an image_id key" )
            # Make sure it uuidifies
            _ = asUUID( indiv_dict['meta']['image_id'] )

    # TODO VERIFY DATA FORMAT

    self._data_dict = data_dict

    if not self.no_database:
        dbclient = SNPITDBClient.get() if dbclient is None else dbclient
        self._fill_props( dbclient=dbclient )
[docs]
def write_file( self, filepath=None ):
    """Write the spectrum to an HDF5 file.

    The file gets a top-level group "spectrum1d" whose attributes are
    data_dict['meta'], with a subgroup "combined" (datasets lamb,
    flam, func, count) and one subgroup "individual_<n>" (datasets
    lamb, flam, func) per entry of data_dict['individual'].  Metadata
    goes in group attributes; UUIDs are written as strings and None as
    an empty h5py attribute.

    Parameters
    ----------
      filepath : str or pathlib.Path, default None
        The full path to write the file to.  If None,
        self.full_filepath is used.  If you're writing to the
        database, you usually want this to be None.

    """
    if filepath is None:
        target = self.full_filepath
    else:
        target = pathlib.Path( filepath )
    target.parent.mkdir( parents=True, exist_ok=True )

    def _store_attrs( group, metadict ):
        # Copy a metadata dict into an HDF5 group's attributes
        for name, value in metadict.items():
            if isinstance( value, uuid.UUID ):
                group.attrs[name] = str( value )
            elif value is None:
                group.attrs[name] = h5py.Empty('i')
            else:
                group.attrs[name] = value

    with h5py.File( target, 'w' ) as h5f:
        root = h5f.create_group( "spectrum1d" )
        _store_attrs( root, self.data_dict['meta'] )

        comb = root.create_group( "combined" )
        _store_attrs( comb, self.data_dict['combined']['meta'] )
        for column in ( 'lamb', 'flam', 'func', 'count' ):
            comb.create_dataset( column, data=self.data_dict['combined']['data'][column] )

        for dex, indiv in enumerate( self.data_dict['individual'] ):
            grp = root.create_group( f"individual_{dex}" )
            _store_attrs( grp, indiv['meta'] )
            for column in ( 'lamb', 'flam', 'func' ):
                grp.create_dataset( column, data=indiv['data'][column] )
[docs]
def read_data( self, filepath=None, dbclient=None ):
    """Read the spectrum from an HDF5 file laid out as by write_file().

    Populates self._data_dict (only once the whole file has been read
    successfully) and then, unless self.no_database is True, fills the
    image-derived properties via _fill_props().

    Parameters
    ----------
      filepath : str or pathlib.Path, default None
        The full path to read the file from.  If None, then
        self.full_filepath is used.

      dbclient : SNPITDBClient or None
        Used for the image lookups; if None, SNPITDBClient.get() is used.

    """
    filepath = pathlib.Path( filepath ) if filepath is not None else self.full_filepath

    def _load_attrs( group ):
        # Empty attributes are how None is stored.  Use isinstance rather than ==,
        # because == against an array-valued attribute gives an array, not a bool.
        return { key: ( None if isinstance( val, h5py.Empty ) else val )
                 for key, val in group.attrs.items() }

    data_dict = { 'meta': {},
                  'combined': { 'meta': {}, 'data': {} },
                  'individual': [] }

    with h5py.File( filepath, 'r' ) as h5f:
        topgrp = h5f['spectrum1d']
        data_dict['meta'] = _load_attrs( topgrp )

        combgrp = topgrp['combined']
        data_dict['combined']['meta'] = _load_attrs( combgrp )
        for column in ( 'lamb', 'flam', 'func', 'count' ):
            data_dict['combined']['data'][column] = combgrp[column][:]

        # Figure out how many individuals there are
        nkeys = 0
        for key in topgrp.keys():
            mat = re.search( r'^individual_(\d+)$', key )
            if mat is not None:
                nkeys = max( nkeys, int(mat.group(1))+1 )

        for indivdex in range(nkeys):
            indivgrp = topgrp[ f'individual_{indivdex}' ]
            data_dict['individual'].append( { 'meta': _load_attrs( indivgrp ),
                                              'data': { 'lamb': indivgrp['lamb'][:],
                                                        'flam': indivgrp['flam'][:],
                                                        'func': indivgrp['func'][:] } } )

    self._data_dict = data_dict

    if not self.no_database:
        dbclient = SNPITDBClient.get() if dbclient is None else dbclient
        self._fill_props( dbclient=dbclient )
[docs]
def save_to_db( self, write=False, dbclient=None ):
    """Save the spectrum's row to the database.

    Parameters
    ----------
      write : bool, default False
        If True, write_file() is called before the row is sent.  If
        False, you are expected to have written the file yourself.

      dbclient : SNPITDBClient, default None
        The connection to the database web server.  If None,
        SNPITDBClient.get() is used.

    Returns
    -------
      Whatever dbclient.send returns for "savespectrum1d" (described
      upstream as the saved database row, for informational purposes).

    Raises
    ------
      RuntimeError if this spectrum was created with no_database=True.

    """
    if self.no_database:
        raise RuntimeError( "Can't save a no_database spectrum to the database." )

    if dbclient is None:
        dbclient = SNPITDBClient.get()

    # Make sure images, band, mjds and epoch are up to date with the data dict
    self._fill_props( dbclient=dbclient )

    if write:
        self.write_file()

    row = dict( id=self.id,
                provenance_id=self.provenance_id,
                diaobject_id=self.diaobject_id,
                diaobject_position_id=self.diaobject_position_id,
                band=self._images[0].band,
                filepath=self.filepath,
                mjd_start=self._mjd_start,
                mjd_end=self._mjd_end,
                epoch=self._epoch )
    payload = simplejson.dumps( row, cls=SNPITJsonEncoder )
    return dbclient.send( "savespectrum1d", data=payload, headers={'Content-Type': 'application/json'} )
[docs]
@classmethod
def get_spectrum1d( cls, spectrum1d_id, dbclient=None ):
    """Get a Spectrum1d from the database.

    Parameters
    ----------
      spectrum1d_id : UUID or str that can be converted to a UUID
        The id of the spectrum to fetch.

      dbclient : SNPITDBClient or None
        The connection to the database web server.  If None,
        SNPITDBClient.get() is used.

    Returns
    -------
      An instance of cls (Spectrum1d, or the subclass this was called on)

    """
    dbclient = SNPITDBClient.get() if dbclient is None else dbclient
    result = dbclient.send( f"getspectrum1d/{spectrum1d_id}" )

    # Adjust the returned row to what's expected by __init__(), working
    # on a copy so the client's response object is left alone.
    kwargs = dict( result )
    for argname in ( 'provenance', 'diaobject', 'diaobject_position' ):
        kwargs[argname] = kwargs.pop( f'{argname}_id' )
    # created_at is not a constructor argument
    kwargs.pop( 'created_at', None )

    return cls( **kwargs )
[docs]
@classmethod
def find_spectra( cls, provenance=None, provenance_tag=None, process=None, dbclient=None,
                  diaobject=None, **kwargs ):
    """Search the database for spectra.

    Must pass either provenance, or both of (provenance_tag and
    process).  All the rest are optional; omitted parameters will
    just not be used to filter the list of returned spectra.  Any
    extra keyword arguments are sent to the server as search
    parameters unchanged.

    Parameters
    ----------
      provenance : Provenance or UUID or str, default None
        The Provenance, or the id of the Provenance, of the spectra
        you want.  You must pass either provenance or provenance_tag.
        (If you pass both, provenance_tag will be ignored.)

      provenance_tag : str, default None
        The provenance tag used to find the provenance of the spectra
        you want.  Ignored if provenance is not None.  Requires
        process.

      process : str, default None
        The process used together with provenance_tag to find the
        provenance of the spectra you want.  Required if provenance
        is None.

      dbclient : SNPITDBClient or None
        The connection to the database (optional).  If None,
        SNPITDBClient.get() is used.

      diaobject : DiaObject or UUID or str or None
        The DiaObject, or the ID of the object, you want spectra for.

      band : str
        The band of the images that went into the spectrum.

      mjd_start, mjd_end : float
        The earliest mjd, and latest mjd, of the individual images
        that went into the spectrum.  (mjd_end is actually the mjd
        of the final image, plus its exposure time converted to
        days.)

      mjd_start_min, mjd_start_max, mjd_end_min, mjd_end_max : float
        Use these if you want to search a range of times.

      order_by : str or list, default None
        By default, the returned spectra are not sorted in any
        particular way.  Put a keyword here to sort by that value (or
        by those values).  NOTE(review): the option list previously
        documented here ('id', 'provenance_id', 'observation_id',
        'sca', 'ra', 'dec', 'filepath', 'width', 'height', 'mjd',
        'exptime') looks copied from the image search; confirm which
        keys the server accepts for spectra.

      limit : int, default None
        Only return this many objects at most.

      offset : int, default None
        Useful with limit and order_by; offset the returned value by
        this many entries.  You can make repeated calls to
        find_spectra to get subsets of spectra by passing the same
        order_by and limit, but different offsets each time, to
        slowly build up a list.

    Returns
    -------
      list of cls (Spectrum1d, or the subclass this was called on)

    Raises
    ------
      ValueError if provenance is None and provenance_tag or process
      is missing.

    """
    dbclient = SNPITDBClient.get() if dbclient is None else dbclient

    params = kwargs

    if provenance is not None:
        if isinstance( provenance, Provenance ):
            params['provenance'] = provenance.id
        else:
            params['provenance'] = asUUID( provenance )
    else:
        if ( provenance_tag is None ) or ( process is None ):
            raise ValueError( "You must pass either provenance, or both of provenance_tag and process" )
        params['provenance_tag'] = provenance_tag
        params['process'] = process

    if diaobject is not None:
        params['diaobject_id'] = diaobject.id if isinstance( diaobject, DiaObject ) else asUUID( diaobject )

    reses = dbclient.send( "/findspectra1d", data=simplejson.dumps( params, cls=SNPITJsonEncoder ),
                           headers={'Content-Type': 'application/json'} )

    spectra1d = []
    for res in reses:
        # Worm things around to work for kwargs to __init__, on a copy of the row
        initargs = dict( res )
        for argname in ( 'provenance', 'diaobject', 'diaobject_position' ):
            initargs[argname] = initargs.pop( f'{argname}_id' )
        # created_at is not a constructor argument
        initargs.pop( 'created_at', None )
        spectra1d.append( cls( **initargs ) )

    return spectra1d