Source code for phobos.classes.cred3

import numpy as np
from .. import shm, SANDBOX_MODE


from ..utils import Singleton

class Cred3(metaclass=Singleton):
    """
    Singleton class to interface with the Cred3 camera via shared memory.

    The camera writes frames to a shared memory location that can be read
    by this class. Optionally, dark frames can be subtracted.

    Configuration is loaded from `phobos.config`.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Always hand back the same object; `_initialized` lets __init__
        # tell a fresh object from a re-used one.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """
        Initialize the Cred3 camera interface using global configuration.
        """
        if self._initialized:
            return

        mode_prefix = "⛱️ [SANDBOX] " if SANDBOX_MODE else ""
        if SANDBOX_MODE:
            print("⛱️ [SANDBOX] Cred3 running in mock mode")

        # Load configuration (imported here, as in the rest of this class)
        import phobos

        self.img_shm_path = phobos.config.get('cred3.img_shm_path')
        self.dark_shm_path = phobos.config.get('cred3.dark_shm_path')
        self.semid = phobos.config.get('cred3.semid', 0)
        self.use_dark = phobos.config.get('cred3.use_dark', False)
        self._load_crop_config()

        # Initialize shared memory for camera
        self.cam = shm(self.img_shm_path, nosem=False)
        self.cam.catch_up_with_sem(self.semid)

        # Initialize dark frame if needed
        self.dark_shm_obj = None
        self.dark = None
        if self.use_dark:
            try:
                self.dark_shm_obj = shm(self.dark_shm_path)
                self.dark = self.dark_shm_obj.get_latest_data()
            except Exception as e:
                # Best effort: the camera stays usable without a dark frame.
                print(f"⚠️ Could not load dark frame: {e}")
                self.dark = None
            else:
                print(f"{mode_prefix}Cred3 camera initialized with dark subtraction")
        else:
            print(f"{mode_prefix}Cred3 camera initialized without dark subtraction")

        # Only flag the singleton as initialized once everything above has
        # succeeded; otherwise a failed first construction would leave a
        # half-built instance that later `Cred3()` calls silently reuse.
        self._initialized = True

    @staticmethod
    def _array_or_none(value):
        """Return `value` as an ndarray, or None when it is None or empty."""
        if value is None or np.size(value) == 0:
            return None
        return np.array(value)

    def _load_crop_config(self):
        """Read output/bulk crop centers and sizes from `phobos.config`."""
        import phobos

        self.output_centers = self._array_or_none(
            phobos.config.get('cred3.output_centers'))

        # A single scalar size is broadcast to one size per output center.
        output_sizes = phobos.config.get('cred3.output_sizes')
        if self.output_centers is not None and isinstance(output_sizes, (int, float)):
            self.output_sizes = [output_sizes] * len(self.output_centers)
        else:
            self.output_sizes = output_sizes

        self.bulk_center = self._array_or_none(
            phobos.config.get('cred3.bulk_center'))
        self.bulk_size = phobos.config.get('cred3.bulk_size')

    @staticmethod
    def _flux_reducer(flux_mode):
        """Map a `flux_mode` name to its reduction function.

        Raises
        ------
        ValueError
            If `flux_mode` is neither 'mean' nor 'sum'.
        """
        reducers = {'mean': np.mean, 'sum': np.sum}
        try:
            return reducers[flux_mode]
        except (KeyError, TypeError):
            raise ValueError(
                f"Unknown flux_mode: {flux_mode}. Use 'mean' or 'sum'.") from None

    def get_image(self, subtract_dark: bool = True, stack: int = 1) -> np.ndarray:
        """
        Get the latest image from the shared memory.

        Parameters
        ----------
        subtract_dark : bool, optional
            If True, subtract the dark frame. If None, uses the default
            set during initialization. Default is True.
        stack : int, optional
            Number of frames to stack for averaging. Default is 1.

        Returns
        -------
        img : ndarray
            Latest camera frame (float), optionally dark-subtracted.

        Raises
        ------
        ValueError
            If `stack` is smaller than 1.

        Examples
        --------
        >>> camera = Cred3()
        >>> img = camera.get_image()
        >>> img_no_dark = camera.get_image(subtract_dark=False)
        """
        stack = int(stack)
        if stack < 1:
            raise ValueError(f"stack must be >= 1, got {stack}")

        # NOTE(review): this first read is only used for its shape and is
        # then discarded, exactly as before -- it may be flushing a stale
        # frame, so the number of frames consumed is left unchanged.
        template = self.cam.get_latest_data(self.semid)

        # Accumulate in float64: summing in the frame's own dtype can wrap
        # around if that dtype is an integer type.
        acc = np.zeros(np.shape(template), dtype=np.float64)
        for _ in range(stack):
            acc += self.cam.get_latest_data(self.semid)
        img = acc / stack

        # Determine whether to subtract dark
        if subtract_dark is None:
            subtract_dark = self.use_dark

        if subtract_dark:
            # Refresh the dark frame if a shared-memory handle is available
            if self.dark_shm_obj is not None:
                try:
                    self.dark = self.dark_shm_obj.get_latest_data()
                except Exception:
                    # Deliberate best effort: keep the previous dark frame.
                    pass
            if self.dark is not None:
                img = img - self.dark

        return img

    def _crop_regions(self, img: np.ndarray, crop_centers: np.ndarray,
                      crop_sizes) -> list[np.ndarray]:
        """
        Internal helper to crop regions from an image.

        Parameters
        ----------
        img : ndarray
            Input image to crop from (at least 2-D).
        crop_centers : ndarray
            Array of (x, y) coordinates for crop centers, shape (N, 2).
            A single (x, y) pair is also accepted.
        crop_sizes : int or array-like
            Size(s) of the crop windows.

        Returns
        -------
        crops : list of ndarray
            List of cropped sub-images (copies). A region partially outside
            the image is clipped to the image; a region entirely outside
            is replaced by a zero array of the requested size.

        Raises
        ------
        ValueError
            If `crop_sizes` is None, if its length does not match the
            number of centers, or if `img` has fewer than 2 dimensions.
        """
        if crop_sizes is None:
            raise ValueError("crop_sizes must be provided (got None)")

        img_t = np.transpose(img)  # Transpose to match (x, y) indexing
        if img_t.ndim < 2:
            raise ValueError(f"img must be at least 2-D, got {img_t.ndim}-D")
        nx, ny = img_t.shape[0], img_t.shape[1]

        centers = np.asarray(crop_centers)
        if centers.ndim == 1 and centers.size == 2:
            centers = centers.reshape(1, 2)
        n_outputs = centers.shape[0]

        # Handle crop_sizes - convert to an integer array, one per center
        if np.ndim(crop_sizes) == 0:
            sizes = np.full(n_outputs, int(crop_sizes))
        else:
            sizes = np.asarray(crop_sizes).astype(int)
            if len(sizes) != n_outputs:
                raise ValueError(f"crop_sizes length ({len(sizes)}) must match "
                                 f"number of centers ({n_outputs})")

        crops = []
        for i in range(n_outputs):
            x_center, y_center = centers[i]
            size = int(sizes[i])
            half_size = size // 2

            # Requested crop boundaries
            x1 = int(x_center - half_size)
            x2 = x1 + size
            y1 = int(y_center - half_size)
            y2 = y1 + size

            # NumPy slicing never raises for out-of-range bounds (and a
            # negative start wraps around), so bounds are checked explicitly.
            cx1, cx2 = max(x1, 0), min(x2, nx)
            cy1, cy2 = max(y1, 0), min(y2, ny)

            if cx1 >= cx2 or cy1 >= cy2:
                print(f"⚠️ Crop region {i} outside image boundaries")
                crops.append(np.zeros((size, size)))
                continue

            if (cx1, cx2, cy1, cy2) != (x1, x2, y1, y2):
                print(f"⚠️ Crop region {i} partially outside image boundaries, "
                      f"clipped to the image")

            crops.append(img_t[cx1:cx2, cy1:cy2].T.copy())

        return crops

    def crop_outputs_from_image(self, img: np.ndarray) -> list[np.ndarray]:
        """
        Crop output regions from an image using configured centers and sizes.

        Parameters
        ----------
        img : ndarray
            Input image to crop from.

        Returns
        -------
        crops : list of ndarray
            List of cropped sub-images for each configured output.

        Raises
        ------
        ValueError
            If `output_centers` is not configured.
        """
        if self.output_centers is None:
            raise ValueError("output_centers not configured in phobos.config")
        return self._crop_regions(img, self.output_centers, self.output_sizes)

    def get_outputs(self,
                    subtract_dark: bool = True,
                    flux_mode: str = 'mean',
                    stack: int = 1,
                    ) -> np.ndarray:
        """
        Get the flux around configured output centers.

        This method crops regions around the output centers defined in the
        configuration and returns the flux in each region. The flux can be
        the mean pixel value or the sum of pixel values.

        Parameters
        ----------
        subtract_dark : bool, optional
            Whether to subtract dark frame. Default is True.
        flux_mode : str, optional, 'mean' or 'sum'
            Method to compute the flux.
            'mean': average of pixel values (default)
            'sum': sum of pixel values
        stack : int, optional
            Number of frames to stack for averaging. Default is 1.

        Returns
        -------
        flux : ndarray
            Flux in each cropped region, shape (N,).

        Raises
        ------
        ValueError
            If `output_centers` is not configured or `flux_mode` is unknown.

        Examples
        --------
        >>> camera = Cred3()
        >>> # Get averaged outputs from configured centers
        >>> flux = camera.get_outputs()
        >>>
        >>> # Get integrated flux (sum)
        >>> flux_sum = camera.get_outputs(flux_mode='sum')
        """
        if self.output_centers is None:
            raise ValueError("output_centers not configured. Please set them in phobos.config")

        # Validate before touching the camera so a typo fails fast.
        reduce_flux = self._flux_reducer(flux_mode)

        img = self.get_image(subtract_dark=subtract_dark, stack=stack)
        crops = self.crop_outputs_from_image(img)
        return np.array([reduce_flux(crop) for crop in crops], dtype=float)

    def get_bulk(self, subtract_dark: bool = True, flux_mode: str = 'mean',
                 stack: int = 1) -> float:
        """
        Get the flux of the bulk channel using configured center and size.

        Parameters
        ----------
        subtract_dark : bool, optional
            Whether to subtract dark frame. Default is True.
        flux_mode : str, optional
            'mean' or 'sum'. Default is 'mean'.
        stack : int, optional
            Number of frames to stack for averaging. Default is 1.

        Returns
        -------
        flux : float
            Flux in the bulk region.

        Raises
        ------
        ValueError
            If `bulk_center` or `bulk_size` is not configured, or
            `flux_mode` is unknown.
        """
        if self.bulk_center is None:
            raise ValueError("bulk_center not configured in phobos.config")
        if self.bulk_size is None:
            raise ValueError("bulk_size not configured in phobos.config")

        reduce_flux = self._flux_reducer(flux_mode)

        img = self.get_image(subtract_dark=subtract_dark, stack=stack)
        crops = self._crop_regions(img, np.array([self.bulk_center]), self.bulk_size)
        if not crops:
            return 0.0
        return float(reduce_flux(crops[0]))

    def update_dark(self, dark_shm_path: str = None):
        """
        Update the dark frame from shared memory.

        Parameters
        ----------
        dark_shm_path : str, optional
            Shared memory path for the new dark frame. If None, uses the
            default dark_shm_path. Default is None.

        Examples
        --------
        >>> camera = Cred3()
        >>> camera.update_dark()  # Reload dark from default location
        """
        if dark_shm_path is None:
            dark_shm_path = self.dark_shm_path

        try:
            dk = shm(dark_shm_path)
            self.dark = dk.get_latest_data()
        except Exception as e:
            print(f"⚠️ Could not update dark frame: {e}")
        else:
            # Keep the handle: get_image() reloads the dark from
            # `dark_shm_obj` on every call, so leaving the old handle in
            # place would immediately overwrite the frame loaded here.
            self.dark_shm_obj = dk
            self.use_dark = True
            print(f"Dark frame updated from {dark_shm_path}")

    def take_darks(self, nb_frames: int = 1000, save_to_shm: bool = True) -> np.ndarray:
        """
        Acquire dark frames and compute the average.

        This method acquires a specified number of frames, computes their
        average, and optionally saves the result to shared memory for use
        as a dark frame.

        Parameters
        ----------
        nb_frames : int, optional
            Number of frames to acquire and average. Default is 1000.
        save_to_shm : bool, optional
            If True, save the averaged dark frame to shared memory at
            self.dark_shm_path. Default is True.

        Returns
        -------
        dark_mean : ndarray
            The averaged dark frame.

        Raises
        ------
        ValueError
            If `nb_frames` is smaller than 1.

        Notes
        -----
        - Ensure the camera source is turned OFF before acquiring darks
        - The method checks for late frames (when semaphore value > 0)
        - The dark frame is automatically loaded if save_to_shm is True
        - In sandbox mode a random frame is returned and nothing is saved

        Examples
        --------
        >>> camera = Cred3()
        >>> # Acquire 1000 dark frames (default)
        >>> dark = camera.take_darks()
        >>>
        >>> # Acquire 50 frames without saving to shared memory
        >>> dark = camera.take_darks(nb_frames=50, save_to_shm=False)
        """
        nb_frames = int(nb_frames)
        if nb_frames < 1:
            raise ValueError(f"nb_frames must be >= 1, got {nb_frames}")

        if SANDBOX_MODE:
            import time

            print(f"⛱️ [SANDBOX] Simulating acquisition of {nb_frames} dark frames")
            time.sleep(0.5)  # Simulate acquisition time
            dark_mean = np.random.randint(0, 100, size=(320, 256), dtype=np.uint16)
            print("⛱️ [SANDBOX] Dark acquisition complete")
            return dark_mean

        # tqdm is optional: fall back to a plain range without progress bar
        try:
            from tqdm import tqdm
        except ImportError:
            tqdm = None
            print("⚠️ tqdm not available, progress bar disabled")

        print(f"Taking {nb_frames} dark frames...")
        frame_indices = range(nb_frames)
        if tqdm is not None:
            frame_indices = tqdm(frame_indices)

        # Float64 accumulator: summing in the frame's own dtype can wrap
        # around over many frames if that dtype is an integer type.
        template = self.cam.get_latest_data(self.semid)
        acc = np.zeros(np.shape(template), dtype=np.float64)

        # Catch up with semaphore before starting
        self.cam.catch_up_with_sem(self.semid)

        log_sem = []
        for _ in frame_indices:
            acc += self.cam.get_latest_data(self.semid)
            # A semaphore value > 0 right after a read is counted as a
            # late frame below.
            log_sem.append(self.cam.sems[self.semid].value)

        print('Acquisition complete')

        # Check for late frames
        n_late = int(np.count_nonzero(np.array(log_sem) > 0))
        if n_late:
            print(f'⚠️ Total late frames: {n_late}')

        # Compute average dark
        dark_mean = acc / nb_frames

        # Save to shared memory if requested
        if save_to_shm:
            print(f'Saving dark frame to {self.dark_shm_path}...')
            try:
                dark_shm_obj = shm(self.dark_shm_path, data=dark_mean)
                print(' Dark frame saved to shared memory')
                # Update the internal dark frame
                self.dark = dark_mean
                self.dark_shm_obj = dark_shm_obj
                self.use_dark = True
            except Exception as e:
                print(f"⚠️ Could not save dark to shared memory: {e}")

        return dark_mean

    def check_cropping(self, subtract_dark: bool = True):
        """
        Display the cropped output regions to verify they well capture the
        output signals.

        Acquires a frame and shows each cropped output as a subplot with a
        shared color scale, useful for checking that ``output_centers`` and
        ``output_sizes`` are correctly configured.

        Parameters
        ----------
        subtract_dark : bool, optional
            Whether to subtract the dark frame. Default is True.

        Examples
        --------
        >>> camera = Cred3()
        >>> camera.check_cropping()
        """
        import matplotlib.pyplot as plt

        img = self.get_image(subtract_dark=subtract_dark)
        crops = self.crop_outputs_from_image(img)

        n = len(crops)
        if n == 0:
            print("⚠️ No output regions to display")
            return

        ncols = 1 if n == 1 else 2
        nrows = (n + ncols - 1) // ncols

        # Shared upper color limit; empty crops are skipped so max() is safe.
        vmax = max((crop.max() for crop in crops if crop.size), default=None)

        plt.figure(figsize=(10, 10))
        for i, crop in enumerate(crops):
            plt.subplot(nrows, ncols, i + 1)
            plt.title(f'Channel {i}')
            plt.imshow(crop, origin='lower', cmap='jet', vmin=0, vmax=vmax)
            plt.colorbar()
        plt.tight_layout()
        plt.show()

    def close(self):
        """
        Close the shared memory connections.

        This method is provided for compatibility but shared memory objects
        are typically managed by the system.
        """
        # Shared memory objects don't need explicit closing in xaosim
        print("Cred3 camera interface closed")

    def reset(self):
        """
        Reset the camera settings (use_dark, outputs strategies) to the
        configuration.
        """
        # Reload configuration
        import phobos

        self.use_dark = phobos.config.get('cred3.use_dark', False)

        # Re-apply dark mechanism
        if self.use_dark:
            if self.dark_shm_obj is None:
                try:
                    self.dark_shm_obj = shm(self.dark_shm_path)
                    self.dark = self.dark_shm_obj.get_latest_data()
                except Exception as e:
                    # Best effort, but do not hide the failure.
                    print(f"⚠️ Could not load dark frame: {e}")
        else:
            # Drop the handle too: get_image() reloads the dark from
            # `dark_shm_obj`, which would silently re-enable subtraction.
            self.dark = None
            self.dark_shm_obj = None

        # Restore crop settings
        self._load_crop_config()

        print(" Cred3 reset to configuration defaults.")