47 from typing
import Callable
52 def __init__(self, config_file: str =
None, brahma: bool =
False,
54 """ Init the COMPASS wih the config_file
57 config_file : (str, optional) : path to the configuration file
59 brahma : (bool, optional) : Flag to use brahma
61 cacao : (bool, optional) : Flag to use cacao rtc
71 if config_file
is not None:
87 """ Performs a single loop iteration
97 def get_tar_image(self, tar_index, expo_type: str =
"se") -> np.ndarray:
100 raise NotImplementedError(
"Not implemented")
102 def set_command(self, nctrl: int, command: np.ndarray) ->
None:
103 """ Immediately sets provided command to DMs - does not affect integrator
106 nctrl : (int) : Controller index (unused)
108 command : (np.ndarray) : Command vector to send
116 """ Get command from DM
119 command : (np.ndarray) : Command vector
137 s =
'--- BenchSupervisor ---\nRTC: ' + repr(self.
rtc)
138 if hasattr(self,
'_cam'):
139 s +=
'\nCAM: ' + repr(self._cam)
140 if hasattr(self,
'_dm'):
141 s +=
'\nDM: ' + repr(self._dm)
145 """ Acquire a new WFS frame and load it
148 centro_index : (int) : Index of the centroider where to load the frame
151 if (type(self.
frame)
is tuple):
152 centro_index = len(self.
frame)
153 for i
in range(centro_index):
154 self.
rtc.d_centro[i].load_img(self.
frame[i], self.
frame[i].shape[0],
155 self.
frame[i].shape[1])
157 self.
rtc.d_centro[centro_index].load_img(self.
frame, self.
frame.shape[0],
161 """ Compute the WFS frame: calibrate, centroid, commands.
164 for centro
in self.
rtc.d_centro:
165 centro.calibrate_img()
166 self.
rtc.do_centroids(0)
167 self.
rtc.do_control(0)
168 self.
rtc.do_clipping(0)
169 self.
rtc.comp_voltage(0)
171 def set_one_actu(self, nctrl: int, nactu: int, ampli: float = 1,
172 reset: bool =
True) ->
None:
173 """ Push the selected actuator
176 nctrl : (int) : controller index
178 nactu : (int) : actuator index to push
180 ampli : (float, optional) : amplitude to apply. Default is 1 volt
182 reset : (bool) : reset the previous command vector. Default is True
187 command[nactu] = ampli
191 """ Active all the GPU devices specified in the parameters file
192 Required for using with widgets, due to multithreaded init
193 and in case GPU 0 is not used by the simu
196 current_device = self.
context.active_device
197 for device
in range(len(self.config.p_loop.devices)):
204 if hasattr(self,
'_dm'):
208 """ Reset the nctrl Controller command buffer, reset all controllers if nctrl == -1
211 nctrl : (int, optional) : Controller index. If -1 (default), all controllers commands are reset
214 for control
in self.
rtc.d_control:
215 control.d_com.reset()
217 self.
rtc.d_control[nctrl].d_com.reset()
219 def load_config(self, config_file: str =
None) ->
None:
220 """ Init the COMPASS with the config_file
223 config_file : (str) : path to the configuration file
226 load_config_from_file(self, config_file)
229 """ Set the externally defined function that allows to grab frames
232 cam_callback : (Callable) : function that allows to grab frames
236 def set_dm_callback(self, dm_get_callback: Callable, dm_set_callback: Callable):
237 """ Set the externally defined function that allows to communicate with the DM
240 dm_get_callback : (Callable) : function that allows to retrieve commands
241 dm_set_callback : (Callable) : function that allows to set commands
246 def init(self) -> None:
247 """ Initialize the bench
253 if (hasattr(self.config,
'p_loop')
and self.config.p_loop.devices.size > 1):
254 self.
context = carmaWrap_context.get_instance_ngpu(
255 self.config.p_loop.devices.size, self.config.p_loop.devices)
257 self.
context = carmaWrap_context.get_instance_1gpu(
258 self.config.p_loop.devices[0])
259 nact = self.config.p_controllers[0].nactu
272 if self.config.p_wfss[wfs].type == WFSType.SH:
273 self.
_npix.append(self.config.p_wfss[wfs].npix)
274 if self.config.p_wfss[wfs]._validsubsx
is None or \
275 self.config.p_wfss[wfs]._validsubsy
is None:
277 from hraa.tools.doit
import makessp
278 roiTab = makessp(self.config.p_wfss[wfs].nxsub, obs=0., rmax=0.98)
279 self.config.p_wfss[wfs]._nvalid = roiTab[0].size
281 wfs]._validsubsx = roiTab[0] * self.config.p_wfss[wfs].npix
283 wfs]._validsubsy = roiTab[1] * self.config.p_wfss[wfs].npix
285 self.config.p_wfss[wfs]._nvalid = self.config.p_wfss[
286 wfs]._validsubsx.size
289 np.array([self.config.p_wfss[wfs]._nvalid], dtype=np.int32))
292 self.
_delay.append(self.config.p_controllers[0].delay)
293 self.
_offset.append((self.config.p_wfss[wfs].npix - 1) / 2)
298 elif self.config.p_wfss[wfs].type == WFSType.PYRHR
or self.config.p_wfss[
299 wfs].type == WFSType.PYRLR:
301 np.array([self.config.p_wfss[wfs]._nvalid],
304 self.
_delay.append(self.config.p_controllers[0].delay)
309 self.config.p_wfss[wfs].nPupils * self.
_nvalid[wfs][0])
312 raise ValueError(
'WFS type not supported')
319 self.
slopes_index = np.cumsum([0] + [wfs.nslopes
for wfs
in self.
rtc.d_centro])
323 self.
rtc.d_centro[wfs].load_validpos(
324 self.config.p_wfss[wfs]._validsubsx,
325 self.config.p_wfss[wfs]._validsubsy,
326 self.config.p_wfss[wfs]._validsubsx.size)
327 if self.config.p_centroiders[wfs].type
is CentroiderType.BPCOG:
328 self.
rtc.d_centro[wfs].set_nmax(self.config.p_centroiders[wfs].nmax)
329 self.
rtc.d_centro[wfs].set_npix(self.
_npix[wfs])
331 self.config.p_centroiders[wfs]._nslope = self.
rtc.d_centro[wfs].nslopes
334 cMat = np.zeros((nact, size), dtype=np.float32)
335 print(
"Size of cMat:", cMat.shape)
338 self.
rtc.d_control[0].set_cmat(cMat)
339 self.
rtc.d_control[0].set_decayFactor(
340 np.ones(nact, dtype=np.float32) * (self.
_gain[0] - 1))
341 self.
rtc.d_control[0].set_matE(np.identity(nact, dtype=np.float32))
342 self.
rtc.d_control[0].set_modal_gains(
343 np.ones(nact, dtype=np.float32) * -self.
_gain[0])
345 print(
"RTC initialized")
348 """ Re-centre the centroiding boxes around the spots, and loads
349 the new box coordinates in the slopes computation supervisor
353 init_config : (bool): Flag to reset to the default positions of boxes. Default is False
355 centro_index : (int) : centroider index
359 ij_subap = self.config.p_wfss[centro_index].get_validsub()
360 nsubap = ij_subap.shape[1]
361 self.
rtc.d_centro[centro_index].load_validpos(ij_subap[0], ij_subap[1],
367 for i
in range(nslopes):
370 s = s + self.get_slopes()[self.
slopes_index[centro_index]:self.
371 slopes_index[centro_index + 1]]
375 i_subap = np.array(self.
rtc.d_centro[centro_index].d_validx)
376 j_subap = np.array(self.
rtc.d_centro[centro_index].d_validy)
378 nsubap = i_subap.shape[0]
380 s = np.resize(s, (2, nsubap))
382 new_i_subap = (i_subap + s[0, :].round()).astype(int)
383 new_j_subap = (j_subap + s[1, :].round()).astype(int)
385 self.
rtc.d_centro[centro_index].load_validpos(new_i_subap, new_j_subap,
389 """ Returns the currently used subapertures positions
392 centro_index : (int) : Index of the centroider
395 current_pos : (tuple) : (i_subap, j_subap)
397 i_subap = np.array(self.
rtc.d_centro[centro_index].d_validx)
398 j_subap = np.array(self.
rtc.d_centro[centro_index].d_validy)
399 return i_subap, j_subap
402 """ Return the index of the first position of each WFS slopes vector
403 inside the global RTC slopes vector
406 slopes_index : (np.ndarray) : Slopes index