Browse Source

fix(scheduler): post-dispatch hold prevents H2D Pro double-fire (#1157)

  Multi-plate batches scheduled to the same H2D Pro were triple-dispatched
  within ~60 s — observed in user logs as queue items 139/140/141 all
  flipping to status='printing' even though the printer was still
  digesting the first project_file (FINISH for 80-210 s before flipping
  to PREPARE). The DB busy_printers seed at print_scheduler.py:145 was
  empirically missing the in-flight items in this window; without
  database access I cannot pin the exact why, but the guard is unreliable.

  Add a defensive in-memory dispatch hold:
  - _start_print captures (dispatched_at, pre_state, pre_subtask_id) per
    printer
  - check_queue augments busy_printers with any printer still inside its
    hold window (60 s minimum cooldown, 180 s hard timeout)
  - _watchdog_print_start releases the hold once it observes a state or
    subtask_id transition (success path), or on the existing 90 s revert
    (unhappy path), or on disconnect

  Pure additive — alongside the existing seed query and _is_printer_idle.
  Doesn't depend on DB row visibility or on_print_complete firing
  correctly. Per-printer isolated. Watchdog kept as @staticmethod so the
  existing 12 watchdog tests pass unchanged; hold-release calls go
  through the module-level scheduler instance.
maziggy 4 weeks ago
parent
commit
724bc92c22

File diff suppressed because it is too large
+ 0 - 0
CHANGELOG.md


+ 116 - 3
backend/app/services/print_scheduler.py

@@ -71,6 +71,24 @@ class PrintScheduler:
         self._min_drying_seconds = 1800  # 30 minutes minimum before humidity re-check can stop drying
         # Track which printers are currently auto-drying (printer_id -> start timestamp)
         self._drying_in_progress: dict[int, float] = {}
+        # Defensive in-memory dispatch hold (#1157): a printer that just received
+        # a project_file command must not get a second dispatch until either it
+        # transitions out of pre_state OR the hard timeout expires. The H2D Pro
+        # can take 80–210 s to flip FINISH→PREPARE after project_file, and
+        # during that window the DB busy_printers seed is empirically unreliable
+        # (multi-plate batches double-/triple-dispatched onto the same printer
+        # 30 s apart). Keyed by printer_id; cleared by the watchdog on success
+        # or revert.
+        # printer_id -> (monotonic_started_at, pre_state, pre_subtask_id)
+        self._dispatch_holds: dict[int, tuple[float, str, str | None]] = {}
+        # Minimum cooldown between dispatches to the same printer (covers the
+        # H2D's project_file digestion window).
+        self._dispatch_min_cooldown = 60.0
+        # Hard timeout — drop the hold even if we never observed a transition,
+        # so a lost MQTT session can't lock a printer out of the queue forever.
+        # Matches the watchdog timeout (90 s) plus a safety margin so the
+        # watchdog runs first on the unhappy path.
+        self._dispatch_max_hold = 180.0
 
     async def run(self):
         """Main loop - check queue every interval."""
@@ -149,6 +167,18 @@ class PrintScheduler:
             )
             busy_printers: set[int] = {pid for (pid,) in busy_result.all() if pid is not None}
 
+            # Defense-in-depth (#1157): augment busy_printers with any printer
+            # still in its post-dispatch hold window. Empirically, the DB seed
+            # above can miss in-flight items in a multi-plate batch — same-file
+            # plates were being dispatched 30 s apart while the H2D was still
+            # digesting the first project_file. The hold is keyed in-memory and
+            # released by the watchdog on the success path, so it adds a layer
+            # that doesn't depend on DB row visibility or completion-callback
+            # timing.
+            for held_printer_id in list(self._dispatch_holds.keys()):
+                if self._printer_in_dispatch_hold(held_printer_id):
+                    busy_printers.add(held_printer_id)
+
             # Log skip reasons once per queue check (not per item)
             skip_reasons: dict[str, int] = {}
 
@@ -1118,6 +1148,70 @@ class PrintScheduler:
 
         return mapping
 
