-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate_images.py
180 lines (146 loc) · 6.87 KB
/
generate_images.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
"""Python generator synthetic image stream.
"""
import sys
import astropy.units as u
from image_util import generate_noiseless_image
from astropy.table import Table
from astropy.nddata import CCDData
from photutils.datasets import make_gaussian_sources_image
class ImageStream:
    """Iterator that yields a stream of synthetic images.

    A noiseless image is built once, at construction time, by
    ``generate_noiseless_image``. Every iteration step hands that image
    (with a flash added when one is registered for the frame) to
    ``imager.make_image_real`` and returns the result.

    Attributes:
        curr_frame_num (int): current frame number. Starts at 0 and is
            incremented before a frame is produced, so the first frame
            returned is frame 1.
        exptime (astropy.Quantity): Exposure time in u.seconds.
        flash_info_dict (dict): Dictionary of flashes, frame number as key.
        gunagala_config_filename (str): absolute path to gunagala
            configuration file.
        imager: Second value returned by ``generate_noiseless_image``;
            used to build flashes and to turn images into realistic ones.
        imager_filter_name (str): Imaging filter name.
        noiseless_image: First value returned by
            ``generate_noiseless_image``; the base image of every frame.
        num_frames (int): Number of frames to be generated.
    """

    # From https://treyhunner.com/2018/06/how-to-make-an-iterator-in-python/
    def __init__(self,
                 gunagala_config_filename,
                 num_frames=10,
                 imager_filter_name='g',
                 exptime=0.005 * u.s,
                 flash_list=None,
                 snr_limit=3.
                 ):
        """Init function to setup the generator.

        Args:
            gunagala_config_filename (str): absolute path to gunagala
                configuration file.
            num_frames (int, optional): Number of frames to be generated.
            imager_filter_name (str, optional): Imaging filter name.
            exptime (astropy.Quantity, optional): Exposure time in u.seconds.
            flash_list (list, optional): List of flash info tuples
                (x, y, S/N, frame #).
            snr_limit (float, optional): Include stars with luminosities
                S/N > snr_limit.
        """
        # following are not implemented yet
        # fraction_of_field=1,
        # fps=20,
        # n_cameras=1,
        self.exptime = exptime
        self.curr_frame_num = 0
        self.imager_filter_name = imager_filter_name
        self.gunagala_config_filename = gunagala_config_filename
        self.num_frames = num_frames

        # prep flash info, keyed by the frame number (last tuple element):
        # (x_location, y_location, signal_to_noise, frame_no)
        # NOTE: entries sharing a frame number overwrite each other, so only
        # the last flash listed for a given frame is kept.
        self.flash_info_dict = {}
        if flash_list is not None:
            self.flash_info_dict = {
                flash_info[-1]: flash_info for flash_info in flash_list
            }

        print('Preparing noiseless image - this will take a minute or so')
        self.noiseless_image, self.imager = generate_noiseless_image(
            gunagala_config_filename=self.gunagala_config_filename,
            exptime=self.exptime,
            imager_filter_name=self.imager_filter_name,
            snr_limit=snr_limit)
        print('Created noiseless image - ready to run iterator')

    def __iter__(self):
        """Return the iterator object itself.

        The frame counter is not reset here, so a stream can only be
        consumed once.

        Returns:
            ImageStream: This same instance (not a new one).
        """
        return self

    def generate_flash_image(self, imager, flash_info):
        """Generate a single noiseless image containing only a single flash.

        The flash is modelled as one Gaussian source. Its amplitude is the
        rate of a point source that reaches the requested signal-to-noise in
        one exposure of ``self.exptime``. All imager properties are taken
        from the ``imager`` argument.

        Args:
            imager (gunagala.Imager): Instance of a gunagala imaging system.
            flash_info (tuple): Tuple with flash info: (x, y, S/N, frame #).

        Returns:
            CCDData: Noiseless image containing a flash.
        """
        x_location, y_location, signal_to_noise, _ = flash_info

        # Magnitude of a point source detected at the requested S/N in a
        # single exposure, then converted to a rate in the chosen filter.
        flash_magnitude = imager.point_source_limit(
            total_exp_time=self.exptime,
            sub_exp_time=self.exptime,
            filter_name=self.imager_filter_name,
            snr_target=signal_to_noise)
        flash_rate = imager.ABmag_to_rate(
            flash_magnitude, self.imager_filter_name)

        # PSF width = https://brainder.org/2011/08/20/gaussian-kernels-convert-fwhm-to-sigma/
        # NOTE(review): the value is handed to make_gaussian_sources_image
        # unchanged - confirm psf.fwhm is expressed in pixels.
        psf_sigma = imager.psf.fwhm / 2.355

        table = Table()
        table['amplitude'] = [flash_rate.value]
        table['x_mean'] = [x_location]
        table['y_mean'] = [y_location]
        table['x_stddev'] = [psf_sigma]
        table['y_stddev'] = [psf_sigma]
        table['theta'] = [0]

        # pixel_shape is unpacked as (naxis1, naxis2) and passed reversed as
        # the shape of the generated image.
        naxis1, naxis2 = imager.wcs.pixel_shape
        flash_image = make_gaussian_sources_image(
            (naxis2, naxis1), table) * flash_rate.unit / u.pixel
        return CCDData(flash_image, wcs=imager.wcs)

    def __next__(self):
        """Main python generator routine. Returns
        a noisy image with a flash if appropriate.

        Returns:
            CCDData: Noisy imaging data.

        Raises:
            StopIteration: When current frame is greater than requested
                frame numbers, stop loop.
        """
        self.curr_frame_num += 1
        if self.curr_frame_num > self.num_frames:
            raise StopIteration

        if self.curr_frame_num in self.flash_info_dict:
            # inject flash: add the stored noiseless image to the flash-only
            # image; the stored noiseless image itself is left unmodified.
            flash_image = self.generate_flash_image(
                self.imager, self.flash_info_dict[self.curr_frame_num])
            flash_image.data = flash_image.data + self.noiseless_image.data
            return self.imager.make_image_real(flash_image, self.exptime)

        return self.imager.make_image_real(self.noiseless_image, self.exptime)
