|
|
@@ -1033,7 +1033,7 @@ async def _capture_snapshot_for_notification(printer_id: int, printer, logger) -
|
|
|
frame_data = await capture_frame(printer.external_camera_url, printer.external_camera_type or "mjpeg")
|
|
|
if frame_data and len(frame_data) <= 2_500_000:
|
|
|
logger.info("[SNAPSHOT] External camera frame: %s bytes", len(frame_data))
|
|
|
- return frame_data
|
|
|
+ return _apply_camera_rotation(frame_data, printer, logger)
|
|
|
|
|
|
# Try buffered frame from active stream
|
|
|
from backend.app.api.routes.camera import _active_chamber_streams, _active_streams, get_buffered_frame
|
|
|
@@ -1045,7 +1045,7 @@ async def _capture_snapshot_for_notification(printer_id: int, printer, logger) -
|
|
|
if (active_for_printer or active_chamber) and buffered_frame:
|
|
|
logger.info("[SNAPSHOT] Using buffered frame for printer %s: %s bytes", printer_id, len(buffered_frame))
|
|
|
if len(buffered_frame) <= 2_500_000:
|
|
|
- return buffered_frame
|
|
|
+ return _apply_camera_rotation(buffered_frame, printer, logger)
|
|
|
|
|
|
# Fresh capture from printer camera
|
|
|
logger.info("[SNAPSHOT] Capturing fresh frame for printer %s", printer_id)
|
|
|
@@ -1056,7 +1056,7 @@ async def _capture_snapshot_for_notification(printer_id: int, printer, logger) -
|
|
|
)
|
|
|
if frame_data and len(frame_data) <= 2_500_000:
|
|
|
logger.info("[SNAPSHOT] Fresh camera frame: %s bytes", len(frame_data))
|
|
|
- return frame_data
|
|
|
+ return _apply_camera_rotation(frame_data, printer, logger)
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.warning("[SNAPSHOT] Failed to capture snapshot for printer %s: %s", printer_id, e)
|
|
|
@@ -1064,6 +1064,30 @@ async def _capture_snapshot_for_notification(printer_id: int, printer, logger) -
|
|
|
return None
|
|
|
|
|
|
|
|
|
+def _apply_camera_rotation(image_data: bytes, printer, logger) -> bytes:
|
|
|
+ """Apply camera rotation to snapshot image if configured."""
|
|
|
+ rotation = getattr(printer, "camera_rotation", 0)
|
|
|
+ if not rotation or rotation == 0:
|
|
|
+ return image_data
|
|
|
+
|
|
|
+ try:
|
|
|
+ from io import BytesIO
|
|
|
+
|
|
|
+ from PIL import Image
|
|
|
+
|
|
|
+ img = Image.open(BytesIO(image_data))
|
|
|
+ # PIL rotate is counter-clockwise, so negate for clockwise rotation
|
|
|
+ img = img.rotate(-rotation, expand=True)
|
|
|
+ buf = BytesIO()
|
|
|
+ img.save(buf, format="JPEG", quality=90)
|
|
|
+ rotated = buf.getvalue()
|
|
|
+ logger.info("[SNAPSHOT] Applied %d° rotation: %s → %s bytes", rotation, len(image_data), len(rotated))
|
|
|
+ return rotated
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning("[SNAPSHOT] Failed to apply rotation: %s", e)
|
|
|
+ return image_data
|
|
|
+
|
|
|
+
|
|
|
async def _send_print_start_notification(
|
|
|
printer_id: int,
|
|
|
data: dict,
|