diff --git a/dlclivegui/cameras/backends/aravis_backend.py b/dlclivegui/cameras/backends/aravis_backend.py index b437c3c..60059c4 100644 --- a/dlclivegui/cameras/backends/aravis_backend.py +++ b/dlclivegui/cameras/backends/aravis_backend.py @@ -52,6 +52,8 @@ def __init__(self, settings): self._actual_width: int | None = None self._actual_height: int | None = None self._actual_fps: float | None = None + self._camera_pixel_format: str | None = None + self._actual_output_format: str | None = None self._camera = None self._stream = None @@ -69,6 +71,16 @@ def actual_fps(self) -> float | None: """Return the actual frame rate of the camera after opening.""" return self._actual_fps + @property + def actual_pixel_format(self) -> str | None: + """Camera/native pixel format requested/reported for Aravis.""" + return self._camera_pixel_format or self._pixel_format + + @property + def actual_output_format(self) -> str | None: + """Current Aravis backend emits BGR uint8 frames.""" + return self._actual_output_format or "BGR8" + @classmethod def is_available(cls) -> bool: """Check if Aravis is available on this system.""" @@ -615,10 +627,12 @@ def _configure_pixel_format(self) -> None: if self._pixel_format in format_map: self._camera.set_pixel_format(format_map[self._pixel_format]) + self._camera_pixel_format = self._pixel_format LOG.info(f"Pixel format set to '{self._pixel_format}'") else: # Try setting as string self._camera.set_pixel_format_from_string(self._pixel_format) + self._camera_pixel_format = self._pixel_format LOG.info(f"Pixel format set to '{self._pixel_format}' (from string)") except Exception as e: LOG.warning(f"Failed to set pixel format '{self._pixel_format}': {e}") diff --git a/dlclivegui/cameras/backends/basler_backend.py b/dlclivegui/cameras/backends/basler_backend.py index 02e1052..0f54e97 100644 --- a/dlclivegui/cameras/backends/basler_backend.py +++ b/dlclivegui/cameras/backends/basler_backend.py @@ -9,7 +9,7 @@ import numpy as np -from ...config import SINGLE_CAMERA_WORKER_DO_LOG_TIMING, CameraTriggerSettings +from ...config import BASLER_DO_LOG_TIMING, CameraTriggerSettings from ...utils.stats import WorkerTimingStats from ..base import CameraBackend, SupportLevel, register_backend @@ -45,6 +45,11 @@ def __init__(self, settings): super().__init__(settings) self._props: dict = settings.properties if isinstance(settings.properties, dict) else {} + self._preserve_mono: bool = bool( + getattr(settings, "preserve_mono", False) or self.ns.get("preserve_mono", False) + ) + self._camera_pixel_format: str | None = None + self._logged_first_frame: bool = False # Optional fast-start hint for probe workers # (may skip StartGrabbing and converter setup for faster capability probing; not suitable for normal capture) @@ -116,7 +121,7 @@ def __init__(self, settings): timing_id, logger=LOG, log_interval=1.0, - enabled=SINGLE_CAMERA_WORKER_DO_LOG_TIMING, + enabled=BASLER_DO_LOG_TIMING, ) @property @@ -137,6 +142,24 @@ def actual_exposure(self) -> float | None: def actual_gain(self) -> float | None: return self._actual_gain + @property + def actual_pixel_format(self) -> str | None: + """Camera/native pixel format reported by Basler, e.g. 'Mono8'.""" + return self._camera_pixel_format + + @property + def actual_output_format(self) -> str | None: + """Backend output frame format emitted to the app, e.g. 'Mono8' or 'BGR8'.""" + if not self._camera_pixel_format: + return None + return "Mono8" if self._should_output_mono() else "BGR8" + + @property + def recommended_preserve_mono(self) -> bool | None: + if not self._camera_pixel_format: + return None + return self._is_camera_mono() + @classmethod def is_available(cls) -> bool: return pylon is not None @@ -153,6 +176,7 @@ def static_capabilities(cls) -> dict[str, SupportLevel]: "device_discovery": SupportLevel.BEST_EFFORT, "stable_identity": SupportLevel.SUPPORTED, "hardware_trigger": SupportLevel.BEST_EFFORT, + "preserve_mono": SupportLevel.SUPPORTED, } ) return caps @@ -179,6 +203,17 @@ def _ensure_mutable_ns(self) -> dict: self.settings.properties[self.OPTIONS_KEY] = ns return ns + def _read_camera_pixel_format(self) -> str: + pixel_format = self._feature_value(self._feature("PixelFormat"), "") + self._camera_pixel_format = str(pixel_format or "") + return self._camera_pixel_format + + def _is_camera_mono(self) -> bool: + return bool(self._camera_pixel_format and self._camera_pixel_format.startswith("Mono")) + + def _should_output_mono(self) -> bool: + return bool(self._preserve_mono and self._is_camera_mono()) + @classmethod def _enumerate_devices_cls(cls): """Enumerate DeviceInfo entries (unit-testable via monkeypatch).""" @@ -452,6 +487,37 @@ def _configure_frame_rate(self) -> None: except Exception: self._actual_fps = None + def _configure_converter(self) -> None: + """Configure pypylon image converter. + + Default behavior remains BGR8 for compatibility. + + If preserve_mono=True and the camera PixelFormat is Mono*, + return Mono8 frames as 2D arrays to avoid 3x BGR expansion. + """ + if self._camera is None: + return + + camera_pixel_format = self._camera_pixel_format or self._read_camera_pixel_format() + + self._converter = pylon.ImageFormatConverter() + self._converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned + + if self._should_output_mono(): + self._converter.OutputPixelFormat = pylon.PixelType_Mono8 + LOG.info( + "[Basler] Converter configured for Mono8 output (camera PixelFormat=%s preserve_mono=%s)", + camera_pixel_format, + self._preserve_mono, + ) + else: + self._converter.OutputPixelFormat = pylon.PixelType_BGR8packed + LOG.info( + "[Basler] Converter configured for BGR8 output (camera PixelFormat=%s preserve_mono=%s)", + camera_pixel_format, + self._preserve_mono, + ) + def open(self) -> None: if pylon is None: raise RuntimeError("pypylon is required for the Basler backend but is not installed") @@ -537,6 +603,8 @@ def open(self) -> None: except Exception: self._actual_gain = None + self._read_camera_pixel_format() + # ---------------------------- # Start acquisition (skip for fast probe) # ---------------------------- @@ -549,9 +617,7 @@ def open(self) -> None: pass # Converter BEFORE StartGrabbing - self._converter = pylon.ImageFormatConverter() - self._converter.OutputPixelFormat = pylon.PixelType_BGR8packed - self._converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned + self._configure_converter() # Force stream configuration reset try: @@ -627,6 +693,20 @@ def read(self) -> tuple[np.ndarray, float]: with self._timing.measure("Basler.get_array"): frame = image.GetArray() + if not self._logged_first_frame: + self._logged_first_frame = True + LOG.info( + "[Basler] first frame device_id=%s shape=%s dtype=%s nbytes=%.2f MB " + "camera_pixel_format=%s output_format=%s preserve_mono=%s", + self._device_id, + frame.shape, + frame.dtype, + frame.nbytes / (1024 * 1024), + self._camera_pixel_format, + self.actual_output_format, + self._preserve_mono, + ) + with self._timing.measure("Basler.release"): grab_result.Release() grab_result = None diff --git a/dlclivegui/cameras/backends/gentl_backend.py b/dlclivegui/cameras/backends/gentl_backend.py index 26d5c2f..a433fb1 100644 --- a/dlclivegui/cameras/backends/gentl_backend.py +++ b/dlclivegui/cameras/backends/gentl_backend.py @@ -99,6 +99,9 @@ def __init__(self, settings): self._pixel_format: str = ns.get("pixel_format") or props.get("pixel_format", "auto") self._pixel_format = str(self._pixel_format).strip() + self._camera_pixel_format: str | None = None + self._actual_output_format: str | None = None + self._rotate: int = int(ns.get("rotate", props.get("rotate", 0))) % 360 self._crop: tuple[int, int, int, int] | None = self._parse_crop(ns.get("crop", props.get("crop"))) @@ -176,6 +179,16 @@ def actual_exposure(self) -> float | None: def actual_gain(self) -> float | None: return self._actual_gain + @property + def actual_pixel_format(self) -> str | None: + """Camera/native pixel format selected on the GenICam PixelFormat node.""" + return self._camera_pixel_format or (self._pixel_format if self._pixel_format != "auto" else None) + + @property + def actual_output_format(self) -> str | None: + """Current GenTL backend emits OpenCV-native BGR uint8 frames.""" + return self._actual_output_format or "BGR8" + @classmethod def is_available(cls) -> bool: return Harvester is not None @@ -587,6 +600,21 @@ def waits_for_hardware_trigger(self) -> bool: role = str(self._trigger_attr(getattr(self, "_trigger", None), "role", "off") or "off").lower() return role in {"external", "follower"} + @staticmethod + def _output_format_for_frame(frame: np.ndarray) -> str: + if frame.ndim == 2: + if frame.dtype == np.uint8: + return "Mono8" + return f"Mono{frame.dtype}" + if frame.ndim == 3: + channels = frame.shape[2] + if channels == 3 and frame.dtype == np.uint8: + return "BGR8" + if channels == 4 and frame.dtype == np.uint8: + return "BGRA8" + return f"{channels}ch-{frame.dtype}" + return str(frame.dtype) + def read(self) -> tuple[np.ndarray, float]: if self._acquirer is None: raise RuntimeError("GenTL image acquirer not initialised") @@ -625,6 +653,7 @@ def read(self) -> tuple[np.ndarray, float]: self._read_telemetry(self._acquirer.remote_device.node_map) except Exception: pass + self._actual_output_format = self._output_format_for_frame(frame) return frame, timestamp @@ -1312,11 +1341,14 @@ def _configure_pixel_format(self, node_map) -> None: pixel_format_node.value = selected self._pixel_format = str(pixel_format_node.value) + self._camera_pixel_format = self._pixel_format LOG.debug("GenTL pixel format selected: %s", self._pixel_format) except Exception as e: LOG.warning("Failed to configure pixel format '%s': %s", self._pixel_format, e) + if self._pixel_format and self._pixel_format.lower() != "auto": + self._camera_pixel_format = self._pixel_format def _configure_trigger(self, node_map) -> None: cfg = self._trigger @@ -1783,7 +1815,13 @@ def _read_telemetry(self, node_map) -> None: pixel_format = self._node_str(node_map, "PixelFormat") if pixel_format is not None: + self._camera_pixel_format = pixel_format ns["actual_pixel_format"] = pixel_format + ns["detected_pixel_format"] = pixel_format + + output_format = self.actual_output_format + if output_format is not None: + ns["actual_output_format"] = output_format except Exception: pass diff --git a/dlclivegui/cameras/backends/opencv_backend.py b/dlclivegui/cameras/backends/opencv_backend.py index 74fdede..869dde4 100644 --- a/dlclivegui/cameras/backends/opencv_backend.py +++ b/dlclivegui/cameras/backends/opencv_backend.py @@ -254,6 +254,16 @@ def actual_gain(self) -> None: """Not supported by OpenCV backend.""" return None + @property + def actual_pixel_format(self) -> str | None: + """OpenCV does not reliably expose native camera pixel format.""" + return None + + @property + def actual_output_format(self) -> str | None: + """OpenCV VideoCapture returns BGR frames in this backend.""" + return "BGR8" + # ---------------------------- # Internal helpers # ---------------------------- diff --git a/dlclivegui/cameras/base.py b/dlclivegui/cameras/base.py index fefedd1..f86f3d1 100644 --- a/dlclivegui/cameras/base.py +++ b/dlclivegui/cameras/base.py @@ -68,6 +68,7 @@ class SupportLevel(str, Enum): "set_fps": SupportLevel.UNSUPPORTED, "set_exposure": SupportLevel.UNSUPPORTED, "set_gain": SupportLevel.UNSUPPORTED, + "preserve_mono": SupportLevel.UNSUPPORTED, "device_discovery": SupportLevel.UNSUPPORTED, "stable_identity": SupportLevel.UNSUPPORTED, "hardware_trigger": SupportLevel.UNSUPPORTED, @@ -98,6 +99,14 @@ def static_capabilities(cls) -> dict[str, SupportLevel]: """Return a dict describing supported features for UI purposes.""" return DEFAULT_CAPABILITIES + @property + def actual_pixel_format(self) -> str | None: + return None + + @property + def recommended_preserve_mono(self) -> bool | None: + return None + @classmethod def options_key(cls) -> str: """Return the key used to store this backend's options in CameraSettings.""" diff --git a/dlclivegui/config.py b/dlclivegui/config.py index dac523d..6b3afac 100644 --- a/dlclivegui/config.py +++ b/dlclivegui/config.py @@ -24,9 +24,12 @@ ## Debug ### Timing logs -SINGLE_CAMERA_WORKER_DO_LOG_TIMING: bool = True -MULTI_CAMERA_WORKER_DO_LOG_TIMING: bool = True +SINGLE_CAMERA_WORKER_DO_LOG_TIMING: bool = False +MULTI_CAMERA_WORKER_DO_LOG_TIMING: bool = False +REC_DO_LOG_TIMING: bool = False # MAIN_WINDOW_DO_LOG_TIMING: bool = False +#### Backends +BASLER_DO_LOG_TIMING: bool = False class CameraSettings(BaseModel): @@ -42,6 +45,7 @@ class CameraSettings(BaseModel): exposure: int = 0 # 0=auto else µs gain: float = 0.0 # 0.0=auto else value + preserve_mono: bool = False # if True, preserve mono images as mono (not BGR) when reading crop_x0: int = 0 crop_y0: int = 0 @@ -65,6 +69,7 @@ def pretty(self) -> str: f" fps={self.fps}, size={self.width or 'auto'}x{self.height or 'auto'}, " f"exposure={self.exposure or 'auto'}, gain={self.gain or 'auto'}\n" f" rotation={self.rotation}, crop={crop}\n" + f" preserve_mono={self.preserve_mono}, max_devices={self.max_devices}\n" f"]" ) diff --git a/dlclivegui/gui/camera_config/camera_config_dialog.py b/dlclivegui/gui/camera_config/camera_config_dialog.py index d8defb4..617f801 100644 --- a/dlclivegui/gui/camera_config/camera_config_dialog.py +++ b/dlclivegui/gui/camera_config/camera_config_dialog.py @@ -359,6 +359,7 @@ def _mark_dirty(*_args): self.cam_rotation.currentIndexChanged.connect(lambda *_: _mark_dirty()) self.cam_enabled_checkbox.stateChanged.connect(lambda *_: _mark_dirty()) + self.cam_preserve_mono_checkbox.stateChanged.connect(lambda *_: _mark_dirty()) # ------------------------------- # UI state updates @@ -423,6 +424,8 @@ def _set_detected_labels(self, cam: CameraSettings) -> None: det_res = ns.get("detected_resolution") det_fps = ns.get("detected_fps") + det_output_format = ns.get("detected_output_format") + det_pixel_format = ns.get("detected_pixel_format") if isinstance(det_res, (list, tuple)) and len(det_res) == 2: try: @@ -438,6 +441,18 @@ def _set_detected_labels(self, cam: CameraSettings) -> None: else: self.detected_fps_label.setText("—") + self.detected_output_format_label.setText(str(det_output_format) if det_output_format else "—") + + tooltip_parts = [] + if det_output_format: + tooltip_parts.append(f"Backend output: {det_output_format}") + if det_pixel_format: + tooltip_parts.append(f"Camera PixelFormat: {det_pixel_format}") + + self.detected_output_format_label.setToolTip( + "\n".join(tooltip_parts) if tooltip_parts else "Backend-reported output frame format emitted to the app." + ) + def _refresh_camera_labels(self) -> None: cam_list = getattr(self, "active_cameras_list", None) if not cam_list: @@ -466,11 +481,12 @@ def _format_camera_label(self, cam: CameraSettings, index: int = -1) -> str: status = "✓" if cam.enabled else "○" this_id = f"{(cam.backend or '').lower()}:{cam.index}" dlc_indicator = " [DLC]" if this_id == self._dlc_camera_id and cam.enabled else "" + mono_indicator = " [Mono]" if getattr(cam, "preserve_mono", False) else "" trigger_role = self._trigger_role_for_label(cam) trigger_indicator = "" if trigger_role in {"off", "disabled"} else f" [{trigger_role}]" - return f"{status} {cam.name} [{cam.backend}:{cam.index}]{trigger_indicator}{dlc_indicator}" + return f"{status} {cam.name} [{cam.backend}:{cam.index}]{trigger_indicator}{dlc_indicator}{mono_indicator}" def _selected_detected_camera(self) -> DetectedCamera | None: row = self.available_cameras_list.currentRow() @@ -529,6 +545,9 @@ def apply(widget, feature: str, label: str, *, allow_best_effort: bool = True): apply(self.cam_exposure, "set_exposure", "Exposure") apply(self.cam_gain, "set_gain", "Gain") + # Output format / preserve mono + apply(self.cam_preserve_mono_checkbox, "preserve_mono", "Preserve mono output") + # Hardware trigger / sync apply(self.trigger_settings_btn, "hardware_trigger", "Hardware trigger") @@ -959,6 +978,7 @@ def _build_model_from_form(self, base: CameraSettings) -> CameraSettings: "crop_y0": int(self.cam_crop_y0.value()), "crop_x1": int(self.cam_crop_x1.value()), "crop_y1": int(self.cam_crop_y1.value()), + "preserve_mono": bool(self.cam_preserve_mono_checkbox.isChecked()), } ) # Validate and coerce; if invalid, Pydantic will raise @@ -977,6 +997,7 @@ def _load_camera_to_form(self, cam: CameraSettings) -> None: self.cam_crop_y0, self.cam_crop_x1, self.cam_crop_y1, + self.cam_preserve_mono_checkbox, ] for widget in block: if hasattr(widget, "blockSignals"): @@ -991,6 +1012,7 @@ def _load_camera_to_form(self, cam: CameraSettings) -> None: self.cam_index_label.setText(str(cam.index)) self.cam_backend_label.setText(cam.backend) self._update_controls_for_backend(cam.backend) + self.cam_preserve_mono_checkbox.setChecked(bool(getattr(cam, "preserve_mono", False))) self.cam_width.setValue(cam.width) self.cam_height.setValue(cam.height) self.cam_fps.setValue(cam.fps) @@ -1026,6 +1048,7 @@ def _write_form_to_cam(self, cam: CameraSettings) -> None: cam.crop_y0 = int(self.cam_crop_y0.value()) cam.crop_x1 = int(self.cam_crop_x1.value()) cam.crop_y1 = int(self.cam_crop_y1.value()) + cam.preserve_mono = bool(self.cam_preserve_mono_checkbox.isChecked()) def _commit_pending_edits(self, *, reason: str = "") -> bool: """ @@ -1186,6 +1209,8 @@ def _clear_settings_form(self) -> None: self.cam_backend_label.setText("") self.detected_resolution_label.setText("—") self.detected_fps_label.setText("—") + self.detected_output_format_label.setText("—") + self.detected_output_format_label.setToolTip("Backend-reported output frame format emitted to the app.") self.cam_width.setValue(0) self.cam_height.setValue(0) self.cam_fps.setValue(0.0) @@ -1196,6 +1221,7 @@ def _clear_settings_form(self) -> None: self.cam_crop_y0.setValue(0) self.cam_crop_x1.setValue(0) self.cam_crop_y1.setValue(0) + self.cam_preserve_mono_checkbox.setChecked(False) self.apply_settings_btn.setEnabled(False) self.reset_settings_btn.setEnabled(False) @@ -1292,6 +1318,8 @@ def _reset_selected_camera(self, *, clear_backend_cache: bool = False) -> None: else: ns.pop("detected_resolution", None) ns.pop("detected_fps", None) + ns.pop("detected_pixel_format", None) + ns.pop("detected_output_format", None) ns.pop("last_applied_resolution", None) # Update UI immediately to show "Auto" while probing @@ -1363,12 +1391,16 @@ def _start_probe_for_camera(self, cam: CameraSettings, *, apply_to_requested: bo ns = props.get(backend, {}) if isinstance(props.get(backend, None), dict) else {} if not apply_to_requested: det_res = ns.get("detected_resolution") + det_output = ns.get("detected_output_format") + has_res = False if isinstance(det_res, (list, tuple)) and len(det_res) == 2: try: - if int(det_res[0]) > 0 and int(det_res[1]) > 0: - return + has_res = int(det_res[0]) > 0 and int(det_res[1]) > 0 except Exception: - pass + has_res = False + + if has_res and det_output: + return # Start probe worker (settings will be opened in GUI thread for safety) self._probe_worker = CameraProbeWorker(cam, self) @@ -1394,6 +1426,9 @@ def _on_probe_success(self, payload) -> None: actual_res = getattr(be, "actual_resolution", None) actual_fps = getattr(be, "actual_fps", None) + actual_pixel_format = getattr(be, "actual_pixel_format", None) + actual_output_format = getattr(be, "actual_output_format", None) + recommended_preserve_mono = getattr(be, "recommended_preserve_mono", None) try: be.close() @@ -1416,12 +1451,33 @@ def _on_probe_success(self, payload) -> None: # Store regardless of "set_*" support. This is just "what device reports". if actual_res and isinstance(actual_res, (list, tuple)) and len(actual_res) == 2: ns["detected_resolution"] = [int(actual_res[0]), int(actual_res[1])] - elif actual_res and isinstance(actual_res, tuple) and len(actual_res) == 2: - ns["detected_resolution"] = [int(actual_res[0]), int(actual_res[1])] if isinstance(actual_fps, (int, float)) and float(actual_fps) > 0: ns["detected_fps"] = float(actual_fps) - self._append_status(f"[Probe] actual_res={actual_res}, actual_fps={actual_fps}") + + if actual_pixel_format: + ns["detected_pixel_format"] = str(actual_pixel_format) + self._append_status(f"[Probe] PixelFormat={actual_pixel_format}") + + if actual_output_format: + ns["detected_output_format"] = str(actual_output_format) + self._append_status(f"[Probe] OutputFormat={actual_output_format}") + + if recommended_preserve_mono is not None: + ns["recommended_preserve_mono"] = bool(recommended_preserve_mono) + + # ---- Generic capability-driven recommendation ---- + caps = CameraFactory.backend_capabilities(backend) + preserve_mono_cap = caps.get("preserve_mono") + preserve_mono_supported = preserve_mono_cap is not None and preserve_mono_cap.value != "unsupported" + + if preserve_mono_supported and recommended_preserve_mono is True: + if not bool(getattr(c, "preserve_mono", False)): + c.preserve_mono = True + self._append_status("[Probe] Mono pixel format detected; enabled Preserve mono frames.") + + if actual_pixel_format and str(actual_pixel_format).startswith("Mono"): + ns["detected_output_format"] = "Mono8" # ---- Apply detected -> requested (Reset behavior) ---- if self._probe_apply_to_requested and self._probe_target_row == i: @@ -1450,7 +1506,9 @@ def _on_probe_success(self, payload) -> None: # Always refresh detected labels if currently selected if self._current_edit_index == i: + self._load_camera_to_form(c) self._set_detected_labels(c) + break except Exception as exc: @@ -1678,7 +1736,7 @@ def _should_restart_preview(self, old: CameraSettings, new: CameraSettings) -> b Backend-agnostic for now (no OpenCV special casing). """ # Restart on these changes - for key in ("width", "height", "fps", "exposure", "gain"): + for key in ("width", "height", "fps", "exposure", "gain", "preserve_mono"): try: if getattr(old, key, None) != getattr(new, key, None): return True diff --git a/dlclivegui/gui/camera_config/ui_blocks.py b/dlclivegui/gui/camera_config/ui_blocks.py index 07a8025..4c91275 100644 --- a/dlclivegui/gui/camera_config/ui_blocks.py +++ b/dlclivegui/gui/camera_config/ui_blocks.py @@ -276,7 +276,7 @@ def build_settings_group(dlg: CameraConfigDialog) -> QGroupBox: ) dlg.settings_form.addRow(detected_row) - # --- Requested resolution controls (Auto = 0) --- + # --- Requested resolution/output format controls (Auto = 0) --- dlg.cam_width = QSpinBox() dlg.cam_width.setRange(0, 10000) dlg.cam_width.setValue(0) @@ -290,6 +290,24 @@ def build_settings_group(dlg: CameraConfigDialog) -> QGroupBox: res_row = make_two_field_row("W", dlg.cam_width, "H", dlg.cam_height, key_width=30) dlg.settings_form.addRow("Resolution:", res_row) + # --- Output format controls --- + dlg.cam_preserve_mono_checkbox = QCheckBox("Preserve mono output") + dlg.cam_preserve_mono_checkbox.setToolTip( + "For monochrome cameras, keep frames as single-channel Mono8 instead of converting to BGR. " + "This reduces memory bandwidth and recording overhead. Display/overlay/DLC may convert later if needed." + ) + + dlg.detected_output_format_label = QLabel("—") + dlg.detected_output_format_label.setTextInteractionFlags(Qt.TextSelectableByMouse) + dlg.detected_output_format_label.setToolTip( + "Backend-reported output frame format emitted to the app, for example Mono8 or BGR8." + ) + + output_row = make_two_field_row( + None, dlg.cam_preserve_mono_checkbox, "Detected:", dlg.detected_output_format_label, key_width=60, gap=40 + ) + dlg.settings_form.addRow("Output:", output_row) + # --- FPS + Rotation grouped --- dlg.cam_fps = QDoubleSpinBox() dlg.cam_fps.setRange(0.0, 240.0) diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index b6f1d99..9609b5a 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -65,7 +65,7 @@ scan_processor_package, ) from ..services.dlc_processor import DLCLiveProcessor, PoseResult -from ..services.multi_camera_controller import MultiCameraController, MultiFrameData, get_camera_id +from ..services.multi_camera_controller import MultiCameraController, MultiFrameData, get_camera_id, get_display_id from ..utils.display import BBoxColors, compute_tile_info, create_tiled_frame, draw_bbox, draw_pose from ..utils.settings_store import DLCLiveGUISettingsStore, ModelPathStore from ..utils.stats import format_dlc_stats @@ -164,6 +164,7 @@ def __init__(self, config: ApplicationSettings | None = None): # Multi-camera state self._multi_camera_mode = False self._multi_camera_frames: dict[str, np.ndarray] = {} + self._multi_camera_display_ids: dict[str, str] = {} # camera_id -> display_id (for labeling) # DLC pose rendering info for tiled view self._dlc_tile_offset: tuple[int, int] = (0, 0) # (x, y) offset in tiled frame self._dlc_tile_scale: tuple[float, float] = (1.0, 1.0) # (scale_x, scale_y) @@ -1379,6 +1380,7 @@ def _on_multi_frame_processing_ready(self, frame_data: MultiFrameData) -> None: 2. Recording (queued writes, non-blocking) """ self._multi_camera_frames = frame_data.frames + self._multi_camera_display_ids = frame_data.display_ids or {} src_id = frame_data.source_camera_id if src_id: self._fps_tracker.note_frame(src_id) # Track FPS @@ -1439,6 +1441,7 @@ def _on_multi_frame_display_ready(self, frame_data: MultiFrameData) -> None: Called at GUI_MAX_DISPLAY_FPS, not at camera capture FPS for performance reasons. """ self._multi_camera_frames = frame_data.frames + self._multi_camera_display_ids = frame_data.display_ids or {} self._display_dirty = True def _on_multi_camera_started(self) -> None: @@ -1459,6 +1462,7 @@ def _on_multi_camera_stopped(self) -> None: self.stop_preview_button.setEnabled(False) self._current_frame = None self._multi_camera_frames.clear() + self._multi_camera_display_ids.clear() self.video_label.setPixmap(QPixmap()) self.video_label.setText("Camera preview not started") self.statusBar().showMessage("Multi-camera preview stopped", 3000) @@ -1496,11 +1500,13 @@ def _start_multi_camera_recording(self) -> None: session_name = self.session_name_edit.text().strip() if hasattr(self, "session_name_edit") else "" use_ts = self.use_timestamp_checkbox.isChecked() if hasattr(self, "use_timestamp_checkbox") else True + actual_fps_by_camera = self.multi_camera_controller.actual_fps_by_camera_id() run_dir = self._rec_manager.start_all( recording, active_cams, self._multi_camera_frames, + frame_rates=actual_fps_by_camera, session_name=session_name, use_timestamp=use_ts, all_or_nothing=False, @@ -1593,6 +1599,7 @@ def _start_preview(self) -> None: self._raw_frame = None self._last_pose = None self._multi_camera_frames.clear() + self._multi_camera_display_ids.clear() self._fps_tracker.clear() self._last_display_time = 0.0 @@ -1735,7 +1742,7 @@ def _update_display_from_pending(self) -> None: self._display_dirty = False # Create tiled frame on demand (moved from camera thread for performance) - tiled = create_tiled_frame(self._multi_camera_frames) + tiled = create_tiled_frame(self._multi_camera_frames, labels=self._multi_camera_display_ids) if tiled is not None: self._current_frame = tiled self._update_video_display(tiled) @@ -1751,10 +1758,11 @@ def _update_metrics(self) -> None: active_cams = self._config.multi_camera.get_active_cameras() lines = [] for cam in active_cams: - cam_id = get_camera_id(cam) # e.g., "opencv:0" or "pylon:1" + cam_id = get_camera_id(cam) + display_id = get_display_id(cam) fps = self._fps_tracker.fps(cam_id) # Make a compact label: name [backend:index] @ fps - label = f"{cam.name or cam_id} [{cam.backend}:{cam.index}]" + label = f"{display_id} [{cam.backend}:{cam.index}]" if fps > 0: lines.append(f"{label} @ {fps:.1f} fps") else: diff --git a/dlclivegui/gui/recording_manager.py b/dlclivegui/gui/recording_manager.py index 49ac993..d12b19e 100644 --- a/dlclivegui/gui/recording_manager.py +++ b/dlclivegui/gui/recording_manager.py @@ -8,7 +8,8 @@ from dlclivegui.config import CameraSettings, RecordingSettings from dlclivegui.services.multi_camera_controller import get_camera_id -from dlclivegui.services.video_recorder import RecorderStats, VideoRecorder +from dlclivegui.services.video_recorder import VideoRecorder +from dlclivegui.utils.stats import RecorderStats from dlclivegui.utils.utils import build_run_dir, sanitize_name log = logging.getLogger(__name__) @@ -38,6 +39,55 @@ def session_dir(self) -> Path | None: def run_dir(self) -> Path | None: return self._run_dir + @staticmethod + def _backend_ns(cam: CameraSettings) -> dict: + backend = (cam.backend or "").lower() + props = cam.properties if isinstance(cam.properties, dict) else {} + ns = props.get(backend, {}) + return ns if isinstance(ns, dict) else {} + + @classmethod + def _resolve_recording_fps( + cls, + cam: CameraSettings, + cam_id: str, + frame_rates: dict[str, float] | None, + ) -> float | None: + """Resolve writer FPS. + + Prefer runtime measured FPS, then backend-probed detected_fps, + then explicit requested cam.fps. Auto/unknown returns None. + """ + measured_fps = 0.0 + if frame_rates: + try: + measured_fps = float(frame_rates.get(cam_id, 0.0) or 0.0) + except Exception: + measured_fps = 0.0 + + if measured_fps > 0.0: + return measured_fps + + ns = cls._backend_ns(cam) + + try: + detected_fps = float(ns.get("detected_fps", 0.0) or 0.0) + except Exception: + detected_fps = 0.0 + + if detected_fps > 0.0: + return detected_fps + + try: + requested_fps = float(getattr(cam, "fps", 0.0) or 0.0) + except Exception: + requested_fps = 0.0 + + if requested_fps > 0.0: + return requested_fps + + return None + def pop(self, cam_id: str, default=None) -> VideoRecorder | None: return self._recorders.pop(cam_id, default) @@ -47,6 +97,7 @@ def start_all( active_cams: list[CameraSettings], current_frames: dict[str, np.ndarray], *, + frame_rates: dict[str, float] | None = None, session_name: str = "session", use_timestamp: bool = True, all_or_nothing: bool = False, @@ -96,13 +147,25 @@ def start_all( frame = current_frames.get(cam_id) frame_size = (frame.shape[0], frame.shape[1]) if frame is not None else None + recorder_fps = self._resolve_recording_fps(cam, cam_id, frame_rates) + + log.debug( + "Starting recorder %s -> %s frame_size=%s requested_fps=%s detected_fps=%s recorder_fps=%s", + cam_id, + cam_path, + frame_size, + getattr(cam, "fps", None), + self._backend_ns(cam).get("detected_fps"), + f"{recorder_fps:.3f}" if recorder_fps else "auto/fallback", + ) recorder = VideoRecorder( cam_path, frame_size=frame_size, - frame_rate=float(cam.fps), + frame_rate=recorder_fps, codec=recording.codec, crf=recording.crf, + convert_grayscale_to_rgb=not bool(getattr(cam, "preserve_mono", False)), ) try: recorder.start() diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index aa5aaad..5ccf33c 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -49,6 +49,7 @@ class SingleCameraWorker(QObject): frame_captured = Signal(str, object, float) # camera_id, frame, timestamp error_occurred = Signal(str, str) # camera_id, error_message + runtime_info = Signal(str, object) # camera_id, dict of runtime info started = Signal(str) # camera_id stopped = Signal(str) # camera_id @@ -95,6 +96,15 @@ def run(self) -> None: ) self._backend.open() + self.runtime_info.emit( + self._camera_id, + { + "actual_fps": getattr(self._backend, "actual_fps", None), + "actual_resolution": getattr(self._backend, "actual_resolution", None), + "actual_pixel_format": getattr(self._backend, "actual_pixel_format", None), + "actual_output_format": getattr(self._backend, "actual_output_format", None), + }, + ) except Exception as exc: LOGGER.exception(f"Failed to initialize camera {self._camera_id}", exc_info=exc) self.error_occurred.emit(self._camera_id, f"Failed to initialize camera: {exc}") @@ -207,6 +217,22 @@ def _log_trigger_wait_throttled(self, exc: BaseException) -> None: def get_display_id(settings: CameraSettings) -> str: + """Return the human-friendly camera label used for GUI display. + Intentionally different from get_camera_id(), which should return a stable + internal, reliable and unambiguous identity and may contain serials or machine paths. + """ + name = str(getattr(settings, "name", "") or "").strip() + if name: + return name + + backend = (settings.backend or "").lower() + props = settings.properties if isinstance(settings.properties, dict) else {} + ns = props.get(backend, {}) if isinstance(props.get(backend), dict) else {} + + device_name = str(ns.get("device_name", "") or "").strip() + if device_name: + return device_name + return f"{settings.backend}:{settings.index}" @@ -287,6 +313,7 @@ def __init__(self): self._workers: dict[str, SingleCameraWorker] = {} self._threads: dict[str, QThread] = {} self._settings: dict[str, CameraSettings] = {} + self._runtime_info: dict[str, dict] = {} self._frames: dict[str, np.ndarray] = {} self._timestamps: dict[str, float] = {} self._frame_lock = Lock() @@ -421,6 +448,7 @@ def _start_camera(self, settings: CameraSettings) -> None: # Connections unchanged thread.started.connect(worker.run) + worker.runtime_info.connect(self._on_camera_runtime_info) worker.frame_captured.connect(self._on_frame_captured) worker.started.connect(self._on_camera_started) worker.stopped.connect(self._on_camera_stopped) @@ -589,6 +617,36 @@ def _on_frame_captured(self, camera_id: str, frame: np.ndarray, timestamp: float timing.note_frame() timing.maybe_log() + def _on_camera_runtime_info(self, camera_id: str, info: object) -> None: + if not isinstance(info, dict): + return + + self._runtime_info[camera_id] = dict(info) + + actual_fps = info.get("actual_fps") + LOGGER.info( + "Camera %s runtime info: actual_fps=%s actual_resolution=%s pixel_format=%s output_format=%s", + camera_id, + actual_fps, + info.get("actual_resolution"), + info.get("actual_pixel_format"), + info.get("actual_output_format"), + ) + + def actual_fps_by_camera_id(self) -> dict[str, float]: + out: dict[str, float] = {} + + for camera_id, info in self._runtime_info.items(): + try: + fps = float(info.get("actual_fps") or 0.0) + except Exception: + fps = 0.0 + + if fps > 0.0: + out[camera_id] = fps + + return out + @staticmethod def apply_rotation(frame: np.ndarray, degrees: int) -> np.ndarray: """Apply rotation to frame.""" @@ -661,98 +719,6 @@ def to_display_pixmap(frame: np.ndarray) -> QPixmap: q_img = QImage(frame.data, w, h, bytes_per_line, QImage.Format.Format_RGB888).copy() return QPixmap.fromImage(q_img) - def _create_tiled_frame(self) -> np.ndarray: - """Create a tiled frame from all camera frames. - - The tiled frame is scaled to fit within a maximum canvas size - while maintaining aspect ratio of individual camera frames. - """ - if not self._frames: - return np.zeros((480, 640, 3), dtype=np.uint8) - - frames_list = [self._frames[idx] for idx in sorted(self._frames.keys())] - num_frames = len(frames_list) - - if num_frames == 0: - return np.zeros((480, 640, 3), dtype=np.uint8) - - # Determine grid layout - if num_frames == 1: - rows, cols = 1, 1 - elif num_frames == 2: - rows, cols = 1, 2 - elif num_frames <= 4: - rows, cols = 2, 2 - else: - rows, cols = 2, 2 # Limit to 4 - - # Maximum canvas size to fit on screen (leaving room for UI elements) - max_canvas_width = 1200 - max_canvas_height = 800 - - # Calculate tile size based on frame aspect ratio and available space - first_frame = frames_list[0] - frame_h, frame_w = first_frame.shape[:2] - frame_aspect = frame_w / frame_h if frame_h > 0 else 1.0 - - # Calculate tile dimensions that fit within the canvas - tile_w = max_canvas_width // cols - tile_h = max_canvas_height // rows - - # Maintain aspect ratio of original frames - tile_aspect = tile_w / tile_h if tile_h > 0 else 1.0 - - if frame_aspect > tile_aspect: - # Frame is wider than tile slot - constrain by width - tile_h = int(tile_w / frame_aspect) - else: - # Frame is taller than tile slot - constrain by height - tile_w = int(tile_h * frame_aspect) - - # Ensure minimum size - tile_w = max(160, tile_w) - tile_h = max(120, tile_h) - - # Create canvas - canvas = np.zeros((rows * tile_h, cols * tile_w, 3), dtype=np.uint8) - - # Get sorted camera IDs for consistent ordering - cam_ids = sorted(self._frames.keys()) - frames_list = [self._frames[cam_id] for cam_id in cam_ids] - - # Place each frame in the grid - for idx, frame in enumerate(frames_list[: rows * cols]): - row = idx // cols - col = idx % cols - - # Ensure frame is 3-channel - frame = MultiCameraController.ensure_color_bgr(frame) - - # Resize to tile size - resized = MultiCameraController.apply_resize(frame, tile_w, tile_h, allow_upscale=True) - - # Add camera ID label - if idx < len(cam_ids): - label = cam_ids[idx] - cv2.putText( - resized, - label, - (10, 30), - cv2.FONT_HERSHEY_SIMPLEX, - 0.7, - (0, 255, 0), - 2, - ) - - # Place in canvas - y_start = row * tile_h - y_end = y_start + tile_h - x_start = col * tile_w - x_end = x_start + tile_w - canvas[y_start:y_end, x_start:x_end] = resized - - return canvas - def _on_camera_started(self, camera_id: str) -> None: """Handle camera start event.""" self._started_cameras.add(camera_id) @@ -815,20 +781,3 @@ def _on_camera_error(self, camera_id: str, message: str) -> None: if camera_id not in self._started_cameras: self._failed_cameras[camera_id] = message self.camera_error.emit(camera_id, message) - - def get_frame(self, camera_id: str) -> np.ndarray | None: - """Get the latest frame from a specific camera.""" - with self._frame_lock: - return self._frames.get(camera_id) - - def get_all_frames(self) -> dict[str, np.ndarray]: - """Get the latest frames from all cameras.""" - with self._frame_lock: - return dict(self._frames) - - def get_tiled_frame(self) -> np.ndarray | None: - """Get a tiled view of all camera frames.""" - with self._frame_lock: - if self._frames: - return self._create_tiled_frame() - return None diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index e2ae15c..76bfc1d 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -9,12 +9,14 @@ import threading import time from collections import deque -from dataclasses import dataclass from pathlib import Path from typing import Any import numpy as np +from dlclivegui.config import REC_DO_LOG_TIMING +from dlclivegui.utils.stats import RecorderStats, WorkerTimingStats + try: from vidgear.gears import WriteGear except ImportError: # pragma: no cover - handled at runtime @@ -26,20 +28,6 @@ STOP_JOIN_TIMEOUT = 5.0 # seconds -@dataclass -class RecorderStats: - """Snapshot of recorder throughput metrics.""" - - frames_enqueued: int = 0 - frames_written: int = 0 - dropped_frames: int = 0 - queue_size: int = 0 - average_latency: float = 0.0 - last_latency: float = 0.0 - write_fps: float = 0.0 - buffer_seconds: float = 0.0 - - _SENTINEL = object() @@ -54,6 +42,7 @@ def __init__( codec: str = "libx264", crf: int = 23, buffer_size: int = 240, + convert_grayscale_to_rgb: bool = True, ): # Config self._output = Path(output) @@ -63,6 +52,7 @@ def __init__( self._codec = codec self._crf = int(crf) self._buffer_size = max(1, int(buffer_size)) + self._convert_grayscale_to_rgb = bool(convert_grayscale_to_rgb) # Worker state self._queue: queue.Queue[Any] | None = None self._writer_thread: threading.Thread | None = None @@ -80,6 +70,14 @@ def __init__( self._encode_error: Exception | None = None self._last_log_time = 0.0 self._frame_timestamps: list[float] = [] + # Timing + self._process_timing = WorkerTimingStats( + f"RecorderProcess[{self._output.name}]", logger=logger, log_interval=1.0, enabled=REC_DO_LOG_TIMING + ) + self._writer_timing = WorkerTimingStats( + f"RecorderWriter[{self._output.name}]", logger=logger, log_interval=1.0, enabled=REC_DO_LOG_TIMING + ) + self._logged_first_frame = False @property def is_running(self) -> bool: @@ -111,7 +109,28 @@ def start(self) -> None: self._queue = None self._writer_thread = None - fps_value = float(self._frame_rate) if self._frame_rate else 30.0 + if self._frame_rate and float(self._frame_rate) > 0.0: + fps_value = float(self._frame_rate) + else: + fps_value = 30.0 + logger.warning( + "VideoRecorder frame_rate missing/zero for %s; falling back to %.3f FPS. " + "Video playback duration may not match capture timestamps.", + self._output.name, + fps_value, + ) + + logger.info( + "Starting VideoRecorder output=%s frame_size=%s frame_rate=%.3f " + "codec=%s crf=%s buffer_size=%s convert_grayscale_to_rgb=%s", + self._output, + self._frame_size, + fps_value, + self._codec, + self._crf, + self._buffer_size, + self._convert_grayscale_to_rgb, + ) writer_kwargs: dict[str, Any] = { "compression_mode": True, @@ -120,7 +139,15 @@ def start(self) -> None: "-vcodec": (self._codec or "libx264").strip() or "libx264", "-crf": int(self._crf), } - # TODO deal with pixel format + # if not self._convert_grayscale_to_rgb: + # writer_kwargs.update( + # { + # "-pix_fmt": "yuv420p", + # } + # ) + # if self._frame_size is not None: + # h, w = self._frame_size + # writer_kwargs["-output_dimensions"] = (int(w), int(h)) self._output.parent.mkdir(parents=True, exist_ok=True) self._writer = WriteGear(output=str(self._output), **writer_kwargs) @@ -160,41 +187,57 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: if timestamp is None: timestamp = time.time() - # Convert frame to uint8 if needed - if frame.dtype != np.uint8: - frame_float = frame.astype(np.float32, copy=False) - max_val = float(frame_float.max()) if frame_float.size else 0.0 - scale = 1.0 - if max_val > 0: - scale = 255.0 / max_val if max_val > 255.0 else (255.0 if max_val <= 1.0 else 1.0) - frame = np.clip(frame_float * scale, 0.0, 255.0).astype(np.uint8) - - # Convert grayscale to RGB if needed - if frame.ndim == 2: - frame = np.repeat(frame[:, :, None], 3, axis=2) - - # Ensure contiguous array - frame = np.ascontiguousarray(frame) - - # Check if frame size matches expected size - if self._frame_size is not None: - expected_h, expected_w = self._frame_size - actual_h, actual_w = frame.shape[:2] - if (actual_h, actual_w) != (expected_h, expected_w): - logger.warning( - f"Frame size mismatch: expected (h={expected_h}, w={expected_w}), " - f"got (h={actual_h}, w={actual_w}). " - "Stopping recorder to prevent encoding errors." + with self._process_timing.measure("Recorder.preprocess"): + # Convert frame to uint8 if needed + if frame.dtype != np.uint8: + frame_float = frame.astype(np.float32, copy=False) + max_val = float(frame_float.max()) if frame_float.size else 0.0 + scale = 1.0 + if max_val > 0: + scale = 255.0 / max_val if max_val > 255.0 else (255.0 if max_val <= 1.0 else 1.0) + frame = np.clip(frame_float * scale, 0.0, 255.0).astype(np.uint8) + + # Convert grayscale to RGB if needed + if self._convert_grayscale_to_rgb and frame.ndim == 2: + frame = np.repeat(frame[:, :, None], 3, axis=2) + + # Ensure contiguous array + frame = np.ascontiguousarray(frame) + + if not self._logged_first_frame: + self._logged_first_frame = True + logger.info( + "Recorder %s first frame: shape=%s dtype=%s " + "contiguous=%s nbytes=%.2f MB convert_grayscale_to_rgb=%s", + self._output.name, + frame.shape, + frame.dtype, + frame.flags.c_contiguous, + frame.nbytes / (1024 * 1024), + self._convert_grayscale_to_rgb, ) - # Set error to stop recording gracefully - with self._stats_lock: - self._encode_error = ValueError( - f"Frame size changed from (h={expected_h}, w={expected_w}) to (h={actual_h}, w={actual_w})" + + # Check if frame size matches expected size + if self._frame_size is not None: + expected_h, expected_w = self._frame_size + actual_h, actual_w = frame.shape[:2] + if (actual_h, actual_w) != (expected_h, expected_w): + logger.warning( + f"Frame size mismatch: expected (h={expected_h}, w={expected_w}), " + f"got (h={actual_h}, w={actual_w}). " + "Stopping recorder to prevent encoding errors." ) - return False + with self._stats_lock: + self._encode_error = ValueError( + f"Frame size changed from (h={expected_h}, w={expected_w}) to (h={actual_h}, w={actual_w})" + ) + self._process_timing.note_error() + self._process_timing.maybe_log() + return False try: - q.put((frame, timestamp), block=False) + with self._process_timing.measure("Recorder.queue_put"): + q.put((frame, timestamp), block=False) except queue.Full: with self._stats_lock: self._dropped_frames += 1 @@ -204,9 +247,16 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: queue_size, self._buffer_size, ) + self._process_timing.note_error() + self._process_timing.maybe_log() return False + with self._stats_lock: self._frames_enqueued += 1 + + self._process_timing.note_frame() + self._process_timing.maybe_log() + return True def stop(self) -> None: @@ -328,12 +378,17 @@ def _writer_loop(self) -> None: writer = self._writer if writer is None: raise RuntimeError("WriteGear writer is not initialized") - writer.write(frame) + + with self._writer_timing.measure("Recorder.writer_write"): + writer.write(frame) + except Exception as exc: with self._stats_lock: self._encode_error = exc logger.exception("Video encoding failed while writing frame", exc_info=exc) self._stop_event.set() + self._writer_timing.note_error() + self._writer_timing.maybe_log() break else: elapsed = time.perf_counter() - start @@ -348,6 +403,9 @@ def _writer_loop(self) -> None: self._compute_write_fps_locked() self._last_log_time = now + self._writer_timing.note_frame() + self._writer_timing.maybe_log() + finally: # Ensure queue accounting is correct for every item pulled from q try: diff --git a/dlclivegui/utils/stats.py b/dlclivegui/utils/stats.py index 38e3798..3a00c02 100644 --- a/dlclivegui/utils/stats.py +++ b/dlclivegui/utils/stats.py @@ -3,9 +3,25 @@ import logging import time +from dataclasses import dataclass +from typing import TYPE_CHECKING -from dlclivegui.services.dlc_processor import ProcessorStats -from dlclivegui.services.video_recorder import RecorderStats +if TYPE_CHECKING: + from dlclivegui.services.dlc_processor import ProcessorStats + + +@dataclass +class RecorderStats: + """Snapshot of recorder throughput metrics.""" + + frames_enqueued: int = 0 + frames_written: int = 0 + dropped_frames: int = 0 + queue_size: int = 0 + average_latency: float = 0.0 + last_latency: float = 0.0 + write_fps: float = 0.0 + buffer_seconds: float = 0.0 class WorkerTimingStats: diff --git a/tests/cameras/backends/test_basler_backend.py b/tests/cameras/backends/test_basler_backend.py index 64496b9..18f49a1 100644 --- a/tests/cameras/backends/test_basler_backend.py +++ b/tests/cameras/backends/test_basler_backend.py @@ -232,7 +232,7 @@ def test_basler_exposure_gain_fps_are_applied_when_nonzero( # --------------------------------------------------------------------- -def test_basler_static_capabilities_advertises_hardware_trigger_best_effort( +def test_basler_static_capabilities_advertises_hardware_trigger_best_effort_and_mono( patch_basler_sdk, ): import dlclivegui.cameras.backends.basler_backend as bb @@ -240,6 +240,7 @@ def test_basler_static_capabilities_advertises_hardware_trigger_best_effort( caps = bb.BaslerCameraBackend.static_capabilities() assert caps["hardware_trigger"] == SupportLevel.BEST_EFFORT + assert caps["preserve_mono"] == SupportLevel.SUPPORTED def test_basler_default_trigger_is_off_and_free_runs( diff --git a/tests/gui/test_rec_manager.py b/tests/gui/test_rec_manager.py index c789078..b3654a2 100644 --- a/tests/gui/test_rec_manager.py +++ b/tests/gui/test_rec_manager.py @@ -6,7 +6,7 @@ from dlclivegui.config import CameraSettings from dlclivegui.gui.recording_manager import RecordingManager from dlclivegui.services.multi_camera_controller import get_camera_id, get_display_id -from dlclivegui.services.video_recorder import RecorderStats +from dlclivegui.utils.stats import RecorderStats @pytest.fixture @@ -306,7 +306,7 @@ def test_recording_manager_uses_stable_camera_id_not_display_id( display_id = get_display_id(cam) assert stable_id == "gentl:serial:SER0" - assert display_id == "gentl:0" + assert display_id == "GenTL cam" assert stable_id != display_id frame = np.zeros((480, 640, 3), dtype=np.uint8) diff --git a/tests/services/test_multicam_controller.py b/tests/services/test_multicam_controller.py index e5bf609..855b01a 100644 --- a/tests/services/test_multicam_controller.py +++ b/tests/services/test_multicam_controller.py @@ -157,7 +157,7 @@ def test_get_display_id_is_human_index_label(): ).apply_defaults() assert get_camera_id(cam) == "gentl:serial:30220469" - assert get_display_id(cam) == "gentl:3" + assert get_display_id(cam) == "GenTL Cam" assert get_camera_id(cam) != get_display_id(cam) @@ -193,6 +193,23 @@ def test_trigger_role_from_settings_aliases(role, expected): assert _trigger_role_from_settings(cam) == expected +@pytest.mark.unit +def test_get_display_id_falls_back_to_backend_index_without_name(): + cam = CameraSettings( + name="", + backend="gentl", + index=3, + properties={ + "gentl": { + "device_id": "serial:30220469", + "serial_number": "30220469", + } + }, + ).apply_defaults() + + assert get_display_id(cam) == "gentl:3" + + @pytest.mark.unit def test_camera_start_priority_orders_trigger_roles(): external = CameraSettings( @@ -330,9 +347,8 @@ def test_controller_uses_stable_camera_id_not_display_id(qtbot, patch_factory): display_id = get_display_id(cam) assert stable_id == "gentl:serial:SER0" - assert display_id == "gentl:0" + assert display_id == "C1" assert stable_id != display_id - seen = [] def on_ready(mfd): @@ -348,6 +364,8 @@ def on_ready(mfd): mfd = seen[-1] + assert stable_id in mfd.frames + assert mfd.display_ids[stable_id] == "C1" assert mfd.source_camera_id == stable_id assert stable_id in mfd.frames assert stable_id in mfd.timestamps diff --git a/tests/services/test_video_recorder.py b/tests/services/test_video_recorder.py index 28bb856..efde6e2 100644 --- a/tests/services/test_video_recorder.py +++ b/tests/services/test_video_recorder.py @@ -372,3 +372,49 @@ def test_stop_timeout_marks_abandoned_and_prevents_restart( assert rec._abandoned is False rec.start() rec.stop() + + +def test_video_recorder_preserves_gray_when_requested(monkeypatch, tmp_path): + written = [] + + class FakeWriter: + def write(self, frame): + written.append(frame) + + def close(self): + pass + + monkeypatch.setattr("dlclivegui.services.video_recorder.WriteGear", lambda *a, **k: FakeWriter()) + + rec = vr_mod.VideoRecorder( + tmp_path / "out.mp4", + frame_size=(10, 20), + frame_rate=100, + convert_grayscale_to_rgb=False, + ) + rec.start() + rec.write(np.zeros((10, 20), dtype=np.uint8)) + rec.stop() + + assert written + assert written[0].shape == (10, 20) + + +def test_video_recorder_expands_gray_by_default(monkeypatch, tmp_path): + written = [] + + class FakeWriter: + def write(self, frame): + written.append(frame) + + def close(self): + pass + + monkeypatch.setattr("dlclivegui.services.video_recorder.WriteGear", lambda *a, **k: FakeWriter()) + + rec = vr_mod.VideoRecorder(tmp_path / "out.mp4", frame_size=(10, 20), frame_rate=100) + rec.start() + rec.write(np.zeros((10, 20), dtype=np.uint8)) + rec.stop() + + assert written[0].shape == (10, 20, 3)