| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- """Tests for capture PID tracking and cleanup exclusion (#172).
- The Obico detection service spawns short-lived ffmpeg processes for snapshot
- capture via capture_camera_frame_bytes(). These must be registered in
- _active_capture_pids so the cleanup task in routes/camera.py does not kill
- them as orphaned.
- """
- import asyncio
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- from backend.app.services.camera import (
- _active_capture_pids,
- capture_camera_frame_bytes,
- )
- @pytest.fixture(autouse=True)
- def _clear_capture_pids():
- """Ensure _active_capture_pids is empty before/after each test."""
- _active_capture_pids.clear()
- yield
- _active_capture_pids.clear()
- class TestCapturePidRegistration:
- """Verify PIDs are added/removed from _active_capture_pids."""
- @pytest.mark.asyncio
- async def test_pid_registered_during_capture(self):
- """PID is in _active_capture_pids while ffmpeg is running."""
- observed_pids_during_run: set[int] = set()
- fake_process = MagicMock()
- fake_process.pid = 99999
- fake_process.returncode = 0
- async def fake_communicate():
- # Snapshot what's in the set while "ffmpeg is running"
- observed_pids_during_run.update(_active_capture_pids)
- return (b"\xff\xd8" + b"\x00" * 200 + b"\xff\xd9", b"")
- fake_process.communicate = fake_communicate
- fake_proxy_server = AsyncMock()
- fake_proxy_server.close = MagicMock()
- with (
- patch("backend.app.services.camera.is_chamber_image_model", return_value=False),
- patch("backend.app.services.camera.get_camera_port", return_value=322),
- patch("backend.app.services.camera.create_tls_proxy", return_value=(12345, fake_proxy_server)),
- patch("backend.app.services.camera.get_ffmpeg_path", return_value="/usr/bin/ffmpeg"),
- patch("asyncio.create_subprocess_exec", return_value=fake_process),
- ):
- result = await capture_camera_frame_bytes("192.168.1.1", "test", "P2S", timeout=10)
- # PID was registered during capture
- assert 99999 in observed_pids_during_run
- # PID is removed after capture completes
- assert 99999 not in _active_capture_pids
- # Capture returned data
- assert result is not None
- @pytest.mark.asyncio
- async def test_pid_removed_after_failure(self):
- """PID is cleaned up even when ffmpeg returns non-zero."""
- fake_process = MagicMock()
- fake_process.pid = 88888
- fake_process.returncode = 1
- async def fake_communicate():
- return (b"", b"some error")
- fake_process.communicate = fake_communicate
- fake_proxy_server = AsyncMock()
- fake_proxy_server.close = MagicMock()
- with (
- patch("backend.app.services.camera.is_chamber_image_model", return_value=False),
- patch("backend.app.services.camera.get_camera_port", return_value=322),
- patch("backend.app.services.camera.create_tls_proxy", return_value=(12345, fake_proxy_server)),
- patch("backend.app.services.camera.get_ffmpeg_path", return_value="/usr/bin/ffmpeg"),
- patch("asyncio.create_subprocess_exec", return_value=fake_process),
- ):
- result = await capture_camera_frame_bytes("192.168.1.1", "test", "P2S", timeout=10)
- assert result is None
- assert 88888 not in _active_capture_pids
- @pytest.mark.asyncio
- async def test_pid_removed_after_timeout(self):
- """PID is cleaned up when ffmpeg times out."""
- fake_process = MagicMock()
- fake_process.pid = 77777
- fake_process.returncode = None
- fake_process.kill = MagicMock()
- async def fake_communicate():
- await asyncio.sleep(60) # Will be cancelled by wait_for
- return (b"", b"")
- fake_process.communicate = fake_communicate
- async def fake_wait():
- fake_process.returncode = -9
- fake_process.wait = fake_wait
- fake_proxy_server = AsyncMock()
- fake_proxy_server.close = MagicMock()
- with (
- patch("backend.app.services.camera.is_chamber_image_model", return_value=False),
- patch("backend.app.services.camera.get_camera_port", return_value=322),
- patch("backend.app.services.camera.create_tls_proxy", return_value=(12345, fake_proxy_server)),
- patch("backend.app.services.camera.get_ffmpeg_path", return_value="/usr/bin/ffmpeg"),
- patch("asyncio.create_subprocess_exec", return_value=fake_process),
- ):
- result = await capture_camera_frame_bytes("192.168.1.1", "test", "P2S", timeout=0.01)
- assert result is None
- assert 77777 not in _active_capture_pids
- @pytest.mark.asyncio
- async def test_no_pid_tracked_for_chamber_image_models(self):
- """Chamber image models (A1/P1) don't spawn ffmpeg — no PID tracking."""
- with (
- patch("backend.app.services.camera.is_chamber_image_model", return_value=True),
- patch("backend.app.services.camera.read_chamber_image_frame", return_value=b"\xff\xd8test\xff\xd9"),
- ):
- result = await capture_camera_frame_bytes("192.168.1.1", "test", "A1", timeout=10)
- assert result is not None
- assert len(_active_capture_pids) == 0
- @pytest.mark.asyncio
- async def test_no_pid_tracked_when_subprocess_fails(self):
- """If create_subprocess_exec raises, process is None — no PID to track."""
- fake_proxy_server = AsyncMock()
- fake_proxy_server.close = MagicMock()
- with (
- patch("backend.app.services.camera.is_chamber_image_model", return_value=False),
- patch("backend.app.services.camera.get_camera_port", return_value=322),
- patch("backend.app.services.camera.create_tls_proxy", return_value=(12345, fake_proxy_server)),
- patch("backend.app.services.camera.get_ffmpeg_path", return_value="/usr/bin/ffmpeg"),
- patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError("ffmpeg")),
- ):
- result = await capture_camera_frame_bytes("192.168.1.1", "test", "P2S", timeout=10)
- assert result is None
- assert len(_active_capture_pids) == 0
- class TestCleanupExcludesCapturePids:
- """Verify cleanup_orphaned_streams skips PIDs in _active_capture_pids."""
- @pytest.mark.asyncio
- async def test_cleanup_skips_capture_pids(self):
- """A PID in _active_capture_pids must not be killed by cleanup."""
- from backend.app.api.routes.camera import cleanup_orphaned_streams
- _active_capture_pids.add(42000)
- with (
- patch("backend.app.api.routes.camera._scan_bambu_ffmpeg_pids", return_value=[42000]),
- patch("backend.app.api.routes.camera._active_streams", {}),
- patch("backend.app.api.routes.camera._spawned_ffmpeg_pids", {}),
- patch("os.kill") as mock_kill,
- ):
- await cleanup_orphaned_streams()
- # os.kill should NOT have been called with SIGKILL for our capture PID
- for call in mock_kill.call_args_list:
- pid, sig = call[0]
- assert pid != 42000, "cleanup killed an active capture PID"
- @pytest.mark.asyncio
- async def test_cleanup_kills_non_capture_pids(self):
- """PIDs NOT in _active_capture_pids should still be killed."""
- import signal
- from backend.app.api.routes.camera import cleanup_orphaned_streams
- # 42000 is a capture PID, 43000 is truly orphaned
- _active_capture_pids.add(42000)
- with (
- patch("backend.app.api.routes.camera._scan_bambu_ffmpeg_pids", return_value=[42000, 43000]),
- patch("backend.app.api.routes.camera._active_streams", {}),
- patch("backend.app.api.routes.camera._spawned_ffmpeg_pids", {}),
- patch("os.kill") as mock_kill,
- ):
- await cleanup_orphaned_streams()
- # 43000 should have been killed
- mock_kill.assert_any_call(43000, signal.SIGKILL)
- # 42000 should NOT have been killed with SIGKILL
- killed_pids = [call[0][0] for call in mock_kill.call_args_list if call[0][1] == signal.SIGKILL]
- assert 42000 not in killed_pids
|