46 import astropy.io.fits
as pfits
47 from tqdm
import trange, tqdm
50 from typing
import List, Iterable
54 """ This class implements generic supervisor to handle compass simulation
57 context : (CarmaContext) : a CarmaContext instance
59 config : (config) : Parameters structure
61 telescope : (TelescopeComponent) : a TelescopeComponent instance
63 atmos : (AtmosComponent) : An AtmosComponent instance
65 target : (TargetComponent) : A TargetComponent instance
67 wfs : (WfsComponent) : A WfsComponent instance
69 dms : (DmComponent) : A DmComponent instance
71 rtc : (RtcComponent) : An RtcComponent instance
73 is_init : (bool) : Flag equal to True if the supervisor has already been initialized
75 iter : (int) : Frame counter
77 cacao : (bool) : CACAO features enabled in the RTC
79 def __init__(self, config, cacao : bool=
False):
80 """ Instantiates a CompassSupervisor object
83 config: (config module) : Configuration module
85 cacao : (bool, optional) : If True, enables CACAO features in RTC (Default is False)
86 /!\ Requires OCTOPUS to be installed
89 GenericSupervisor.__init__(self, config)
99 """Initialize the telescope component of the supervisor as a TelescopeCompass
103 def _init_atmos(self):
104 """Initialize the atmosphere component of the supervisor as an AtmosCompass
109 """Initialize the DM component of the supervisor as a DmCompass
113 def _init_target(self):
114 """Initialize the target component of the supervisor as a TargetCompass
116 if self.
tel is not None:
119 raise ValueError(
"Configuration not loaded or Telescope not initilaized")
122 """Initialize the wfs component of the supervisor as a WfsCompass
124 if self.
tel is not None:
127 raise ValueError(
"Configuration not loaded or Telescope not initilaized")
130 """Initialize the rtc component of the supervisor as a RtcCompass
132 if self.
wfs is not None:
135 raise ValueError(
"Configuration not loaded or Telescope not initilaized")
137 def next(self, *, move_atmos: bool =
True, nControl: int = 0,
138 tar_trace: Iterable[int] =
None, wfs_trace: Iterable[int] =
None,
139 do_control: bool =
True, apply_control: bool =
True, compute_tar_psf: bool =
True) ->
None:
140 """Iterates the AO loop, with optional parameters.
142 Overload the GenericSupervisor next() method to handle the GEO controller
143 specific raytrace order operations
146 move_atmos: (bool, optional) : move the atmosphere for this iteration. Default is True
148 nControl: (int, optional) : Controller number to use. Default is 0 (single control configuration)
150 tar_trace: (Iterable[int], optional) : indices of the targets to trace. None is equivalent to all (default)
152 wfs_trace: (Iterable[int], optional) : indices of the WFS to trace. None is equivalent to all (default)
154 do_control : (bool, optional) : Performs RTC operations if True (Default)
156 apply_control: (bool, optional) : if True (default), apply control on DMs
158 compute_tar_psf : (bool, optional) : If True (default), computes the PSF at the end of the iteration
160 if (self.
config.p_controllers
is not None and
161 self.
config.p_controllers[nControl].type == scons.ControllerType.GEO):
162 if tar_trace
is None and self.
target is not None:
163 tar_trace = range(len(self.
config.p_targets))
164 if wfs_trace
is None and self.
wfs is not None:
165 wfs_trace = range(len(self.
config.p_wfss))
167 if move_atmos
and self.
atmos is not None:
168 self.
atmos.move_atmos()
170 if tar_trace
is not None:
172 if self.
atmos.is_enable:
175 self.
target.raytrace(t, tel=self.
tel, ncpa=
False)
177 if do_control
and self.
rtc is not None:
178 self.
rtc.do_control(nControl, sources=self.
target.sources)
179 self.
target.raytrace(t, dms=self.
dms, ncpa=
True, reset=
False)
181 self.
rtc.apply_control(nControl)
185 for tar_index
in tar_trace:
186 self.
target.comp_tar_image(tar_index)
187 self.
target.comp_strehl(tar_index)
192 GenericSupervisor.next(self, move_atmos=move_atmos, nControl=nControl,
193 tar_trace=tar_trace, wfs_trace=wfs_trace,
194 do_control=do_control, apply_control=apply_control, compute_tar_psf=compute_tar_psf)
202 self, cb_count: int, projection_matrix : np.ndarray, sub_sample: int = 1, controller_index: int = 0,
203 tar_index: int = 0, see_atmos: bool =
True, cube_data_type: str =
None,
204 cube_data_file_path: str =
"", ncpa: int = 0, ncpa_wfs: np.ndarray =
None,
205 ref_slopes: np.ndarray =
None, ditch_strehl: bool =
True):
206 """ Used to record a synchronized circular buffer AO loop data.
209 cb_count: (int) : the number of iterations to record.
211 projection_matrix : (np.ndarray) : projection matrix on modal basis to compute residual coefficients
213 sub_sample: (int) : sub sampling of the data (default=1, i.e. no subsampling)
215 controller_index: (int) : index of the controller whose slopes and commands are recorded (default=0)
217 tar_index: (int) : target number
219 see_atmos: (bool) : passed to atmos.enable_atmos to enable or not the Atmos during the recording (default=True)
221 cube_data_type: (str) : if specified ("tarPhase" or "psfse") returns the target phase or short exposure PSF data cube in the output variable
223 cube_data_file_path: (str) : if not empty, the data cube is also written to this path with astropy.io.fits, overwriting any existing file (full path on the server)
225 ncpa: (int) : !!experimental!!!: Used only in the context of PYRWFS + NCPA compensation on the fly (with optical gain)
226 defines how many iters the NCPA refslopes are updated with the proper optical gain. Ex: if NCPA=10 refslopes will be updated every 10 iters.
228 ncpa_wfs: (np.ndarray) : the ncpa phase as seen from the wfs array with dims = size of Mpupil
230 ref_slopes: (np.ndarray) : the reference slopes to use.
232 ditch_strehl: (bool) : resets the long exposure SR computation at the beginning of the Circular buffer (default= True)
235 slopes: (np.ndarray) : the slopes CB
237 volts: (np.ndarray) : the volts applied to the DM(s) CB
239 ai: (np.ndarray) : the modal coefficients of the residual phase projected on the currently used modal Basis
241 psf_le: (image returned by target.get_tar_image) : Long exposure PSF over the <cb_count> iterations (i.e. SR is reset at the beginning of the CB if ditch_strehl=True)
243 sthrel_se_list: (list) : The SR short exposure evolution during CB recording
245 sthrel_le_list: (list) : The SR long exposure evolution during CB recording
247 g_ncpa_list: (list) : the gain applied to the NCPA (PYRWFS CASE) if ncpa is enabled
249 cube_data: (np.ndarray) : the tarPhase or psfse cube data (see cube_data_type)
263 for i
in range(len(self.
config.p_targets)):
264 self.
target.reset_strehl(i)
267 for j
in range(cb_count):
271 ncpa_diff = ref_slopes[
None, :]
272 ncpa_turbu = self.
calibration.do_imat_phase(controller_index,
273 -ncpa_wfs[
None, :, :], noise=
False,
277 np.dot(ncpa_diff, ncpa_diff.T) / np.dot(
278 ncpa_turbu, ncpa_turbu.T)))
281 print(
'Warning NCPA ref slopes gain too high!')
282 g_ncpa_list.append(g_ncpa)
283 self.
rtc.set_ref_slopes(-ref_slopes * g_ncpa)
285 g_ncpa_list.append(g_ncpa)
286 print(
'NCPA ref slopes gain: %4.3f' % g_ncpa)
287 self.
rtc.set_ref_slopes(-ref_slopes / g_ncpa)
289 self.
atmos.enable_atmos(see_atmos)
291 for t
in range(len(self.
config.p_targets)):
292 self.
target.comp_tar_image(t)
294 srse, srle, _, _ = self.
target.get_strehl(tar_index)
295 sthrel_se_list.append(srse)
296 sthrel_le_list.append(srle)
297 if (j % sub_sample == 0):
298 ai_vector = self.
calibration.compute_modal_residuals(projection_matrix, selected_actus=self.
basis.selected_actus)
299 if (ai_data
is None):
300 ai_data = np.zeros((len(ai_vector), int(cb_count / sub_sample)))
301 ai_data[:, k] = ai_vector
303 slopes_vector = self.
rtc.get_slopes(controller_index)
304 if (slopes_data
is None):
305 slopes_data = np.zeros((len(slopes_vector),
306 int(cb_count / sub_sample)))
307 slopes_data[:, k] = slopes_vector
309 volts_vector = self.
rtc.get_command(controller_index)
310 if (volts_data
is None):
311 volts_data = np.zeros((len(volts_vector),
312 int(cb_count / sub_sample)))
313 volts_data[:, k] = volts_vector
316 if (cube_data_type ==
"tarPhase"):
317 dataArray = self.
target.get_tar_phase(tar_index, pupil=
True)
318 elif (cube_data_type ==
"psfse"):
319 dataArray = self.
target.get_tar_image(tar_index, expo_type=
"se")
321 raise ValueError(
"unknown dataData" % cube_data_type)
322 if (cube_data
is None):
323 cube_data = np.zeros((*dataArray.shape,
324 int(cb_count / sub_sample)))
325 cube_data[:, :, k] = dataArray
327 if (cube_data_file_path !=
""):
328 print(
"Saving tarPhase cube at: ", cube_data_file_path)
329 pfits.writeto(cube_data_file_path, cube_data, overwrite=
True)
331 psf_le = self.
target.get_tar_image(tar_index, expo_type=
"le")
332 return slopes_data, volts_data, ai_data, psf_le, sthrel_se_list, sthrel_le_list, g_ncpa_list, cube_data