Browse Source

fix(scheduler): watchdog falsely reverts slow H2D dispatches, causing reprints (#1078)

  _watchdog_print_start reverted queue items to "pending" at 45 s if
  gcode_state hadn't changed, assuming the MQTT project_file was swallowed
  by a half-broken session (#887/#967). H2D Pro firmware (01.01.00.00)
  routinely keeps state=FINISH for 48-55 s after actually accepting the
  command before transitioning to PREPARE. The watchdog reverted items
  the printer had already started physically printing; the archive updated
  normally via _active_prints, but the queue item was now "pending" again,
  and the next scheduler tick after plate-clear re-dispatched the same
  item as if it had never run. With one item left in the queue that looked
  like a reprint of the just-finished job; with multiple items the
  symptom was masked by item N+1 getting dispatched during the race.

  Add a second "command landed" signal: subtask_id advancing past the
  pre-dispatch value. Bambuddy already mints a unique submission_id per
  project_file publish (#1042) and the printer echoes it back on the next
  push_status as soon as it starts processing the command - well before
  gcode_state transitions on slow-transition models. _start_print now
  captures pre_subtask_id alongside pre_state and passes both to the
  watchdog, which exits early on either a state change or a subtask_id
  advance.

  Raise default timeout 45 s → 90 s as belt-and-braces for printers that
  neither flip state nor echo subtask_id inside the polling window.
  Genuinely half-broken sessions (both signals unchanged across the full
  90 s) still revert + force-reconnect exactly as before.

  Transient subtask_id=None during reconnect is not mis-detected as a
  change. pre_subtask_id=None falls back to state-only checking so the
  fix is safe for printers that haven't reported a subtask_id yet.

  New test_scheduler_watchdog.py pins the eight behaviours that matter:
  pickup via state change; pickup via subtask_id change with state still
  FINISH (the exact #1078 case); revert when neither signal changes;
  default timeout is 90 s; pre_subtask_id=None state-only fallback;
  current subtask_id=None not treated as change; printer disconnect
  mid-watchdog leaves DB untouched; item that already moved on is not
  clobbered.
maziggy 1 month ago
parent
commit
0918907dab

File diff suppressed because it is too large
+ 0 - 0
CHANGELOG.md


+ 42 - 9
backend/app/services/print_scheduler.py

@@ -1833,9 +1833,12 @@ class PrintScheduler:
         logger.info("Queue item %s: Status set to 'printing', sending print command...", item.id)
         logger.info("Queue item %s: Status set to 'printing', sending print command...", item.id)
 
 
         # Capture state before dispatch so the watchdog can detect whether the
         # Capture state before dispatch so the watchdog can detect whether the
-        # printer actually transitioned (#967).
+        # printer actually transitioned (#967). Also capture subtask_id so the
+        # watchdog can recognise "command landed but state hasn't flipped yet"
+        # on slow H2D transitions (#1078).
         pre_status = printer_manager.get_status(item.printer_id)
         pre_status = printer_manager.get_status(item.printer_id)
         pre_state = getattr(pre_status, "state", None) if pre_status else None
         pre_state = getattr(pre_status, "state", None) if pre_status else None
+        pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
 
 
         # Start the print with AMS mapping, plate_id and print options
         # Start the print with AMS mapping, plate_id and print options
         started = printer_manager.start_print(
         started = printer_manager.start_print(
@@ -1854,12 +1857,23 @@ class PrintScheduler:
         if started:
         if started:
             logger.info("Queue item %s: Print started successfully - %s", item.id, filename)
             logger.info("Queue item %s: Print started successfully - %s", item.id, filename)
 
 
-            # Watchdog: if the printer never transitions out of pre_state, the MQTT
-            # publish was accepted locally but didn't reach the printer (half-broken
-            # session — same shape as #887/#936). Revert the queue item so the next
-            # dispatch can pick it up instead of leaving it stuck in "printing" (#967).
+            # Watchdog: if the printer never transitions out of pre_state AND
+            # never advances subtask_id, the MQTT publish was accepted locally but
+            # didn't reach the printer (half-broken session — same shape as
+            # #887/#936). Revert the queue item so the next dispatch can pick it
+            # up instead of leaving it stuck in "printing" (#967). subtask_id
+            # check avoids false reverts on slow H2D FINISH→PREPARE transitions
+            # that would otherwise cause the item to re-dispatch as a reprint
+            # of the just-finished job (#1078).
             if pre_state:
             if pre_state:
-                asyncio.create_task(self._watchdog_print_start(item.id, item.printer_id, pre_state))
+                asyncio.create_task(
+                    self._watchdog_print_start(
+                        item.id,
+                        item.printer_id,
+                        pre_state,
+                        pre_subtask_id,
+                    )
+                )
 
 
             # Get estimated time for notification
             # Get estimated time for notification
             estimated_time = None
             estimated_time = None
@@ -1930,7 +1944,8 @@ class PrintScheduler:
         queue_item_id: int,
         queue_item_id: int,
         printer_id: int,
         printer_id: int,
         pre_state: str,
         pre_state: str,
-        timeout: float = 45.0,
+        pre_subtask_id: str | None = None,
+        timeout: float = 90.0,
         poll_interval: float = 3.0,
         poll_interval: float = 3.0,
     ) -> None:
     ) -> None:
         """Revert a queue item if the printer never acknowledges the start command.
         """Revert a queue item if the printer never acknowledges the start command.
@@ -1939,6 +1954,20 @@ class PrintScheduler:
         MQTT project_file publish succeeds locally. If the printer drops/ignores the
         MQTT project_file publish succeeds locally. If the printer drops/ignores the
         command (half-broken MQTT session — #887/#936), the state never transitions
         command (half-broken MQTT session — #887/#936), the state never transitions
         and the item would otherwise stay stuck in "printing" forever (#967).
         and the item would otherwise stay stuck in "printing" forever (#967).
+
+        Exit paths (printer picked up the job — no revert):
+          - gcode_state changed from pre_state, OR
+          - subtask_id advanced past pre_subtask_id — the printer echoes our
+            per-dispatch identity back on push_status, so a subtask_id change is
+            a definitive "command landed" signal even while state is still FINISH.
+            H2D can sit at FINISH for ~50 s after accepting project_file before
+            transitioning to PREPARE, which used to trip the state-only watchdog
+            and caused the scheduler to revert + re-dispatch the item; the next
+            successful dispatch then looked like a reprint of the just-finished
+            job (#1078).
+
+        Timeout raised from 45 s → 90 s as belt-and-braces for slow transitions
+        that also don't emit an early subtask_id tick.
         """
         """
         deadline = time.monotonic() + timeout
         deadline = time.monotonic() + timeout
         while time.monotonic() < deadline:
         while time.monotonic() < deadline:
@@ -1947,7 +1976,9 @@ class PrintScheduler:
             if not status:
             if not status:
                 return  # Printer disconnected — don't mess with the DB
                 return  # Printer disconnected — don't mess with the DB
             if status.state != pre_state:
             if status.state != pre_state:
-                return  # Printer picked up the job
+                return  # Printer picked up the job (state transition)
+            if pre_subtask_id is not None and status.subtask_id is not None and status.subtask_id != pre_subtask_id:
+                return  # Printer picked up the job (subtask_id advanced)
 
 
         # No transition. Revert the item so the scheduler can retry.
         # No transition. Revert the item so the scheduler can retry.
         async with async_session() as db:
         async with async_session() as db:
@@ -1959,11 +1990,13 @@ class PrintScheduler:
             await db.commit()
             await db.commit()
             logger.warning(
             logger.warning(
                 "Queue item %s: printer %d did not respond to print command within "
                 "Queue item %s: printer %d did not respond to print command within "
-                "%.0fs (state still %s) — reverted to 'pending' for retry (#967)",
+                "%.0fs (state still %s, subtask_id still %s) — reverted to 'pending' "
+                "for retry (#967)",
                 queue_item_id,
                 queue_item_id,
                 printer_id,
                 printer_id,
                 timeout,
                 timeout,
                 pre_state,
                 pre_state,
+                pre_subtask_id,
             )
             )
 
 
         # Same half-broken-session recovery as background_dispatch: force the
         # Same half-broken-session recovery as background_dispatch: force the

+ 260 - 0
backend/tests/unit/test_scheduler_watchdog.py

@@ -0,0 +1,260 @@
+"""Regression tests for ``_watchdog_print_start``.
+
+The watchdog reverts queue items to ``pending`` when a dispatched print never
+lands on the printer (half-broken MQTT session — #887/#936/#967). H2D firmware
+can sit at ``FINISH`` for 50+ seconds after accepting a ``project_file``
+command before flipping ``gcode_state`` to ``PREPARE``, which used to trip the
+state-only watchdog and cause the scheduler to revert the item; the subsequent
+successful dispatch then looked like a reprint of the just-finished job (#1078).
+
+The fix: treat ``subtask_id`` advancing past the pre-dispatch value as an
+equivalent "command landed" signal, and raise the timeout from 45 s to 90 s as
+belt-and-braces for slow transitions that also don't emit an early subtask_id
+tick.
+"""
+
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from backend.app.models.print_queue import PrintQueueItem
+from backend.app.services.print_scheduler import PrintScheduler
+
+
+@pytest.fixture
+async def db_session():
+    """In-memory SQLite with one ``printing`` queue item at id=1."""
+    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
+
+    import backend.app.models  # noqa: F401  — populate Base.metadata
+    from backend.app.core.database import Base
+
+    engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
+    async with engine.begin() as conn:
+        await conn.run_sync(Base.metadata.create_all)
+    session_maker = async_sessionmaker(engine, expire_on_commit=False)
+
+    async with session_maker() as db:
+        db.add(PrintQueueItem(id=1, printer_id=42, archive_id=99, status="printing"))
+        await db.commit()
+
+    try:
+        yield session_maker
+    finally:
+        await engine.dispose()
+
+
+def _status(state: str, subtask_id: str | None = None):
+    """Minimal stand-in for PrinterState — only the two fields the watchdog reads."""
+    return SimpleNamespace(state=state, subtask_id=subtask_id)
+
+
+class TestWatchdogExitsEarlyOnPickup:
+    """The watchdog must NOT revert when the printer has clearly picked up the job."""
+
+    @pytest.mark.asyncio
+    async def test_exits_on_state_change(self, db_session):
+        """State transitioning away from pre_state is the primary "accepted" signal."""
+        get_status = MagicMock(return_value=_status("RUNNING", "OLD_SUBTASK"))
+        with (
+            patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status),
+            patch("backend.app.services.print_scheduler.async_session", db_session),
+        ):
+            await PrintScheduler._watchdog_print_start(
+                queue_item_id=1,
+                printer_id=42,
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.3,
+                poll_interval=0.05,
+            )
+
+        # Item should remain "printing" — watchdog recognised the pickup.
+        async with db_session() as db:
+            item = await db.get(PrintQueueItem, 1)
+            assert item.status == "printing"
+
+    @pytest.mark.asyncio
+    async def test_exits_on_subtask_id_change_even_if_state_still_finish(self, db_session):
+        """Regression for #1078: H2D keeps state=FINISH for ~50 s after accepting
+        project_file, but subtask_id flips to our new submission_id almost
+        immediately. That must short-circuit the revert."""
+        get_status = MagicMock(return_value=_status("FINISH", "NEW_SUBTASK_12345"))
+        with (
+            patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status),
+            patch("backend.app.services.print_scheduler.async_session", db_session),
+        ):
+            await PrintScheduler._watchdog_print_start(
+                queue_item_id=1,
+                printer_id=42,
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK_99999",
+                timeout=0.3,
+                poll_interval=0.05,
+            )
+
+        async with db_session() as db:
+            item = await db.get(PrintQueueItem, 1)
+            assert item.status == "printing", (
+                "subtask_id advanced past pre_subtask_id — the printer accepted our "
+                "project_file and the watchdog must not revert the queue item even "
+                "though state is still FINISH (#1078)"
+            )
+
+
+class TestWatchdogRevertsWhenStuck:
+    """Genuine half-broken sessions still need the revert + reconnect recovery."""
+
+    @pytest.mark.asyncio
+    async def test_reverts_when_neither_state_nor_subtask_id_changes(self, db_session):
+        """Both signals unchanged across the full timeout → revert to pending
+        and force MQTT reconnect (the #967 recovery path)."""
+        get_status = MagicMock(return_value=_status("FINISH", "OLD_SUBTASK"))
+        client = MagicMock()
+        get_client = MagicMock(return_value=client)
+
+        with (
+            patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status),
+            patch("backend.app.services.print_scheduler.printer_manager.get_client", get_client),
+            patch("backend.app.services.print_scheduler.async_session", db_session),
+        ):
+            await PrintScheduler._watchdog_print_start(
+                queue_item_id=1,
+                printer_id=42,
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.2,
+                poll_interval=0.05,
+            )
+
+        async with db_session() as db:
+            item = await db.get(PrintQueueItem, 1)
+            assert item.status == "pending"
+            assert item.started_at is None
+
+        client.force_reconnect_stale_session.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_default_timeout_is_90_seconds(self):
+        """The default timeout must cover slow H2D FINISH→PREPARE transitions
+        (~50 s observed). A 45 s default would trip on the exact scenario the
+        subtask_id check is guarding against, leaving no fallback for printers
+        that don't echo subtask_id."""
+        import inspect
+
+        sig = inspect.signature(PrintScheduler._watchdog_print_start)
+        assert sig.parameters["timeout"].default == 90.0
+
+
+class TestWatchdogFallbackBehaviour:
+    """Backwards-compat and defensive behaviour around missing data."""
+
+    @pytest.mark.asyncio
+    async def test_pre_subtask_id_none_falls_back_to_state_only(self, db_session):
+        """When we never captured a pre-dispatch subtask_id (e.g. printer just
+        connected), the watchdog must still work on the state signal alone —
+        and still revert when state stays unchanged, so half-broken sessions
+        are still recovered."""
+        get_status = MagicMock(return_value=_status("FINISH", "SOMETHING"))
+        get_client = MagicMock(return_value=None)
+
+        with (
+            patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status),
+            patch("backend.app.services.print_scheduler.printer_manager.get_client", get_client),
+            patch("backend.app.services.print_scheduler.async_session", db_session),
+        ):
+            await PrintScheduler._watchdog_print_start(
+                queue_item_id=1,
+                printer_id=42,
+                pre_state="FINISH",
+                pre_subtask_id=None,
+                timeout=0.2,
+                poll_interval=0.05,
+            )
+
+        async with db_session() as db:
+            item = await db.get(PrintQueueItem, 1)
+            assert item.status == "pending"
+
+    @pytest.mark.asyncio
+    async def test_current_subtask_id_none_does_not_trigger_early_exit(self, db_session):
+        """If the printer transiently reports subtask_id=None (e.g. during
+        reconnect), that must not be treated as "changed" — otherwise the
+        watchdog would exit early without a real pickup signal and leave the
+        item stuck in "printing" after a genuinely broken session."""
+        get_status = MagicMock(return_value=_status("FINISH", None))
+        get_client = MagicMock(return_value=None)
+
+        with (
+            patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status),
+            patch("backend.app.services.print_scheduler.printer_manager.get_client", get_client),
+            patch("backend.app.services.print_scheduler.async_session", db_session),
+        ):
+            await PrintScheduler._watchdog_print_start(
+                queue_item_id=1,
+                printer_id=42,
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.2,
+                poll_interval=0.05,
+            )
+
+        async with db_session() as db:
+            item = await db.get(PrintQueueItem, 1)
+            assert item.status == "pending"
+
+    @pytest.mark.asyncio
+    async def test_printer_disconnected_returns_without_reverting(self, db_session):
+        """If the printer drops during the watchdog window, don't touch the DB —
+        the reconnect path will sort the queue state out."""
+        get_status = MagicMock(return_value=None)
+
+        with (
+            patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status),
+            patch("backend.app.services.print_scheduler.async_session", db_session),
+        ):
+            await PrintScheduler._watchdog_print_start(
+                queue_item_id=1,
+                printer_id=42,
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.2,
+                poll_interval=0.05,
+            )
+
+        async with db_session() as db:
+            item = await db.get(PrintQueueItem, 1)
+            assert item.status == "printing"
+
+    @pytest.mark.asyncio
+    async def test_no_revert_if_item_already_completed(self, db_session):
+        """If the print completed between watchdog arm-time and timeout (item is
+        no longer "printing"), the watchdog must not clobber whatever status it
+        ended up in — #967 race guard."""
+        # Move item on to "completed" before the watchdog fires.
+        async with db_session() as db:
+            item = await db.get(PrintQueueItem, 1)
+            item.status = "completed"
+            await db.commit()
+
+        get_status = MagicMock(return_value=_status("FINISH", "OLD_SUBTASK"))
+        get_client = MagicMock(return_value=None)
+
+        with (
+            patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status),
+            patch("backend.app.services.print_scheduler.printer_manager.get_client", get_client),
+            patch("backend.app.services.print_scheduler.async_session", db_session),
+        ):
+            await PrintScheduler._watchdog_print_start(
+                queue_item_id=1,
+                printer_id=42,
+                pre_state="FINISH",
+                pre_subtask_id="OLD_SUBTASK",
+                timeout=0.2,
+                poll_interval=0.05,
+            )
+
+        async with db_session() as db:
+            item = await db.get(PrintQueueItem, 1)
+            assert item.status == "completed"  # untouched

Some files were not shown because too many files changed in this diff