conftest.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. """Shared test fixtures for BamBuddy backend tests."""
  2. import asyncio
  3. import atexit
  4. import json
  5. import logging
  6. import os
  7. import shutil
  8. import tempfile
  9. from collections.abc import AsyncGenerator
  10. from pathlib import Path
  11. from unittest.mock import AsyncMock, MagicMock, patch
  12. import pytest
  13. # IMPORTANT: Set environment variables BEFORE any app imports
  14. # This must happen before settings/config are loaded
  15. os.environ["LOG_TO_FILE"] = "false"
  16. os.environ["DEBUG"] = "false"
  17. from httpx import ASGITransport, AsyncClient # noqa: E402
  18. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine # noqa: E402
  19. # Ensure settings use our env vars - import and override before database import
  20. from backend.app.core.config import settings # noqa: E402
  21. settings.log_to_file = False
  22. # Use a temp directory for plate calibration to avoid deleting real calibration files
  23. _test_plate_cal_dir = Path(tempfile.mkdtemp(prefix="bambuddy_test_plate_cal_"))
  24. settings.plate_calibration_dir = _test_plate_cal_dir
  25. # Clean up temp directory when tests finish
  26. def _cleanup_test_plate_cal_dir():
  27. if _test_plate_cal_dir.exists():
  28. shutil.rmtree(_test_plate_cal_dir, ignore_errors=True)
  29. atexit.register(_cleanup_test_plate_cal_dir)
  30. from backend.app.core.database import Base # noqa: E402
  31. # Use in-memory SQLite for tests
  32. TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
  33. @pytest.fixture(scope="session")
  34. def event_loop():
  35. """Create an instance of the default event loop for each test session."""
  36. loop = asyncio.get_event_loop_policy().new_event_loop()
  37. yield loop
  38. loop.close()
  39. @pytest.fixture
  40. async def test_engine():
  41. """Create a test database engine."""
  42. engine = create_async_engine(TEST_DATABASE_URL, echo=False)
  43. # Import all models to register them
  44. from backend.app.models import (
  45. ams_history,
  46. api_key,
  47. archive,
  48. external_link,
  49. filament,
  50. group,
  51. kprofile_note,
  52. maintenance,
  53. notification,
  54. notification_template,
  55. print_queue,
  56. printer,
  57. project,
  58. settings,
  59. smart_plug,
  60. user,
  61. )
  62. async with engine.begin() as conn:
  63. await conn.run_sync(Base.metadata.create_all)
  64. yield engine
  65. async with engine.begin() as conn:
  66. await conn.run_sync(Base.metadata.drop_all)
  67. await engine.dispose()
  68. @pytest.fixture
  69. async def db_session(test_engine) -> AsyncGenerator[AsyncSession, None]:
  70. """Create a test database session."""
  71. async_session_maker = async_sessionmaker(test_engine, class_=AsyncSession, expire_on_commit=False)
  72. async with async_session_maker() as session:
  73. yield session
  74. @pytest.fixture
  75. async def async_client(test_engine, db_session) -> AsyncGenerator[AsyncClient, None]:
  76. """Create an async test client."""
  77. from backend.app.core.database import async_session, get_db
  78. from backend.app.main import app
  79. # Create a new session maker for the test engine
  80. test_async_session = async_sessionmaker(test_engine, class_=AsyncSession, expire_on_commit=False)
  81. async def override_get_db():
  82. async with test_async_session() as session:
  83. yield session
  84. app.dependency_overrides[get_db] = override_get_db
  85. # Mock init_printer_connections to prevent MQTT connection attempts during tests
  86. async def mock_init_printer_connections(db):
  87. pass # No-op - don't connect to real printers
  88. # Also patch the module-level async_session used by services, auth, and middleware
  89. with (
  90. patch("backend.app.core.database.async_session", test_async_session),
  91. patch("backend.app.core.auth.async_session", test_async_session),
  92. patch("backend.app.main.async_session", test_async_session),
  93. patch("backend.app.main.init_printer_connections", mock_init_printer_connections),
  94. ):
  95. # Seed default groups for tests that need them
  96. from backend.app.core.database import seed_default_groups
  97. await seed_default_groups()
  98. async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
  99. yield client
  100. app.dependency_overrides.clear()
  101. # ============================================================================
  102. # Mock External Services
  103. # ============================================================================
  104. @pytest.fixture
  105. def mock_tasmota_service():
  106. """Mock the Tasmota service for smart plug tests."""
  107. # Patch both the module where it's defined and where it's imported
  108. with (
  109. patch("backend.app.services.tasmota.tasmota_service") as mock,
  110. patch("backend.app.api.routes.smart_plugs.tasmota_service") as mock2,
  111. ):
  112. mock.turn_on = AsyncMock(return_value=True)
  113. mock.turn_off = AsyncMock(return_value=True)
  114. mock.toggle = AsyncMock(return_value=True)
  115. mock.get_status = AsyncMock(return_value={"state": "ON", "reachable": True, "device_name": "Test Plug"})
  116. mock.get_energy = AsyncMock(
  117. return_value={
  118. "power": 150.5,
  119. "voltage": 120.0,
  120. "current": 1.25,
  121. "today": 2.5,
  122. "total": 100.0,
  123. "factor": 0.95,
  124. }
  125. )
  126. mock.test_connection = AsyncMock(return_value={"success": True, "state": "ON", "device_name": "Test Plug"})
  127. # Copy mocks to second patch target
  128. mock2.turn_on = mock.turn_on
  129. mock2.turn_off = mock.turn_off
  130. mock2.toggle = mock.toggle
  131. mock2.get_status = mock.get_status
  132. mock2.get_energy = mock.get_energy
  133. mock2.test_connection = mock.test_connection
  134. yield mock
  135. @pytest.fixture
  136. def mock_homeassistant_service():
  137. """Mock the Home Assistant service for smart plug tests."""
  138. # Patch both the module where it's defined and where it's imported
  139. with (
  140. patch("backend.app.services.homeassistant.homeassistant_service") as mock,
  141. patch("backend.app.api.routes.smart_plugs.homeassistant_service") as mock2,
  142. ):
  143. mock.turn_on = AsyncMock(return_value=True)
  144. mock.turn_off = AsyncMock(return_value=True)
  145. mock.toggle = AsyncMock(return_value=True)
  146. mock.get_status = AsyncMock(return_value={"state": "ON", "reachable": True, "device_name": "Test HA Entity"})
  147. mock.get_energy = AsyncMock(return_value=None) # Most HA entities don't have power monitoring
  148. mock.test_connection = AsyncMock(return_value={"success": True, "message": "API running", "error": None})
  149. mock.list_entities = AsyncMock(
  150. return_value=[
  151. {
  152. "entity_id": "switch.printer_plug",
  153. "friendly_name": "Printer Plug",
  154. "state": "on",
  155. "domain": "switch",
  156. },
  157. {"entity_id": "switch.test", "friendly_name": "Test Switch", "state": "off", "domain": "switch"},
  158. ]
  159. )
  160. mock.configure = MagicMock()
  161. # Copy mocks to second patch target
  162. mock2.turn_on = mock.turn_on
  163. mock2.turn_off = mock.turn_off
  164. mock2.toggle = mock.toggle
  165. mock2.get_status = mock.get_status
  166. mock2.get_energy = mock.get_energy
  167. mock2.test_connection = mock.test_connection
  168. mock2.list_entities = mock.list_entities
  169. mock2.configure = mock.configure
  170. yield mock
  171. @pytest.fixture
  172. def mock_mqtt_client():
  173. """Mock the MQTT client for printer communication tests."""
  174. with patch("backend.app.services.bambu_mqtt.BambuMQTTClient") as mock:
  175. instance = MagicMock()
  176. instance.state = MagicMock(connected=True, state="IDLE", progress=0, temperatures={"nozzle": 25, "bed": 25})
  177. instance.connect = MagicMock()
  178. instance.disconnect = MagicMock()
  179. mock.return_value = instance
  180. yield mock
  181. @pytest.fixture
  182. def mock_mqtt_smart_plug_service():
  183. """Mock the MQTT smart plug service for MQTT plug tests."""
  184. with patch("backend.app.api.routes.smart_plugs.mqtt_relay") as mock:
  185. # Create a mock smart_plug_service
  186. mock_service = MagicMock()
  187. mock_service.is_configured = MagicMock(return_value=True)
  188. mock_service.has_broker_settings = MagicMock(return_value=True)
  189. mock_service.configure = AsyncMock(return_value=True)
  190. mock_service.subscribe = MagicMock()
  191. mock_service.unsubscribe = MagicMock()
  192. mock_service.get_plug_data = MagicMock(return_value=None)
  193. mock_service.is_reachable = MagicMock(return_value=False)
  194. mock.smart_plug_service = mock_service
  195. yield mock
  196. @pytest.fixture
  197. def mock_ftp_client():
  198. """Mock the FTP client for file transfer tests."""
  199. with (
  200. patch("backend.app.services.bambu_ftp.download_file_async") as download_mock,
  201. patch("backend.app.services.bambu_ftp.list_files_async") as list_mock,
  202. ):
  203. download_mock.return_value = True
  204. list_mock.return_value = []
  205. yield {"download": download_mock, "list": list_mock}
  206. @pytest.fixture
  207. def mock_httpx_client():
  208. """Mock httpx for webhook/notification HTTP calls."""
  209. with patch("httpx.AsyncClient") as mock_class:
  210. mock_instance = AsyncMock()
  211. mock_response = MagicMock()
  212. mock_response.status_code = 200
  213. mock_response.text = "OK"
  214. mock_response.json.return_value = {}
  215. mock_instance.get = AsyncMock(return_value=mock_response)
  216. mock_instance.post = AsyncMock(return_value=mock_response)
  217. mock_instance.__aenter__ = AsyncMock(return_value=mock_instance)
  218. mock_instance.__aexit__ = AsyncMock()
  219. mock_class.return_value = mock_instance
  220. yield mock_instance
  221. @pytest.fixture
  222. def mock_printer_manager():
  223. """Mock the printer manager for status checks."""
  224. with patch("backend.app.services.printer_manager.printer_manager") as mock:
  225. mock.get_status = MagicMock(
  226. return_value=MagicMock(
  227. connected=True,
  228. state="IDLE",
  229. progress=0,
  230. temperatures={"nozzle": 25, "bed": 25, "chamber": 25},
  231. raw_data={},
  232. )
  233. )
  234. mock.mark_printer_offline = MagicMock()
  235. yield mock
  236. # ============================================================================
  237. # Factory Fixtures for Test Data
  238. # ============================================================================
  239. @pytest.fixture
  240. def smart_plug_factory(db_session):
  241. """Factory to create test smart plugs."""
  242. async def _create_plug(**kwargs):
  243. from backend.app.models.smart_plug import SmartPlug
  244. # Determine defaults based on plug_type
  245. plug_type = kwargs.get("plug_type", "tasmota")
  246. defaults = {
  247. "name": "Test Plug",
  248. "plug_type": plug_type,
  249. "enabled": True,
  250. "auto_on": True,
  251. "auto_off": True,
  252. "off_delay_mode": "time",
  253. "off_delay_minutes": 5,
  254. "off_temp_threshold": 70,
  255. "schedule_enabled": False,
  256. "power_alert_enabled": False,
  257. }
  258. # Set required fields based on plug_type
  259. if plug_type == "homeassistant":
  260. defaults["ha_entity_id"] = "switch.test"
  261. defaults["ip_address"] = None
  262. elif plug_type == "mqtt":
  263. # Legacy fields (for backward compatibility tests)
  264. defaults["mqtt_topic"] = kwargs.get("mqtt_topic", "test/topic")
  265. defaults["mqtt_multiplier"] = kwargs.get("mqtt_multiplier", 1.0)
  266. # New separate topic/path/multiplier fields
  267. defaults["mqtt_power_topic"] = kwargs.get("mqtt_power_topic")
  268. defaults["mqtt_power_path"] = kwargs.get("mqtt_power_path", "power")
  269. defaults["mqtt_power_multiplier"] = kwargs.get("mqtt_power_multiplier", 1.0)
  270. defaults["mqtt_energy_topic"] = kwargs.get("mqtt_energy_topic")
  271. defaults["mqtt_energy_path"] = kwargs.get("mqtt_energy_path")
  272. defaults["mqtt_energy_multiplier"] = kwargs.get("mqtt_energy_multiplier", 1.0)
  273. defaults["mqtt_state_topic"] = kwargs.get("mqtt_state_topic")
  274. defaults["mqtt_state_path"] = kwargs.get("mqtt_state_path")
  275. defaults["mqtt_state_on_value"] = kwargs.get("mqtt_state_on_value")
  276. defaults["ip_address"] = None
  277. defaults["ha_entity_id"] = None
  278. else:
  279. defaults["ip_address"] = "192.168.1.100"
  280. defaults["ha_entity_id"] = None
  281. defaults.update(kwargs)
  282. plug = SmartPlug(**defaults)
  283. db_session.add(plug)
  284. await db_session.commit()
  285. await db_session.refresh(plug)
  286. return plug
  287. return _create_plug
  288. @pytest.fixture
  289. def printer_factory(db_session):
  290. """Factory to create test printers."""
  291. _counter = [0] # Use list to allow mutation in nested function
  292. async def _create_printer(**kwargs):
  293. from backend.app.models.printer import Printer
  294. _counter[0] += 1
  295. counter = _counter[0]
  296. defaults = {
  297. "name": "Test Printer",
  298. "serial_number": f"00M09A{counter:09d}", # Unique serial per printer
  299. "ip_address": f"192.168.1.{100 + counter}", # Unique IP per printer
  300. "access_code": "12345678",
  301. "is_active": True,
  302. "auto_archive": True,
  303. "model": "X1C",
  304. }
  305. defaults.update(kwargs)
  306. printer = Printer(**defaults)
  307. db_session.add(printer)
  308. await db_session.commit()
  309. await db_session.refresh(printer)
  310. return printer
  311. return _create_printer
  312. @pytest.fixture
  313. def notification_provider_factory(db_session):
  314. """Factory to create test notification providers."""
  315. async def _create_provider(**kwargs):
  316. from backend.app.models.notification import NotificationProvider
  317. config = kwargs.pop("config", {"server": "https://ntfy.sh", "topic": "test-topic"})
  318. if isinstance(config, dict):
  319. config = json.dumps(config)
  320. defaults = {
  321. "name": "Test Provider",
  322. "provider_type": "ntfy",
  323. "enabled": True,
  324. "config": config,
  325. "on_print_start": True,
  326. "on_print_complete": True,
  327. "on_print_failed": True,
  328. "on_print_stopped": True,
  329. "on_print_progress": False,
  330. "on_printer_offline": False,
  331. "on_printer_error": False,
  332. "on_filament_low": False,
  333. "on_maintenance_due": False,
  334. "on_ams_humidity_high": False,
  335. "on_ams_temperature_high": False,
  336. "on_bed_cooled": False,
  337. "quiet_hours_enabled": False,
  338. "daily_digest_enabled": False,
  339. }
  340. defaults.update(kwargs)
  341. provider = NotificationProvider(**defaults)
  342. db_session.add(provider)
  343. await db_session.commit()
  344. await db_session.refresh(provider)
  345. return provider
  346. return _create_provider
  347. @pytest.fixture
  348. def archive_factory(db_session):
  349. """Factory to create test archives."""
  350. async def _create_archive(printer_id: int, **kwargs):
  351. from backend.app.models.archive import PrintArchive
  352. defaults = {
  353. "printer_id": printer_id,
  354. "filename": "test_print.gcode.3mf",
  355. "print_name": "Test Print",
  356. "file_path": "archives/test/test_print.gcode.3mf",
  357. "file_size": 1024000,
  358. "status": "completed",
  359. "filament_type": "PLA",
  360. "filament_used_grams": 50.0,
  361. "print_time_seconds": 3600,
  362. }
  363. defaults.update(kwargs)
  364. archive = PrintArchive(**defaults)
  365. db_session.add(archive)
  366. await db_session.commit()
  367. await db_session.refresh(archive)
  368. return archive
  369. return _create_archive
  370. # ============================================================================
  371. # Sample Data Fixtures
  372. # ============================================================================
  373. @pytest.fixture
  374. def sample_mqtt_print_start():
  375. """Sample MQTT message for print start."""
  376. return {
  377. "print": {
  378. "command": "project_file",
  379. "param": "/sdcard/test.gcode.3mf",
  380. "subtask_name": "test_print",
  381. "gcode_state": "RUNNING",
  382. "mc_percent": 0,
  383. }
  384. }
  385. @pytest.fixture
  386. def sample_mqtt_print_complete():
  387. """Sample MQTT message for print complete."""
  388. return {
  389. "print": {
  390. "gcode_state": "FINISH",
  391. "mc_percent": 100,
  392. "subtask_name": "test_print",
  393. }
  394. }
  395. @pytest.fixture
  396. def sample_printer_status():
  397. """Sample printer status data."""
  398. return {
  399. "connected": True,
  400. "state": "IDLE",
  401. "progress": 0,
  402. "layer_num": 0,
  403. "total_layers": 0,
  404. "temperatures": {
  405. "nozzle": 25.0,
  406. "bed": 25.0,
  407. "chamber": 25.0,
  408. },
  409. "remaining_time": 0,
  410. "filename": None,
  411. }
  412. # ============================================================================
  413. # Log Capture Fixtures for Error Detection
  414. # ============================================================================
  415. class LogCapture(logging.Handler):
  416. """Handler that captures log records for testing."""
  417. def __init__(self):
  418. super().__init__()
  419. self.records: list[logging.LogRecord] = []
  420. def emit(self, record: logging.LogRecord):
  421. self.records.append(record)
  422. def clear(self):
  423. self.records.clear()
  424. def get_errors(self) -> list[logging.LogRecord]:
  425. """Get all ERROR and CRITICAL level records."""
  426. return [r for r in self.records if r.levelno >= logging.ERROR]
  427. def get_warnings(self) -> list[logging.LogRecord]:
  428. """Get all WARNING level records."""
  429. return [r for r in self.records if r.levelno == logging.WARNING]
  430. def has_errors(self) -> bool:
  431. """Check if any errors were logged."""
  432. return len(self.get_errors()) > 0
  433. def format_errors(self) -> str:
  434. """Format all errors as a string for assertion messages."""
  435. errors = self.get_errors()
  436. if not errors:
  437. return "No errors"
  438. formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  439. return "\n".join(formatter.format(r) for r in errors)
  440. @pytest.fixture
  441. def capture_logs():
  442. """Fixture that captures log output during a test.
  443. Usage:
  444. def test_something(capture_logs):
  445. # Do something that might log errors
  446. some_function()
  447. # Check no errors were logged
  448. assert not capture_logs.has_errors(), capture_logs.format_errors()
  449. """
  450. handler = LogCapture()
  451. handler.setLevel(logging.DEBUG)
  452. # Attach to root logger to capture all logs
  453. root_logger = logging.getLogger()
  454. root_logger.addHandler(handler)
  455. yield handler
  456. root_logger.removeHandler(handler)
  457. @pytest.fixture
  458. def assert_no_log_errors(capture_logs):
  459. """Fixture that automatically asserts no errors were logged.
  460. Usage:
  461. def test_something(assert_no_log_errors):
  462. # If any ERROR logs occur during this test, it will fail
  463. some_function()
  464. """
  465. yield capture_logs
  466. errors = capture_logs.get_errors()
  467. if errors:
  468. pytest.fail(f"Unexpected log errors:\n{capture_logs.format_errors()}")