if __name__ == '__main__':
    # Test the stream generator. Produce 1 flash.
    import numpy as np
    sys.path.append("/Users/lspitler/Documents/huntsman-ms/")
    from find_blobs import main_from_arrays
    import timeit

    if len(sys.argv) < 2:
        print(
            '\n\nERROR, Usage:\npython generate_images.py [absolute_path_to_gunagala_configuration_file]\n\n')
        sys.exit(1)
    gunagala_config_filename = sys.argv[1]

    # (x_location, y_location, signal_to_noise, frame_no)
    flash_list = [(1555, 1670, 100, 2)]  # , (1000, 1000, 1000, 3)]

    # True: time the blob finder on (reference frame, current frame).
    # False: write the difference image and the synthetic image to FITS.
    time_blob_finder = True

    # The first frame of the stream serves as the reference image.
    # NOTE: ref_image, image, np and main_from_arrays must remain module-level
    # names because the timed statement below is evaluated with globals().
    ref_image = None
    for i, image in enumerate(ImageStream(num_frames=5,
                                          gunagala_config_filename=gunagala_config_filename,
                                          flash_list=flash_list)):
        if ref_image is None:
            ref_image = image
        elif time_blob_finder:
            t = timeit.timeit(
                "main_from_arrays(ref_image.data.astype(np.int16), image.data.astype(np.int16))", globals=globals(), number=10)
            print(i, t)
        else:
            diff_image = CCDData((image.data.astype(np.int16) -
                                  ref_image.data.astype(np.int16)) * image.unit, wcs=image.wcs)
            n_sigmas = 3
            noise = 5.5105687
            # Number of pixels in the difference image above the threshold.
            print(len(diff_image.data[(diff_image.data > n_sigmas * noise)]))
            diff_image.write(f'out_diff_{i+1}.fits', overwrite=True)
            image.write(f'out_synth_{i+1}.fits', overwrite=True)