+    def _mark_printer_dispatched(
+        self,
+        printer_id: int,
+        pre_state: str | None,
+        pre_subtask_id: str | None,
+    ) -> None:
+        """Record that a print command was just sent to ``printer_id``.
+
+        Held until either the watchdog observes a state/subtask transition
+        (success path) or the hard timeout expires. See ``_dispatch_holds``.
+        """
+        if not pre_state:
+            # No pre_state means we can't detect a transition — fall back to a
+            # pure time-based hold using empty string as a sentinel that won't
+            # match any real printer state.
+            pre_state = ""
+        self._dispatch_holds[printer_id] = (time.monotonic(), pre_state, pre_subtask_id)
+
+    def _release_dispatch_hold(self, printer_id: int) -> None:
+        """Drop the dispatch hold for ``printer_id`` (called by the watchdog)."""
+        self._dispatch_holds.pop(printer_id, None)
+
+    def _printer_in_dispatch_hold(self, printer_id: int) -> bool:
+        """True if ``printer_id`` is still inside its post-dispatch hold window.
+
+        Returns False (and clears the hold) once any of these are true:
+          - hard timeout (``_dispatch_max_hold``) has elapsed
+          - the printer has transitioned out of pre_state and we're past the
+            minimum cooldown
+          - the printer's subtask_id has advanced past pre_subtask_id and we're
+            past the minimum cooldown
+        Otherwise the printer is held — caller should treat it as busy.
+        """
+        entry = self._dispatch_holds.get(printer_id)
+        if not entry:
+            return False
+        started_at, pre_state, pre_subtask_id = entry
+        elapsed = time.monotonic() - started_at
+
+        if elapsed >= self._dispatch_max_hold:
+            self._dispatch_holds.pop(printer_id, None)
+            return False
+
+        # Without a pre_state we can't detect a transition — fall back to the
+        # min cooldown alone, then drop the hold.
+        if not pre_state:
+            if elapsed >= self._dispatch_min_cooldown:
+                self._dispatch_holds.pop(printer_id, None)
+                return False
+            return True
+
+        status = printer_manager.get_status(printer_id)
+        current_state = getattr(status, "state", None) if status else None
+        current_subtask_id = getattr(status, "subtask_id", None) if status else None
+        transitioned = (current_state is not None and current_state != pre_state) or (
+            pre_subtask_id is not None and current_subtask_id is not None and current_subtask_id != pre_subtask_id
+        )
+
+        if transitioned and elapsed >= self._dispatch_min_cooldown:
+            self._dispatch_holds.pop(printer_id, None)
+            return False
+
+        return True
+
     def _is_printer_idle(self, printer_id: int, require_plate_clear: bool = True) -> bool:
         """Check if a printer is connected and idle."""
         if not printer_manager.is_connected(printer_id):
@@ -1870,6 +1964,12 @@ class PrintScheduler:
         if started:
             logger.info("Queue item %s: Print started successfully - %s", item.id, filename)
 
+            # Hold the printer against further dispatches until the watchdog
+            # confirms the printer transitioned (or until the hard timeout).
+            # Prevents multi-plate batches from triple-dispatching onto the
+            # same H2D Pro while it digests the first project_file (#1157).
+            self._mark_printer_dispatched(item.printer_id, pre_state, pre_subtask_id)
+
             # Watchdog: if the printer never transitions out of pre_state AND
             # never advances subtask_id, the MQTT publish was accepted locally but
             # didn't reach the printer (half-broken session — same shape as
@@ -1990,14 +2090,27 @@ class PrintScheduler:
             await asyncio.sleep(poll_interval)
             status = printer_manager.get_status(printer_id)
             if not status:
-                return  # Printer disconnected — don't mess with the DB
+                # Printer disconnected — don't mess with the DB. Drop the
+                # in-memory dispatch hold too so a fresh dispatch can retry
+                # once the printer comes back; the hard timeout would
+                # otherwise hold the printer unnecessarily.
+                scheduler._release_dispatch_hold(printer_id)
+                return
             last_status = status
             if status.state != pre_state:
-                return  # Printer picked up the job (state transition)
+                # Printer picked up the job (state transition) — release the
+                # post-dispatch hold so the next pending item for this printer
+                # can be evaluated normally.
+                scheduler._release_dispatch_hold(printer_id)
+                return
             if pre_subtask_id is not None and status.subtask_id is not None and status.subtask_id != pre_subtask_id:
-                return  # Printer picked up the job (subtask_id advanced)
+                # Printer picked up the job (subtask_id advanced)
+                scheduler._release_dispatch_hold(printer_id)
+                return
 
         # No transition. Revert the item so the scheduler can retry.
+        # Drop the in-memory hold so the retry isn't blocked by it.
+        scheduler._release_dispatch_hold(printer_id)
         async with async_session() as db:
             item = await db.get(PrintQueueItem, queue_item_id)
             if not item or item.status != "printing":

+ 191 - 0
backend/tests/unit/test_scheduler_dispatch_hold.py

