test_runtime_tracking_pause.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
"""Regression tests for the runtime-tracking task (#1521).

The ``runtime_seconds`` counter on each printer feeds hours-based maintenance
intervals (rod lubrication, belt checks, nozzle cleaning). It was accumulating
elapsed time whenever ``state.state`` was ``RUNNING`` *or* ``PAUSE``, which
meant a print paused for hours (e.g. overnight) inflated the maintenance
clock without any actual mechanical wear. The fix excludes PAUSE; these tests
pin the new contract.
"""
  9. from __future__ import annotations
  10. import asyncio
  11. from datetime import datetime, timedelta, timezone
  12. from types import SimpleNamespace
  13. from unittest.mock import patch
  14. import pytest
  15. from sqlalchemy import select
  16. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  17. async def _build_db_with_printer(*, runtime_seconds: int, last_runtime_update: datetime | None):
  18. """Spin up an in-memory DB with one active printer in the requested state."""
  19. import backend.app.models # noqa: F401 -- register all models on Base.metadata
  20. from backend.app.core.database import Base
  21. from backend.app.models.printer import Printer
  22. engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
  23. async with engine.begin() as conn:
  24. await conn.run_sync(Base.metadata.create_all)
  25. session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
  26. async with session_maker() as db:
  27. db.add(
  28. Printer(
  29. id=1,
  30. name="P1",
  31. serial_number="S1",
  32. ip_address="1.1.1.1",
  33. access_code="x",
  34. is_active=True,
  35. runtime_seconds=runtime_seconds,
  36. last_runtime_update=last_runtime_update,
  37. )
  38. )
  39. await db.commit()
  40. return engine, session_maker
  41. async def _run_one_iteration(session_maker, state_value: str):
  42. """Run a single iteration of track_printer_runtime() against a mocked state.
  43. Patches ``asyncio.sleep`` to skip the startup wait and cancel after the
  44. first loop tick. Patches ``printer_manager.get_status`` to return a fake
  45. state with the requested ``.state`` value. Points the module-level
  46. ``async_session`` at the test DB so the loop's queries hit it.
  47. """
  48. from backend.app import main as app_main
  49. sleep_calls = {"count": 0}
  50. real_sleep = asyncio.sleep
  51. async def fake_sleep(seconds, *args, **kwargs):
  52. sleep_calls["count"] += 1
  53. # First sleep = the 15s startup wait. Second sleep = end-of-iteration
  54. # tick; raise here so the loop exits cleanly via its CancelledError
  55. # handler after exactly one work cycle.
  56. if sleep_calls["count"] >= 2:
  57. raise asyncio.CancelledError()
  58. # Yield control to keep the event loop healthy without blocking.
  59. await real_sleep(0)
  60. fake_state = SimpleNamespace(state=state_value, connected=True)
  61. # The loop's tail-of-iteration sleep is OUTSIDE its try/except, so the
  62. # CancelledError raised from fake_sleep propagates out of the function
  63. # rather than triggering the inner break — catch it at the test boundary.
  64. with (
  65. patch.object(app_main, "async_session", session_maker),
  66. patch.object(app_main.printer_manager, "get_status", return_value=fake_state),
  67. patch.object(app_main.asyncio, "sleep", fake_sleep),
  68. pytest.raises(asyncio.CancelledError),
  69. ):
  70. await app_main.track_printer_runtime()
  71. @pytest.mark.asyncio
  72. async def test_pause_state_does_not_accumulate_runtime():
  73. """PAUSE must NOT add to runtime_seconds — paused = no motion = no wear (#1521)."""
  74. seeded_runtime = 1000 # 1000s already accumulated
  75. seeded_last_update = datetime.now(timezone.utc) - timedelta(seconds=300) # 5min ago
  76. engine, session_maker = await _build_db_with_printer(
  77. runtime_seconds=seeded_runtime, last_runtime_update=seeded_last_update
  78. )
  79. await _run_one_iteration(session_maker, state_value="PAUSE")
  80. from backend.app.models.printer import Printer
  81. async with session_maker() as db:
  82. row = (await db.execute(select(Printer).where(Printer.id == 1))).scalar_one()
  83. # Runtime counter unchanged — the 5 minutes paused contributed nothing.
  84. assert row.runtime_seconds == seeded_runtime
  85. # last_runtime_update cleared on the non-running branch so the next
  86. # transition to RUNNING starts fresh and doesn't back-bill paused time.
  87. assert row.last_runtime_update is None
  88. await engine.dispose()
  89. @pytest.mark.asyncio
  90. async def test_running_state_still_accumulates_runtime():
  91. """RUNNING must continue to accumulate — the bug was scope, not the whole feature."""
  92. seeded_runtime = 1000
  93. seeded_last_update = datetime.now(timezone.utc) - timedelta(seconds=60)
  94. engine, session_maker = await _build_db_with_printer(
  95. runtime_seconds=seeded_runtime, last_runtime_update=seeded_last_update
  96. )
  97. await _run_one_iteration(session_maker, state_value="RUNNING")
  98. from backend.app.models.printer import Printer
  99. async with session_maker() as db:
  100. row = (await db.execute(select(Printer).where(Printer.id == 1))).scalar_one()
  101. # Wall-clock elapsed since seeded_last_update should now be added.
  102. # Allow a generous lower bound (≥30s) — actual elapsed depends on
  103. # how fast the test runs, but it MUST have grown past the seed.
  104. assert row.runtime_seconds > seeded_runtime
  105. assert row.runtime_seconds >= seeded_runtime + 30
  106. assert row.last_runtime_update is not None
  107. await engine.dispose()
  108. @pytest.mark.asyncio
  109. async def test_idle_state_clears_last_update_without_accumulating():
  110. """A non-active state (FINISH/IDLE/PREPARE/etc.) must clear last_runtime_update
  111. so a later RUNNING transition doesn't retroactively back-bill all the idle time."""
  112. seeded_runtime = 1000
  113. seeded_last_update = datetime.now(timezone.utc) - timedelta(seconds=3600) # 1h ago
  114. engine, session_maker = await _build_db_with_printer(
  115. runtime_seconds=seeded_runtime, last_runtime_update=seeded_last_update
  116. )
  117. await _run_one_iteration(session_maker, state_value="FINISH")
  118. from backend.app.models.printer import Printer
  119. async with session_maker() as db:
  120. row = (await db.execute(select(Printer).where(Printer.id == 1))).scalar_one()
  121. assert row.runtime_seconds == seeded_runtime # no accumulation
  122. assert row.last_runtime_update is None # cleared, prevents back-bill
  123. await engine.dispose()