#!/usr/bin/env python
"""CCDPROC.PY - Generic CCD image calibration/reduction
"""
from __future__ import print_function
__authors__ = 'David Nidever <dnidever@montana.edu>'
__version__ = '20211021' # yyyymmdd
import matplotlib.pyplot as plt
from astropy.io import fits
import numpy as np
import os
from glob import glob
import time
import re
import subprocess
from astropy.modeling.models import Gaussian2D
#import matplotlib
#matplotlib.use('nbagg')
[docs]def datadir():
""" Get package data directory."""
fil = os.path.abspath(__file__)
codedir = os.path.dirname(fil)
datadir = codedir+'/data/'
return datadir
[docs]def ccdlist(input=None):
if input is None: input='*.fits'
if type(input) is list:
files = input
else:
files = glob(input)
nfiles = len(files)
dt = np.dtype([('file',np.str,100),('object',np.str,100),('naxis1',int),('naxis2',int),
('imagetyp',np.str,100),('exptime',float),('filter',np.str,100),
('dateobs',np.str,50),('jd',float)])
cat = np.zeros(nfiles,dtype=dt)
for i,f in enumerate(files):
base = os.path.basename(f)
base = base.split('.')[0]
h = fits.getheader(f)
cat['file'][i] = f
cat['object'][i] = h.get('object')
cat['naxis1'][i] = h.get('naxis1')
cat['naxis2'][i] = h.get('naxis2')
cat['imagetyp'][i] = h.get('imagetyp')
cat['exptime'][i] = h.get('exptime')
cat['filter'][i] = h.get('filter')
cat['dateobs'][i] = h.get('date-obs')
cat['jd'][i] = h.get('jd')
print(base+' '+str(cat['naxis1'][i])+' '+str(cat['naxis2'][i])+' '+cat['imagetyp'][i]+' '+str(cat['exptime'][i])+' '+cat['filter'][i])
return cat
[docs]def library():
""" Get calibration library file info."""
ddir = datadir()
files = []
for p in ['*.fit','*.fit.gz','*.fits','*.fits.gz']:
files += glob(ddir+p)
nfiles = len(files)
if nfiles==0:
return None
cat = np.zeros(nfiles,dtype=np.dtype([('file',np.str,200),('name',np.str,100),('naxis1',int),
('naxis2',int),('imagetyp',np.str,50),
('type',np.str,50),('filter',np.str,40),('dateobs',np.str,30),
('jd',float),('master',bool)]))
for i in range(nfiles):
head = fits.getheader(files[i])
cat['file'][i] = files[i]
cat['name'][i] = os.path.basename(files[i])
cat['naxis1'][i] = head.get('naxis1')
cat['naxis2'][i] = head.get('naxis2')
imagetyp = head.get('imagetyp')
cat['imagetyp'][i] = imagetyp
for t in ['bias','dark','flat','light']:
if t in imagetyp.lower():
cat['type'][i] = t
if 'bpm' in cat['name'][i]:
cat['type'][i] = 'bpm'
cat['filter'][i] = head.get('filter')
cat['dateobs'][i] = head.get('date-obs')
cat['jd'][i] = head.get('jd')
if 'master' in cat['name'][i]:
cat['master'][i] = True
else:
cat['master'][i] = False
return cat
[docs]def overscan(im,head,verbose=False):
""" This calculate the overscan and subtracts it from the data and then trims off the overscan region"""
# y = [0:40] and [3429:3464]
# x = [0:13] and [2726:2727]
# DATA = [14:2725,41:3428]
# 2712 x 3388
nx,ny = im.shape
# Use trimsec
trimsec = head.get('TRIMSEC')
if trimsec is None:
raise ValueError('No TRIMSEC found in header')
trim = [int(s) for s in re.findall(r'\d+',trimsec)]
# biassec
biassec = head.get('BIASSEC')
biassec2 = None
if biassec is None:
biassec = head.get('BIASSEC1')
biassec2 = head.get('BIASSEC2')
if biassec is None:
raise ValueError('No BIASSEC found in header')
bias = [int(s) for s in re.findall(r'\d+',biassec)]
# Y first, then X
o = im[bias[2]-1:bias[3],bias[0]-1:bias[1]]
# check for second biassec
if biassec2 is not None:
bias2 = [int(s) for s in re.findall(r'\d+',biassec2)]
o2 = im[bias2[2]-1:bias2[3],bias2[0]-1:bias2[1]]
o = np.hstack((o,o2))
# Subtract overscan
oshape = o.shape
if oshape[0] > oshape[1]:
# Take the mean
mno = np.mean(o,axis=1)
# Fit line to it
coef = np.polyfit(np.arange(nx),mno,1)
fit = np.poly1d(coef)(np.arange(nx))
# Subtract from entire image
oim = np.repeat(fit,ny).reshape(nx,ny)
out = im.astype(float)-oim
else:
# Take the mean
mno = np.mean(o,axis=0)
# Fit line to it
coef = np.polyfit(np.arange(ny),mno,1)
fit = np.poly1d(coef)(np.arange(ny))
# Subtract from entire image
oim = np.repeat(fit,nx).reshape(nx,ny)
out = im.astype(float)-oim
# Trim the overscan
out = out[trim[2]-1:trim[3],trim[0]-1:trim[1]]
#out = out[14:2726,41:3429]
# Update header
nx1, ny1 = out.shape
head2 = head.copy()
head2['NAXIS1'] = ny1
head2['NAXIS2'] = nx1
head2['BITPIX'] = -32
if biassec2 is not None:
head2['BIASSEC1'] = biassec
head2['BIASSEC2'] = biassec2
else:
head2['BIASSEC'] = biassec
head2['TRIMSEC'] = trimsec
head2['OVSNMEAN'] = np.mean(oim)
head2['TRIM'] = time.ctime()+' Trim is '+trimsec
if biassec2 is not None:
head2['OVERSCAN'] = time.ctime()+' Overscan is '+biassec+' and '+biassec2+', mean '+str(np.mean(oim))
else:
head2['OVERSCAN'] = time.ctime()+' Overscan is '+biassec+', mean '+str(np.mean(oim))
if verbose:
print(head2['OVERSCAN'])
print(time.ctime()+' Trimming to '+trimsec)
return out, head2
[docs]def fixpixconv(im,mask,yind,xind,filt):
ny,nx = im.shape
npix,npix = filt.shape
nh = npix//2
# image indices
ylo = np.maximum(yind-nh,0)
yhi = np.minimum(yind+nh+1,ny)
xlo = np.maximum(xind-nh,0)
xhi = np.minimum(xind+nh+1,nx)
slc = (slice(ylo,yhi,None),slice(xlo,xhi,None))
# filter indices
fylo = np.maximum(yind-nh,0)-yind+nh
fyhi = np.minimum(yind+nh+1,ny)-yind+nh
fxlo = np.maximum(xind-nh,0)-xind+nh
fxhi = np.minimum(xind+nh+1,nx)-xind+nh
fslc = (slice(fylo,fyhi,None),slice(fxlo,fxhi,None))
ngd = np.sum(mask[slc])
if ngd==0:
return None
totf = np.sum(filt[fslc])
new = np.sum(im[slc]*(mask[slc]==0)*filt[fslc])/totf
return new
[docs]def fixpix(im,mask,head,verbose=False):
""" Interpolate over bad pixels."""
ny,nx = im.shape
yind,xind = np.where(mask>0)
nind = len(xind)
# Make filters
xx,yy = np.meshgrid(np.arange(3),np.arange(3))
filt3 = np.exp(-0.5*((xx-1)**2+(yy-1)**2)/1.0**2)
xx,yy = np.meshgrid(np.arange(7),np.arange(7))
filt7 = np.exp(-0.5*((xx-3)**2+(yy-3)**2)/1.0**2)
xx,yy = np.meshgrid(np.arange(11),np.arange(11))
filt11 = np.exp(-0.5*((xx-5)**2+(yy-5)**2)/1.0**2)
# Loop over pixels
nfix = 0
for i in range(nind):
yind1 = yind[i]
xind1 = xind[i]
# convolve with filter
new = fixpixconv(im,mask,yind1,xind1,filt3)
if new is None:
new = fixpixconv(im,mask,yind1,xind1,filt7)
if new is None:
new = fixpixconv(im,mask,yind1,xind1,filt11)
if new is not None:
im[yind1,xind1] = new
mask[yind1,xind1] += 4
nfix += 1
head['FIXPIX'] = time.ctime()+' FIXPIX: '+str(nfix)+' pixels interpolated over'
if verbose:
print(time.ctime()+' Fixpix: '+str(nfix)+' pixels interpolated over')
return im,mask,head
[docs]def masterbias(files,med=False,outfile=None,clobber=True,verbose=False):
"""
Load the bias images. Overscan correct and trim them. Then average them.
Parameters
----------
files : list
List of bias FITS files.
med : boolean, optional
Use the median of all the files. By default med=False and the mean is calculated.
outfile : string, optional
Filename to write the master bias image to.
clobber : boolean, optional
If the output file already exists, then overwrite it. Default is True.
verbose : boolean, optional
Verbose output to the screen. Default is False.
Returns
-------
aim : numpy image
The 2D master bias image.
ahead : header dictionary
The master bias header.
Example
-------
bias, bhead = masterbias(bias_files)
"""
nfiles = len(files)
if verbose:
print('Creating master bias using '+str(nfiles)+' files')
# File loop
for i in range(nfiles):
im,head = fits.getdata(files[i],0,header=True)
sh = im.shape
if verbose:
print(str(i+1)+' '+files[i]+' ['+str(sh[1])+','+str(sh[0])+']')
# Fix header, if necessary
if (head.get('TRIMSEC') is None) | (head.get('BIASSEC') is None):
head = fixheader(head)
# Check image type
imagetyp = head['IMAGETYP']
if 'bias' not in imagetyp.lower() and 'zero' not in imagetyp.lower():
raise ValueError(files[i]+' is not a bias')
# Image processing, overscan+trim
im2,head2 = ccdproc(im,head)
# Initialize array
if i==0:
ny,nx = im2.shape
if med:
imarr = np.zeros((ny, nx, nfiles),float)
else:
totim = np.zeros(im2.shape,float)
if med:
imarr[:,:,i] = im2
else:
totim += im2
if i==0: ahead=head2.copy()
ahead['CMB'+str(i+1)] = files[i]
# Final calculation
if med:
aim = np.median(imarr,axis=2)
ahead['HISTORY'] = 'Median combine'
else:
aim = totim/nfiles
ahead['HISTORY'] = 'Mean combine'
ahead['NCOMBINE'] = nfiles
ahead['HISTORY'] = time.ctime()+' bias combine'
aim = aim.astype(np.float32) # convert to 32 bit
# Output file
if outfile is not None:
if os.path.exists(outfile):
if clobber is False:
raise ValueError(outfile+' already exists and clobber=False')
else:
os.remove(outfile)
if verbose:
print('Writing master bias to '+outfile)
hdu = fits.PrimaryHDU(aim,ahead).writeto(outfile)
return aim, ahead
[docs]def masterdark(files,zero,med=False,outfile=None,clobber=True,verbose=False):
"""
Load the dark images. Overscan correct and trim them. zero subtract. Then average them.
Parameters
----------
files : list
List of dark FITS files.
zero : numpy image or str
Master bias. This can be the image or the filename.
med : boolean, optional
Use the median of all the files. By default med=False and the mean is calculated.
outfile : string, optional
Filename to write the master dark image to.
clobber : boolean, optional
If the output file already exists, then overwrite it. Default is True.
verbose : boolean, optional
Verbose output to the screen. Default is False.
Returns
-------
aim : numpy image
The 2D master dark image.
ahead : header dictionary
The master dark header.
Example
-------
dark, dhead = masterdark(dark_files,zero)
"""
nfiles = len(files)
if verbose:
print('Creating master dark using '+str(nfiles)+' files')
# Load master bias if filename input
if type(zero) is str:
zerofile = zero
zero,zhead = fits.getdata(zerofile,header=True)
# File loop
for i in range(nfiles):
im,head = fits.getdata(files[i],0,header=True)
sh = im.shape
if verbose:
print(str(i+1)+' '+files[i]+' ['+str(sh[1])+','+str(sh[0])+'] '+str(head['exptime'])+' sec')
# Fix header, if necessary
if (head.get('TRIMSEC') is None) | (head.get('BIASSEC') is None):
head = fixheader(head)
# Check image type
imagetyp = head['IMAGETYP']
if 'dark' not in imagetyp.lower():
raise ValueError(files[i]+' is not a dark')
# Image processing, overscan+trim+zercorr
im2,head2 = ccdproc(im,head,zero=zero)
# Initialize array
if i==0:
ny,nx = im2.shape
if med:
imarr = np.zeros((ny,nx, nfiles),float)
else:
totim = np.zeros(im2.shape,float)
if med:
imarr[:,:,i] = im2 / head['exptime'] # divide by exposure time
else:
totim += im2 / head['exptime']
if i==0: ahead=head2.copy()
ahead['CMB'+str(i+1)] = files[i]
# Final calculation
if med:
aim = np.median(imarr,axis=2)
ahead['HISTORY'] = 'Median combine'
else:
aim = totim/nfiles
ahead['HISTORY'] = 'Mean combine'
# Make sure they are all non-negative
aim = np.maximum(aim,0)
ahead['NCOMBINE'] = nfiles
ahead['HISTORY'] = time.ctime()+' dark combine'
aim = aim.astype(np.float32) # convert to 32 bit
# Output file
if outfile is not None:
if os.path.exists(outfile):
if clobber is False:
raise ValueError(outfile+' already exists and clobber=False')
else:
os.remove(outfile)
if verbose:
print('Writing master dark to '+outfile)
hdu = fits.PrimaryHDU(aim,ahead).writeto(outfile)
return aim, ahead
[docs]def masterflat(files,zero,dark,med=False,outfile=None,clobber=True,verbose=False):
"""
Load the flat images. Overscan correct and trim them. Bias and dark subtract. Then divide by median and average them.
Parameters
----------
files : list
List of flat FITS files.
zero : numpy image or str
Master bias. This can be the image or the filename.
dark : numpy image or str
Master dark. This can be the image or the filename.
med : boolean, optional
Use the median of all the files. By default med=False and the mean is calculated.
outfile : string, optional
Filename to write the master flat image to.
clobber : bool, optional
If the output file already exists, then overwrite it.
If the output file already exists, then overwrite it. Default is True.
verbose : boolean, optional
Verbose output to the screen. Default is False.
Returns
-------
aim : numpy image
The 2D master flat image.
ahead : header dictionary
The master flat header.
Example
-------
flat, fhead = masterflat(flat_files,zero,dark)
"""
nfiles = len(files)
if verbose:
print('Creating master flat using '+str(nfiles)+' files')
# Load master bias if filename input
if type(zero) is str:
zerofile = zero
zero,zhead = fits.getdata(zerofile,header=True)
# Load master dark if filename input
if type(dark) is str:
darkfile = dark
dark,dhead = fits.getdata(darkfile,header=True)
# File loop
for i in range(nfiles):
im,head = fits.getdata(files[i],0,header=True)
sh = im.shape
# Fix header, if necessary
if (head.get('TRIMSEC') is None) | (head.get('BIASSEC') is None):
head = fixheader(head)
# Check image type
imagetyp = head['IMAGETYP']
if 'flat' not in imagetyp.lower():
raise ValueError(files[i]+' is not a flat')
# Image processing, overscan+trim+zercorr+darkcorr
im2,head2 = ccdproc(im,head,zero=zero,dark=dark)
if verbose:
print(str(i+1)+' '+files[i]+' ['+str(sh[1])+','+str(sh[0])+'] '+str(head['exptime'])+' sec %8.2f ADU' %(np.median(im2)))
# Initialize array
if i==0:
ny,nx = im2.shape
if med:
imarr = np.zeros((ny,nx,nfiles),float)
else:
totim = np.zeros(im2.shape,float)
if med:
imarr[:,:,i] = im2 / np.median(im2) # divide by median flux
else:
totim += im2 / np.median(im2)
if i==0: ahead=head2.copy()
ahead['CMB'+str(i+1)] = files[i]
# Final calculation
if med:
aim = np.median(imarr,axis=2)
ahead['HISTORY'] = 'Median combine'
else:
aim = totim / nfiles
ahead['HISTORY'] = 'Mean combine'
ahead['NCOMBINE'] = nfiles
ahead['HISTORY'] = time.ctime()+' flat combine'
aim = aim.astype(np.float32) # convert to 32 bit
# Output file
if outfile is not None:
if os.path.exists(outfile):
if clobber is False:
raise ValueError(outfile+' already exists and clobber=False')
else:
os.remove(outfile)
if verbose:
print('Writing master flat to '+outfile)
hdu = fits.PrimaryHDU(aim,ahead).writeto(outfile)
return aim, ahead
[docs]def makebpm(zero,dark=None,flat=None,maxzero=100,maxdark=1,maxflat=1.2,
outfile=None,verbose=False,clobber=True,compress=False):
"""
Create bad pixel mask from master bias, dark and flat.
Parameters
----------
zero : filename or numpy 2D array, optional
The master bias. Either the 2D image or the filename.
dark : filename or numpy 2D array, optional
The master dark. Either the 2D image or the filename.
flat : filename or numpy 2D array, optional
The master flat. Either the 2D image or the filename.
maxzero : int, optional
The cutoff to use for the bias image. Values above that will
be considered bad. Default is 100.
maxdark : int, optional
The cutoff to use for the dark image. Values above that will
be considered bad. Default is 1.0
maxflat : int, optional
The cutoff to use for the flat image. Values above that will
be considered bad. Default is 1.2
outfile : string, optional
Filename to write the processed image to.
verbose : boolean, optional
Verbose output to the screen.
clobber : boolean, optional
If the output file already exists, then overwrite it.
compress : boolean, optional
Gzip compress output file. Default is False.
Returns
-------
bpm : numpy image
The 2D processed image.
fhead : header dictionary
The header for the processed image.
Example
-------
bpm, head = makebpm(zero,dark,flat)
"""
# Load master bias if filename input
if type(zero) is str:
zerofile = zero
zero,zhead = fits.getdata(zerofile,header=True)
# Load master dark if filename input
if type(dark) is str:
darkfile = dark
dark,dhead = fits.getdata(darkfile,header=True)
# Load master flat if filename input
if type(flat) is str:
flatfile = flat
flat,fhead = fits.getdata(flatfile,header=True)
bhead = zhead.copy()
# Initialize array
ny,nx = zero.shape
bpm = np.zeros((ny,nx),int)
# Check zero
bpm[zero>maxzero] = 1
# Check dark
if dark is not None:
bpm[dark>maxdark] = 1
# Check flat
if flat is not None:
bpm[flat>maxflat] = 1
nbad = np.sum(bpm)
bhead['HISTORY'] = str(nbad)+' pixels marked as bad'
bhead['HISTORY'] = time.ctime()+' BPM creation'
# Output file
if outfile is not None:
if os.path.exists(outfile):
if clobber is False:
raise ValueError(outfile+' already exists and clobber=False')
else:
os.remove(outfile)
if verbose:
print('Writing bpm to '+outfile)
hdu = fits.PrimaryHDU(bpm,bhead).writeto(outfile)
return bpm, bhead
[docs]def ccdproc(data,head=None,bpm=None,zero=None,dark=None,flat=None,outfile=None,outsuffix='_red',
verbose=False,clobber=True,compress=False,fix=False):
"""
Overscan subtract, trim, subtract master zero, subtract master dark, flat field.
Parameters
----------
data : list or numpy 2D array
This can either be (1) a list of image filenames, (2) a file with a list of
names image filenames, or (3) a 2D image (head must also be input).
head : header dictionary, optional
The header if a single image is input (data).
bpm : filename, numpy 2D array, or boolean, optional
The bad pixel mask. Either the 2D image, the filename, or
a boolean. If bpm=True, then a library BPM will be used.
zero : filename, numpy 2D array, or boolean, optional
The master bias. Either the 2D image, the filename, or
a boolean. If zero=True, then a library zero will be used.
dark : filename, numpy 2D array, or boolean, optional
The master dark. Either the 2D image, the filename, or
a boolean. If dark=True, then a library dark will be used.
flat : filename, numpy 2D array, or boolean, optional
The master flat. Either the 2D image, the filename, or
a boolean. If flat=True, then a library flat will be used.
fix : boolean, optional
Interpolate over bad pixels. Default is False.
outfile : string or boolean, optional
Filename to write the processed image to. If outfile=True, then
the output filename will be the input filename with outsuffix
added (i.e. image.fits -> image_red.fits).
outsuffix : string, optional
Suffix to use for output files. Default is "_red".
verbose : boolean, optional
Verbose output to the screen.
clobber : boolean, optional
If the output file already exists, then overwrite it.
compress : boolean, optional
Gzip compress output file. Default is False.
Returns
-------
fim : numpy image
The 2D processed image.
fhead : header dictionary
The header for the processed image.
Example
-------
flat, fhead = ccdproc(files,bpm,zero,dark,flat)
"""
# Check the inputs
if type(data) is str: # filname input
files = [data]
# Check if it's a FITS filename or a list
try:
head = fits.getheader(data)
except:
# This might be a list of filenames
f = open(data,'r')
files = f.readlines()
f.close()
files = [l.rstrip('\n') for l in files] # strip newlines
elif type(data) is list: # list of files input
files = data
elif type(data) is np.ndarray: # numpy array
if data.ndim==1 and (data.dtype.type==np.str or data.dtype.type==np.str_): # array of filenames
files = list(data)
elif data.ndim==2: # 2D image
files = ['']
im = data
else:
raise ValueError('Input numpy data not understood')
else:
raise ValueError('Input data not understood')
nfiles = len(files)
# Get calibration library information
cals = library()
# Load calibration files
#-----------------------
# -- BPM ---
if type(bpm) is str:
if os.path.exists(bpm):
bpmim,bpmhead = fits.getdata(bpm,0,header=True)
else:
raise ValueError(bpm+' NOT FOUND')
# Use calibration library file
elif bpm is True:
bpmind, = np.where(cals['type']=='bpm')
if len(bpmind)==0:
raise ValueError('No library BPM found')
bpmfile = cals['file'][bpmind[0]]
if os.path.exists(bpmfile):
bpmim,bpmhead = fits.getdata(bpmfile,0,header=True)
else:
raise ValueError('Library '+bpmfile+' file NOT FOUND')
# Image input
else:
bpmim = bpm
# --- ZERO ---
# Filename input
if type(zero) is str:
if os.path.exists(zero):
zeroim,zerohead = fits.getdata(zero,0,header=True)
else:
raise ValueError(zero+' NOT FOUND')
# Use calibration library file
elif zero is True:
zeroind, = np.where((cals['type']=='bias') & (cals['master']==True))
if len(zeroind)==0:
raise ValueError('No library master Zero found for this image')
zerofile = cals['file'][zeroind[0]]
if os.path.exists(zerofile):
zezroim,zerohead = fits.getdata(zerofile,0,header=True)
else:
raise ValueError('Library '+zerofile+' file NOT FOUND')
# Image input
else:
zeroim = zero
# --- DARK ---
# Filename input
if type(dark) is str:
if os.path.exists(dark):
darkim,darkhead = fits.getdata(dark,0,header=True)
else:
raise ValueError(dark+' NOT FOUND')
# Use calibration library file
elif dark is True:
darkind, = np.where((cals['type']=='dark') & (cals['master']==True))
if len(darkind)==0:
raise ValueError('No library master Dark found for this image')
darkfile = cals['file'][darkind[0]]
if os.path.exists(darkfile):
darkim,darkhead = fits.getdata(darkfile,0,header=True)
else:
raise ValueError('Library '+darkfile+' file NOT FOUND')
# Image input
else:
darkim = dark
# FLATS are filter-specific
# Image loop
#-----------
for i in range(nfiles):
# Data input
if len(files)==1 and files[0]=='':
if head is None:
raise ValueError('Header not input')
# Load the data
else:
if verbose:
print('Loading '+files[0])
if os.path.exists(files[i]):
im,head = fits.getdata(files[i],0,header=True)
else:
raise ValueError(files[i]+' NOT FOUND')
# Fix header, if necessary
if (head.get('TRIMSEC') is None) | (head.get('BIASSEC') is None):
head = fixheader(head)
# Overscan subtract and trim
#---------------------------
if head.get('OVERSCAN') is None:
fim,fhead = overscan(im,head,verbose=verbose)
else:
print('Already OVERSCAN corrected')
fim = im.copy()
fhead = head.copy()
# Initialize error and mask image
error = np.zeros(fim.shape,float)
mask = np.zeros(fim.shape,np.uint8) # uint8, can handle values up to 255
# Bad pixel mask
#---------------
if (bpm is not None):
# Not corrected yet
if head.get('BPMCOR') is None:
# Check sizes
if bpmim.shape != fim.shape:
raise ValueError('BPM shape and image shape do not match')
# Do the correction
nbadbpm = np.sum(bpmim>0)
if nbadbpm>0:
fim[bpmim>0] = 0.0
mask[bpmim>0] = 1
error[bpmim>0] = 1e30
fhead['BPMCOR'] = time.ctime()+' BPM: masked '+str(nbadbpm)+' bad pixels'
if verbose:
print(fhead['BPMCOR'])
# Corrected already
else:
print('Already ZERO subtracted')
# Set mask and error for saturated pixels
#----------------------------------------
saturation = head.get('saturate')
if saturation is None:
saturation = 64000
sat = (fim>saturation) & (mask==0)
mask[sat] = 2
error[sat] = 1e30
# Subtract master bias
#---------------------
if (zero is not None):
# Not corrected yet
if head.get('ZEROCOR') is None:
# Check sizes
if zeroim.shape != fim.shape:
raise ValueError('ZERO shape and image shape do not match')
# Do the correction
fim[mask==0] -= zeroim[mask==0]
fhead['ZEROCOR'] = time.ctime()+' ZERO: mean %6.2f, stdev %6.2f' % (np.mean(zeroim[mask==0]),np.std(zeroim[mask==0]))
if verbose:
print(fhead['ZEROCOR'])
# Corrected already
else:
print('Already ZERO subtracted')
# Calculate error array
#------------------------
gain = head.get('gain')
if gain is None:
gain = 1.0
rdnoise = head.get('rdnoise')
if rdnoise is None:
rdnoise = 0.0
# Add Poisson noise and readnoise in quadrature
error[mask==0] = np.sqrt(np.maximum(fim[mask==0]/gain,0)+rdnoise**2)
# Subtract master dark scaled to this exposure time
#--------------------------------------------------
if (dark is not None):
# Not corrected yet
if head.get('DARKCOR') is None:
# Check sizes
if darkim.shape != fim.shape:
raise ValueError('DARK shape and image shape do not match')
# Do the correction
fim[mask==0] -= darkim[mask==0]*head['exptime']
fhead['DARKCOR'] = time.ctime()+' DARK: mean %6.2f, stdev %6.2f' % \
(np.mean(darkim[mask==0]*head['exptime']),np.std(darkim[mask==0]*head['exptime']))
if verbose:
print(fhead['DARKCOR'])
# Corrected already
else:
print('Already DARK corrected')
# Flat field
#-----------
if (flat is not None):
# Not corrected yet
if head.get('FLATCOR') is None:
# Filename input
if type(flat) is str:
if os.path.exists(flat):
flatim,flathead = fits.getdata(flat,0,header=True)
else:
raise ValueError(flat+' NOT FOUND')
# Use calibration library file
elif flat is True:
flatind, = np.where((cals['type']=='flat') & (cals['master']==True)
& (cals['filter']==head['filter']))
if len(flatind)==0:
raise ValueError('No library master Flat found for this image')
flatfile = cals['file'][flatind[0]]
if os.path.exists(flatfile):
flatim,flathead = fits.getdata(flatfile,0,header=True)
else:
raise ValueError('Library '+flatfile+' file NOT FOUND')
# Image input
else:
flatim = flat
# Check sizes
if flatim.shape != fim.shape:
raise ValueError('FLAT shape and image shape do not match')
# Do the correction
fim[mask==0] /= flatim[mask==0]
error[mask==0] /= flatim[mask==0] # need to divide error as well
fhead['FLATCOR'] = time.ctime()+' FLAT: mean %6.2f, stdev %6.2f' % (np.mean(flatim[mask==0]),np.std(flatim[mask==0]))
if verbose:
print(fhead['FLATCOR'])
# Already corrected
else:
print('Already FLAT corrected')
# Fix pix
#--------
# interpolate over bad pixels
if fix:
fim,mask,fhead = fixpix(fim,mask,fhead,verbose=verbose)
fhead['CCDPROC'] = time.ctime()+' CCD processing done'
if verbose:
print(time.ctime()+' CCD processing done')
# Convert images
fim = fim.astype(np.float32) # to 32 bit
error = error.astype(np.float32) # to 32 bit
# Write to output file
if outfile is not None:
# Use automatic name with suffix
if outfile is True:
if files[i] != '':
outdir = os.path.dirname(files[i])
if outdir=='': outdir='.'
outbase = os.path.basename(files[i])
if '.gz' in outbase:
outbase = outbase[:-3]
outbase,outext = os.path.splitext(outbase)
outfil = outdir+'/'+outbase+outsuffix+'.fits'
else:
outfil = 'inputimage'+outsuffix+'.fits'
else:
outfil = outfile
# Check if the output file exists already
if os.path.exists(outfil):
if clobber is False:
raise ValueError(outfil+' already exists and clobber=False')
else:
os.remove(outfil)
print('Writing processed file to '+outfil)
hdulist = fits.HDUList()
hdu = fits.PrimaryHDU(fim,fhead)
hdulist.append(hdu)
# Add error image
hdulist.append(fits.ImageHDU(error))
hdulist[1].header['BUNIT'] = 'error'
# Add mask image
hdulist.append(fits.ImageHDU(mask))
hdulist[2].header['BUNIT'] = 'mask'
hdulist[2].header['HISTORY'] = ' Mask values'
hdulist[2].header['HISTORY'] = ' 0: good'
hdulist[2].header['HISTORY'] = ' 1: bad pixel'
hdulist[2].header['HISTORY'] = ' 2: saturated'
hdulist[2].header['HISTORY'] = ' 4: interpolated'
hdulist.writeto(outfil,overwrite=clobber)
hdulist.close()
# Gzip compress
if compress:
if verbose:
print('Gzip compressing')
if os.path.exists(outfil+'.gz') and clobber:
os.remove(outfil+'.gz')
out = subprocess.run(['gzip',outfil])
else:
print(outfil+'.gz already exists and clobber=False')
return fim, fhead
[docs]def redrun(files):
"""
Automatically reduce an entire run of data.
"""
tab = ccdlist(files)
# Step 1) Reduce bias/zeros
#--------------------------
# Step 2) Make master zero
#--------------------------
# Step 3) Reduce darks
#---------------------
# Step 4) Make master dark
#-------------------------
# Step 5) Reduce flats
#---------------------
# Step 6) Make master flat
#-------------------------
# Step 7) Reduce science/object exposures
#----------------------------------------
pass
[docs]def autored(datadir='.'):
""" Automatically pick up FITS files from a directory and reduce."""
# While loop
count = 0
wait = 10
flag = 0
lastfiles = []
while (flag==0):
# Check directory for new files
files = glob(datadir+'/*.fit*')
nfiles = len(files)
# If there are new files then reduce them
if files!=lastfiles:
newfiles = [f for f in files if f not in lastfiles]
nnewfiles = len(newfiles)
print(time.ctime()+' Found '+str(nnewfiles)+' new files: '+','.join(newfiles))
for i in range(nnewfiles):
base = os.path.basename(newfiles[i])
outfile = datadir+'/red/'+base
out = ccdproc(newfiles[i],outfile=outfile,compress=True)
# Sleep for a while
time.sleep(wait)
# Last list of files
lastfiles = files