@@ -0,0 +1,191 @@
+"""Regression tests for the in-memory dispatch hold (#1157).
+
+When the scheduler dispatches a print, it records a per-printer hold that
+prevents a second dispatch onto the same printer until either the printer
+transitions out of pre_state OR a hard timeout expires. This is defense in
+depth alongside the DB ``busy_printers`` seed.
+
+Why it exists: on the H2D Pro, ``project_file`` ack can take 80–210 s. During
+that window users were getting 3 plates of the same multi-plate file
+dispatched 30 s apart onto the same printer — the seed query was empirically
+missing in-flight items even though the queue items were marked ``printing``
+in the DB. The hold removes the dependency on DB-row visibility / completion-
+callback timing for this guard.
+"""
+
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from backend.app.services.print_scheduler import PrintScheduler
+
+
+def _status(state: str, subtask_id: str | None = None):
+    return SimpleNamespace(state=state, subtask_id=subtask_id, gcode_file=None)
+
+
+class TestDispatchHoldHoldsThePrinter:
+    """A printer that just received a project_file is locked out of new
+    dispatches until something releases it."""
+
+    def test_held_immediately_after_mark(self):
+        sched = PrintScheduler()
+        get_status = MagicMock(return_value=_status("FINISH", "subtask-1"))
+        with patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status):
+            sched._mark_printer_dispatched(42, pre_state="FINISH", pre_subtask_id="subtask-1")
+            assert sched._printer_in_dispatch_hold(42) is True
+
+    def test_unmarked_printer_not_held(self):
+        sched = PrintScheduler()
+        assert sched._printer_in_dispatch_hold(42) is False
+
+    def test_state_unchanged_keeps_hold(self):
+        """Printer still reporting pre_state with no subtask_id advance ⇒ held.
+
+        This is the main scenario: H2D Pro at FINISH for ~80 s after
+        ``project_file``; the scheduler must not double-dispatch into that
+        window.
+        """
+        sched = PrintScheduler()
+        get_status = MagicMock(return_value=_status("FINISH", "subtask-1"))
+        with patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status):
+            sched._mark_printer_dispatched(42, pre_state="FINISH", pre_subtask_id="subtask-1")
+            assert sched._printer_in_dispatch_hold(42) is True
+
+
+class TestDispatchHoldReleases:
+    """The hold must release once the printer has actually picked up the job,
+    so the next pending item for this printer can dispatch normally."""
+
+    def test_release_via_explicit_call(self):
+        sched = PrintScheduler()
+        sched._mark_printer_dispatched(42, "FINISH", "subtask-1")
+        sched._release_dispatch_hold(42)
+        assert 42 not in sched._dispatch_holds
+
+    def test_release_is_idempotent(self):
+        sched = PrintScheduler()
+        sched._release_dispatch_hold(42)  # never marked
+        sched._release_dispatch_hold(42)  # double-release
+        assert 42 not in sched._dispatch_holds
+
+    def test_state_transition_after_min_cooldown_releases(self):
+        """If the printer transitions away from pre_state AND the minimum
+        cooldown has elapsed, the hold drops on the next check."""
+        sched = PrintScheduler()
+        sched._dispatch_min_cooldown = 0.0  # Skip the cooldown floor for this test
+        get_status = MagicMock(return_value=_status("PREPARE", "subtask-1"))
+        with patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status):
+            sched._mark_printer_dispatched(42, pre_state="FINISH", pre_subtask_id="subtask-1")
+            assert sched._printer_in_dispatch_hold(42) is False
+            assert 42 not in sched._dispatch_holds
+
+    def test_subtask_id_advance_releases(self):
+        """H2D firmware can echo the new subtask_id back on push_status before
+        flipping gcode_state — that's also a definitive 'job accepted' signal,
+        same shape as the existing watchdog logic (#1078)."""
+        sched = PrintScheduler()
+        sched._dispatch_min_cooldown = 0.0
+        get_status = MagicMock(return_value=_status("FINISH", "new-subtask-99"))
+        with patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status):
+            sched._mark_printer_dispatched(42, pre_state="FINISH", pre_subtask_id="old-subtask-1")
+            assert sched._printer_in_dispatch_hold(42) is False
+
+    def test_transition_within_cooldown_still_holds(self):
+        """Even after a state transition, hold for at least min_cooldown so a
+        slow printer that briefly pulses through PREPARE→RUNNING→PREPARE
+        doesn't open a window for double-dispatch."""
+        sched = PrintScheduler()
+        sched._dispatch_min_cooldown = 60.0
+        get_status = MagicMock(return_value=_status("PREPARE", "subtask-1"))
+        with patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status):
+            sched._mark_printer_dispatched(42, pre_state="FINISH", pre_subtask_id="subtask-1")
+            # Cooldown not elapsed (just-marked) → still held even though
+            # state already transitioned.
+            assert sched._printer_in_dispatch_hold(42) is True
+
+
+class TestDispatchHoldHardTimeout:
+    """A lost MQTT session must not lock a printer out of the queue forever."""
+
+    def test_hard_timeout_drops_hold(self):
+        sched = PrintScheduler()
+        sched._dispatch_max_hold = 0.001  # ~1 ms — instant expiry
+        get_status = MagicMock(return_value=_status("FINISH", "subtask-1"))
+        with patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status):
+            sched._mark_printer_dispatched(42, pre_state="FINISH", pre_subtask_id="subtask-1")
+            import time
+
+            time.sleep(0.005)
+            assert sched._printer_in_dispatch_hold(42) is False
+            assert 42 not in sched._dispatch_holds
+
+
+class TestDispatchHoldFallbacks:
+    """Edge cases around missing pre-dispatch data."""
+
+    def test_no_pre_state_falls_back_to_time_only_hold(self):
+        """If the printer was disconnected at dispatch time we have no
+        pre_state to compare against. Hold for the minimum cooldown anyway —
+        better than allowing an immediate second dispatch onto a printer we
+        couldn't even read state from."""
+        sched = PrintScheduler()
+        sched._dispatch_min_cooldown = 60.0
+        sched._mark_printer_dispatched(42, pre_state=None, pre_subtask_id=None)
+        # Status doesn't matter — there's no pre_state to compare.
+        get_status = MagicMock(return_value=_status("RUNNING", "anything"))
+        with patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status):
+            assert sched._printer_in_dispatch_hold(42) is True
+
+    def test_no_pre_state_releases_after_cooldown(self):
+        sched = PrintScheduler()
+        sched._dispatch_min_cooldown = 0.001
+        sched._mark_printer_dispatched(42, pre_state=None, pre_subtask_id=None)
+        import time
+
+        time.sleep(0.005)
+        assert sched._printer_in_dispatch_hold(42) is False
+
+    def test_status_unavailable_keeps_hold(self):
+        """If the printer disconnects after dispatch we can't read state —
+        keep the hold until the hard timeout. Don't release on missing data,
+        because that would let a second dispatch land on a printer we have
+        no visibility into."""
+        sched = PrintScheduler()
+        get_status = MagicMock(return_value=None)
+        with patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status):
+            sched._mark_printer_dispatched(42, pre_state="FINISH", pre_subtask_id="subtask-1")
+            assert sched._printer_in_dispatch_hold(42) is True
+
+
+class TestPerPrinterIsolation:
+    """Holds on one printer must not affect another."""
+
+    def test_hold_does_not_leak_across_printers(self):
+        sched = PrintScheduler()
+        get_status = MagicMock(return_value=_status("FINISH", "subtask-1"))
+        with patch("backend.app.services.print_scheduler.printer_manager.get_status", get_status):
+            sched._mark_printer_dispatched(42, pre_state="FINISH", pre_subtask_id="subtask-1")
+            # Printer 99 was never dispatched-to — must not be held.
+            assert sched._printer_in_dispatch_hold(99) is False
+            # Printer 42 still held.
+            assert sched._printer_in_dispatch_hold(42) is True
+
+
+class TestWatchdogIntegration:
+    """The watchdog drops the dispatch hold on its happy paths so the next
+    pending item can dispatch immediately. Without this, a successful print
+    leaves the hold in place until the hard timeout — blocking valid follow-
+    up dispatches."""
+
+    def test_release_dispatch_hold_callable_from_module_level_scheduler(self):
+        """The static watchdog calls ``scheduler._release_dispatch_hold(...)``
+        on transition observed. Smoke-test that the public API is reachable
+        and idempotent on the module-level instance the watchdog uses.
+        """
+        from backend.app.services.print_scheduler import scheduler
+
+        scheduler._release_dispatch_hold(99999)  # not held — must not raise
+        scheduler._mark_printer_dispatched(99999, "FINISH", "subtask-1")
+        assert 99999 in scheduler._dispatch_holds
+        scheduler._release_dispatch_hold(99999)
+        assert 99999 not in scheduler._dispatch_holds

Some files were not shown because too many files changed in this diff