print_scheduler.py 115 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406
  1. """Print scheduler service - processes the print queue."""
  2. import asyncio
  3. import json
  4. import logging
  5. import time
  6. from datetime import datetime, timezone
  7. from pathlib import Path
  8. from sqlalchemy import func, select
  9. from sqlalchemy.ext.asyncio import AsyncSession
  10. from sqlalchemy.orm import selectinload
  11. from backend.app.core.config import settings
  12. from backend.app.core.database import async_session, run_with_retry
  13. from backend.app.models.archive import PrintArchive
  14. from backend.app.models.library import LibraryFile
  15. from backend.app.models.print_queue import PrintQueueItem
  16. from backend.app.models.printer import Printer
  17. from backend.app.models.settings import Settings
  18. from backend.app.models.smart_plug import SmartPlug
  19. from backend.app.models.spool_assignment import SpoolAssignment
  20. from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
  21. from backend.app.services.bambu_ftp import (
  22. cache_3mf_download,
  23. delete_file_async,
  24. get_ftp_retry_settings,
  25. upload_file_async,
  26. with_ftp_retry,
  27. )
  28. from backend.app.services.filament_deficit import compute_deficit_for_queue_item
  29. from backend.app.services.notification_service import notification_service
  30. from backend.app.services.printer_manager import printer_manager, supports_drying
  31. from backend.app.services.smart_plug_manager import smart_plug_manager
  32. from backend.app.utils.printer_models import normalize_printer_model
  33. logger = logging.getLogger(__name__)
  34. # Bambu firmware states that mean the project_file has actually been accepted
  35. # and the printer is now processing / running / paused mid-print. Used by the
  36. # dispatch watchdog (#1370): a transition into one of these states means the
  37. # print landed, anything else (e.g. FINISH -> IDLE after the user dismisses
  38. # a post-print prompt) is NOT a valid "command landed" signal even though the
  39. # state value did change. SLICING is included because some firmwares park
  40. # briefly in SLICING between PREPARE and RUNNING while parsing the g-code.
  41. _ACTIVE_PRINT_STATES: frozenset[str] = frozenset({"PREPARE", "SLICING", "RUNNING", "PAUSE"})
  42. # Filament type equivalence groups — types within the same group are
  43. # interchangeable on the printer side (Bambu Lab firmware treats them as compatible).
  44. _FILAMENT_TYPE_GROUPS: list[list[str]] = [
  45. ["PA-CF", "PA12-CF", "PAHT-CF"],
  46. ]
  47. _FILAMENT_EQUIV_MAP: dict[str, str] = {}
  48. for _group in _FILAMENT_TYPE_GROUPS:
  49. _canonical = _group[0].upper()
  50. for _t in _group:
  51. _FILAMENT_EQUIV_MAP[_t.upper()] = _canonical
  52. def _canonical_filament_type(ftype: str) -> str:
  53. """Return canonical type for equivalence matching."""
  54. upper = ftype.upper()
  55. return _FILAMENT_EQUIV_MAP.get(upper, upper)
  56. class PrintScheduler:
  57. """Background scheduler that processes the print queue."""
  58. # Built-in drying presets per filament type (from BambuStudio filament profiles)
  59. # Format: { n3f_temp, n3s_temp, n3f_hours, n3s_hours }
  60. DEFAULT_DRYING_PRESETS: dict[str, dict[str, int]] = {
  61. "PLA": {"n3f": 45, "n3s": 45, "n3f_hours": 12, "n3s_hours": 12},
  62. "PETG": {"n3f": 65, "n3s": 65, "n3f_hours": 12, "n3s_hours": 12},
  63. "TPU": {"n3f": 65, "n3s": 75, "n3f_hours": 12, "n3s_hours": 18},
  64. "ABS": {"n3f": 65, "n3s": 80, "n3f_hours": 12, "n3s_hours": 8},
  65. "ASA": {"n3f": 65, "n3s": 80, "n3f_hours": 12, "n3s_hours": 8},
  66. "PA": {"n3f": 65, "n3s": 85, "n3f_hours": 12, "n3s_hours": 12},
  67. "PC": {"n3f": 65, "n3s": 80, "n3f_hours": 12, "n3s_hours": 8},
  68. "PVA": {"n3f": 65, "n3s": 85, "n3f_hours": 12, "n3s_hours": 18},
  69. }
  70. def __init__(self):
  71. self._running = False
  72. self._check_interval = 30 # seconds
  73. self._power_on_wait_time = 180 # seconds to wait for printer after power on (3 min)
  74. self._power_on_check_interval = 10 # seconds between connection checks
  75. self._min_drying_seconds = 1800 # 30 minutes minimum before humidity re-check can stop drying
  76. # Track which printers are currently auto-drying (printer_id -> start timestamp)
  77. self._drying_in_progress: dict[int, float] = {}
  78. # Defensive in-memory dispatch hold (#1157): a printer that just received
  79. # a project_file command must not get a second dispatch until either it
  80. # transitions out of pre_state OR the hard timeout expires. The H2D Pro
  81. # can take 80–210 s to flip FINISH→PREPARE after project_file, and
  82. # during that window the DB busy_printers seed is empirically unreliable
  83. # (multi-plate batches double-/triple-dispatched onto the same printer
  84. # 30 s apart). Keyed by printer_id; cleared by the watchdog on success
  85. # or revert.
  86. # printer_id -> (monotonic_started_at, pre_state, pre_subtask_id)
  87. self._dispatch_holds: dict[int, tuple[float, str, str | None]] = {}
  88. # Minimum cooldown between dispatches to the same printer (covers the
  89. # H2D's project_file digestion window).
  90. self._dispatch_min_cooldown = 60.0
  91. # Hard timeout — drop the hold even if we never observed a transition,
  92. # so a lost MQTT session can't lock a printer out of the queue forever.
  93. # Matches the watchdog timeout (90 s) plus a safety margin so the
  94. # watchdog runs first on the unhappy path.
  95. self._dispatch_max_hold = 180.0
  96. async def run(self):
  97. """Main loop - check queue every interval."""
  98. self._running = True
  99. logger.info("Print scheduler started")
  100. while self._running:
  101. try:
  102. await self.check_queue()
  103. except Exception as e:
  104. logger.error("Scheduler error: %s", e)
  105. await asyncio.sleep(self._check_interval)
  106. def stop(self):
  107. """Stop the scheduler."""
  108. self._running = False
  109. logger.info("Print scheduler stopped")
  110. async def check_queue(self):
  111. """Check for prints ready to start."""
  # Overview of one scheduler tick (everything runs inside one DB session):
  #   1. Load all 'pending' queue items, ordered either for shortest-job-first
  #      (setting "queue_shortest_first") or by printer_id + position.
  #   2. If nothing is pending, only run the auto-drying check and return.
  #   3. Seed busy_printers from items already in 'printing' status plus every
  #      printer that is still inside its in-memory dispatch hold.
  #   4. Walk the items: skip future-scheduled and manual-start ones, then
  #      dispatch either to the item's fixed printer (item.printer_id) or to an
  #      idle printer matching item.target_model.
  #   5. Log the skip / busy summaries and run the auto-drying check.
  112. async with async_session() as db:
  113. # Check if shortest-job-first scheduling is enabled
  114. sjf_enabled = await self._get_bool_setting(db, "queue_shortest_first")
  115. # Get all pending items, ordered by printer and position (or SJF order)
  116. if sjf_enabled:
  117. # SJF: group by printer (and target_model for model-based jobs),
  118. # then items already jumped get top priority (starvation guard),
  119. # then sort by print_time ascending. Items with no print time go last.
  120. result = await db.execute(
  121. select(PrintQueueItem)
  122. .where(PrintQueueItem.status == "pending")
  123. .order_by(
  124. PrintQueueItem.printer_id,
  125. PrintQueueItem.target_model,
  126. PrintQueueItem.been_jumped.desc(),
  127. PrintQueueItem.print_time_seconds.asc().nullslast(),
  128. PrintQueueItem.position,
  129. )
  130. )
  131. else:
  132. result = await db.execute(
  133. select(PrintQueueItem)
  134. .where(PrintQueueItem.status == "pending")
  135. .order_by(PrintQueueItem.printer_id, PrintQueueItem.position)
  136. )
  137. items = list(result.scalars().all())
  138. # Read plate-clear setting once per queue check
  139. require_plate_clear = await self._get_bool_setting(db, "require_plate_clear", default=True)
  140. if not items:
  141. # No pending items — still check auto-drying on idle printers
  142. await self._check_auto_drying(db, [], set(), require_plate_clear=require_plate_clear)
  143. return
  144. logger.info(
  145. "Queue check: found %d pending items: %s",
  146. len(items),
  147. [(i.id, i.printer_id, i.archive_id, i.library_file_id) for i in items],
  148. )
  149. # Seed busy_printers with printers that already have an item in 'printing'
  150. # status. _is_printer_idle() alone is not sufficient as a dispatch gate —
  151. # on H2D / P1 series the MQTT state transition from IDLE to RUNNING can
  152. # lag several seconds behind the print command, so the next check_queue
  153. # tick still sees IDLE and would double-dispatch onto the same printer.
  154. # Without this guard, two pending items targeting the same printer
  155. # (e.g. a batch with quantity>1) both end up in 'printing' status —
  156. # surfaced via the "BUG: Multiple queue items" warning in on_print_complete.
  157. busy_result = await db.execute(
  158. select(PrintQueueItem.printer_id)
  159. .where(PrintQueueItem.status == "printing")
  160. .where(PrintQueueItem.printer_id.is_not(None))
  161. )
  162. busy_printers: set[int] = {pid for (pid,) in busy_result.all() if pid is not None}
  163. # Defense-in-depth (#1157): augment busy_printers with any printer
  164. # still in its post-dispatch hold window. Empirically, the DB seed
  165. # above can miss in-flight items in a multi-plate batch — same-file
  166. # plates were being dispatched 30 s apart while the H2D was still
  167. # digesting the first project_file. The hold is keyed in-memory and
  168. # released by the watchdog on the success path, so it adds a layer
  169. # that doesn't depend on DB row visibility or completion-callback
  170. # timing.
  # The keys are copied into a list before iterating.
  # NOTE(review): presumably because _printer_in_dispatch_hold() may drop
  # expired entries from the dict — that helper is not visible here; confirm.
  171. for held_printer_id in list(self._dispatch_holds.keys()):
  172. if self._printer_in_dispatch_hold(held_printer_id):
  173. busy_printers.add(held_printer_id)
  174. # Log skip reasons once per queue check (not per item)
  175. skip_reasons: dict[str, int] = {}
  176. for item in items:
  177. # Check scheduled time first (scheduled_time is stored in UTC from ISO string)
  178. if item.scheduled_time:
  179. sched = item.scheduled_time
  # A naive timestamp is interpreted as UTC so it can be compared with the
  # timezone-aware "now" below.
  180. if sched.tzinfo is None:
  181. sched = sched.replace(tzinfo=timezone.utc)
  182. if sched > datetime.now(timezone.utc):
  183. skip_reasons["scheduled_future"] = skip_reasons.get("scheduled_future", 0) + 1
  184. continue
  185. # Skip items that require manual start
  186. if item.manual_start:
  187. skip_reasons["manual_start"] = skip_reasons.get("manual_start", 0) + 1
  188. continue
  189. if item.printer_id:
  190. # Specific printer assignment (existing behavior)
  # A busy printer is skipped without a skip_reasons entry; busy printers are
  # reported per printer in the log loop near the end of this method.
  191. if item.printer_id in busy_printers:
  192. continue
  193. # Check if printer is idle
  194. printer_idle = self._is_printer_idle(item.printer_id, require_plate_clear)
  195. printer_connected = printer_manager.is_connected(item.printer_id)
  196. # If printer not connected, try to power on via smart plug
  197. if not printer_connected:
  198. plugs = await self._get_smart_plugs(db, item.printer_id)
  199. auto_on_plugs = [p for p in plugs if p.auto_on and p.enabled]
  200. if auto_on_plugs:
  201. logger.info("Printer %s offline, attempting to power on via smart plug(s)", item.printer_id)
  202. # Power on using the first auto_on plug (the printer power plug)
  203. powered_on = await self._power_on_and_wait(auto_on_plugs[0], item.printer_id, db)
  204. if powered_on:
  205. # Also turn on any remaining auto_on plugs (e.g., filter)
  206. for extra_plug in auto_on_plugs[1:]:
  # Best effort: a failure on an extra plug is only logged as a warning and
  # does not prevent the print from being dispatched.
  207. try:
  208. service = await smart_plug_manager.get_service_for_plug(extra_plug, db)
  209. await service.turn_on(extra_plug)
  210. logger.info(
  211. "Also powered on plug '%s' for printer %s", extra_plug.name, item.printer_id
  212. )
  213. except Exception as e:
  214. logger.warning("Failed to power on extra plug '%s': %s", extra_plug.name, e)
  215. printer_connected = True
  216. printer_idle = self._is_printer_idle(item.printer_id, require_plate_clear)
  217. else:
  218. logger.warning("Could not power on printer %s via smart plug", item.printer_id)
  219. busy_printers.add(item.printer_id)
  220. continue
  221. else:
  222. # No plug or auto_on disabled
  223. busy_printers.add(item.printer_id)
  224. continue
  225. # Check if printer is idle (busy with another print)
  226. if not printer_idle:
  227. # If printer is drying (not truly busy), handle based on queue_drying_block
  228. if self._drying_in_progress.get(item.printer_id):
  229. block_for_drying = await self._get_bool_setting(db, "queue_drying_block")
  230. if block_for_drying:
  231. # Drying blocks queue — skip this printer
  232. busy_printers.add(item.printer_id)
  233. continue
  234. else:
  235. # Print takes priority — stop drying
  236. await self._stop_drying(item.printer_id)
  237. # Re-check idle after stopping drying
  238. printer_idle = self._is_printer_idle(item.printer_id, require_plate_clear)
  239. if not printer_idle:
  240. busy_printers.add(item.printer_id)
  241. continue
  242. else:
  243. busy_printers.add(item.printer_id)
  244. continue
  245. # Check condition (previous print success)
  246. if item.require_previous_success:
  247. if not await self._check_previous_success(db, item):
  248. item.status = "skipped"
  249. item.error_message = "Previous print failed or was aborted"
  250. item.completed_at = datetime.now(timezone.utc)
  251. await db.commit()
  252. logger.info("Skipped queue item %s - previous print failed", item.id)
  253. # Send notification
  254. job_name = await self._get_job_name(db, item)
  255. printer = await self._get_printer(db, item.printer_id)
  256. await notification_service.on_queue_job_skipped(
  257. job_name=job_name,
  258. printer_id=item.printer_id,
  259. printer_name=printer.name if printer else "Unknown",
  260. reason="Previous print failed or was aborted",
  261. db=db,
  262. )
  263. continue
  264. # Compute AMS mapping if not already set
  265. if not item.ams_mapping:
  266. computed_mapping = await self._compute_ams_mapping_for_printer(db, item.printer_id, item)
  267. if computed_mapping:
  268. item.ams_mapping = json.dumps(computed_mapping)
  269. logger.info(
  270. f"Queue item {item.id}: Computed AMS mapping for printer {item.printer_id}: {computed_mapping}"
  271. )
  272. await db.commit()
  273. # Filament-deficit pre-dispatch check (#1496). If the
  274. # assigned spool can't satisfy any required slot grams,
  275. # promote the item to manual_start so the user must
  276. # acknowledge via the ▶ button (which re-checks live).
  277. if await self._block_on_filament_deficit(db, item):
  278. continue
  279. # Start the print
  280. await self._start_print(db, item)
  # The printer is claimed for the rest of this tick as soon as _start_print
  # returns, so later pending items for the same printer are not dispatched.
  281. busy_printers.add(item.printer_id)
  282. # SJF starvation guard: mark items that were jumped
  # An item counts as jumped when it targets the same printer, sits earlier
  # in the queue (lower position) and is longer than the job just started,
  # or has no known print time.
  283. if sjf_enabled and item.print_time_seconds is not None:
  284. for other in items:
  285. if (
  286. other.id != item.id
  287. and other.status == "pending"
  288. and other.printer_id == item.printer_id
  289. and not other.been_jumped
  290. and other.position < item.position
  291. and (
  292. other.print_time_seconds is None
  293. or other.print_time_seconds > item.print_time_seconds
  294. )
  295. ):
  296. other.been_jumped = True
  297. await db.commit()
  # Items that have neither printer_id nor target_model match no branch
  # and are left untouched by this tick.
  298. elif item.target_model:
  299. # Model-based assignment - find any idle printer of matching model
  300. # Parse required filament types if present
  301. required_types = None
  302. if item.required_filament_types:
  303. try:
  304. required_types = json.loads(item.required_filament_types)
  305. except json.JSONDecodeError:
  306. pass # Ignore malformed filament types; treat as no constraint
  307. # Parse filament overrides if present
  308. filament_overrides = None
  309. if item.filament_overrides:
  310. try:
  311. filament_overrides = json.loads(item.filament_overrides)
  312. except json.JSONDecodeError:
  313. pass
  314. # If overrides exist, use override types for validation instead
  315. effective_types = required_types
  316. if filament_overrides:
  317. override_types = sorted({o["type"] for o in filament_overrides if "type" in o})
  318. if override_types:
  319. # Merge: keep original types for non-overridden slots, add override types
  320. effective_types = sorted(set(required_types or []) | set(override_types))
  321. printer_id, waiting_reason = await self._find_idle_printer_for_model(
  322. db,
  323. item.target_model,
  324. busy_printers,
  325. effective_types,
  326. item.target_location,
  327. filament_overrides=filament_overrides,
  328. require_plate_clear=require_plate_clear,
  329. )
  330. # Update waiting_reason if changed and send notification when first waiting
  331. if item.waiting_reason != waiting_reason:
  332. was_waiting = item.waiting_reason is not None
  333. item.waiting_reason = waiting_reason
  334. await db.commit()
  335. # Send waiting notification only when transitioning to waiting state
  336. # and the reason requires user action (not just "all printers busy")
  # NOTE(review): _is_busy_only() is defined outside this view; its exact
  # matching rules should be confirmed there.
  337. if waiting_reason and not was_waiting and not self._is_busy_only(waiting_reason):
  338. job_name = await self._get_job_name(db, item)
  339. await notification_service.on_queue_job_waiting(
  340. job_name=job_name,
  341. target_model=item.target_model,
  342. waiting_reason=waiting_reason,
  343. db=db,
  344. )
  345. if printer_id:
  346. # Check condition (previous print success) before assigning
  347. if item.require_previous_success:
  348. if not await self._check_previous_success(db, item):
  349. item.status = "skipped"
  350. item.error_message = "Previous print failed or was aborted"
  351. item.completed_at = datetime.now(timezone.utc)
  352. await db.commit()
  353. logger.info("Skipped queue item %s - previous print failed", item.id)
  354. # Send notification
  355. job_name = await self._get_job_name(db, item)
  356. printer = await self._get_printer(db, printer_id)
  357. await notification_service.on_queue_job_skipped(
  358. job_name=job_name,
  359. printer_id=printer_id,
  360. printer_name=printer.name if printer else "Unknown",
  361. reason="Previous print failed or was aborted",
  362. db=db,
  363. )
  364. continue
  365. # Assign printer and start - clear waiting reason
  366. item.printer_id = printer_id
  367. item.waiting_reason = None
  368. logger.info("Model-based assignment: queue item %s assigned to printer %s", item.id, printer_id)
  369. # Send assignment notification
  370. job_name = await self._get_job_name(db, item)
  371. printer = await self._get_printer(db, printer_id)
  372. await notification_service.on_queue_job_assigned(
  373. job_name=job_name,
  374. printer_id=printer_id,
  375. printer_name=printer.name if printer else "Unknown",
  376. target_model=item.target_model,
  377. db=db,
  378. )
  379. # Compute AMS mapping for the assigned printer if not already set
  380. # This is critical for model-based jobs where mapping wasn't computed upfront
  381. if not item.ams_mapping:
  382. computed_mapping = await self._compute_ams_mapping_for_printer(db, printer_id, item)
  383. if computed_mapping:
  384. item.ams_mapping = json.dumps(computed_mapping)
  385. logger.info(
  386. f"Queue item {item.id}: Computed AMS mapping for printer {printer_id}: {computed_mapping}"
  387. )
  388. await db.commit()
  389. # Filament-deficit pre-dispatch check (#1496).
  390. if await self._block_on_filament_deficit(db, item):
  391. continue
  392. await self._start_print(db, item)
  393. busy_printers.add(printer_id)
  394. # SJF starvation guard: mark model-based items that were jumped
  # Same rule as for fixed-printer items, but applied to still-unassigned
  # items whose target_model matches case-insensitively.
  395. if sjf_enabled and item.print_time_seconds is not None:
  396. for other in items:
  397. if (
  398. other.id != item.id
  399. and other.status == "pending"
  400. and other.printer_id is None
  401. and other.target_model
  402. and other.target_model.upper() == item.target_model.upper()
  403. and not other.been_jumped
  404. and other.position < item.position
  405. and (
  406. other.print_time_seconds is None
  407. or other.print_time_seconds > item.print_time_seconds
  408. )
  409. ):
  410. other.been_jumped = True
  411. await db.commit()
  412. # Log summary of skip reasons (helps diagnose why queue items aren't starting)
  413. if skip_reasons:
  414. logger.info("Queue skip summary: %s", skip_reasons)
  # busy_printers also holds printers that were handed a print during this
  # tick (added right after _start_print), so they appear in this log too.
  415. if busy_printers:
  416. # Log why each printer was busy (first time it was checked)
  417. for pid in busy_printers:
  418. state = printer_manager.get_status(pid)
  419. connected = printer_manager.is_connected(pid)
  420. awaiting = printer_manager.is_awaiting_plate_clear(pid)
  421. state_name = state.state if state else "NO_STATUS"
  422. logger.info(
  423. "Queue: printer %d not available — connected=%s, state=%s, awaiting_plate_clear=%s",
  424. pid,
  425. connected,
  426. state_name,
  427. awaiting,
  428. )
  429. # Auto-drying: start drying on idle printers that have no pending queue items
  430. await self._check_auto_drying(db, items, busy_printers, require_plate_clear=require_plate_clear)
  431. async def _find_idle_printer_for_model(
  432. self,
  433. db: AsyncSession,
  434. model: str,
  435. exclude_ids: set[int],
  436. required_filament_types: list[str] | None = None,
  437. target_location: str | None = None,
  438. filament_overrides: list[dict] | None = None,
  439. require_plate_clear: bool = True,
  440. ) -> tuple[int | None, str | None]:
  441. """Find an idle, connected printer matching the model with compatible filaments.
  442. Args:
  443. db: Database session
  444. model: Printer model to match (e.g., "X1C", "P1S")
  445. exclude_ids: Printer IDs to exclude (already busy)
  446. required_filament_types: Optional list of filament types needed (e.g., ["PLA", "PETG"])
  447. If provided, only printers with all required types loaded will match.
  448. target_location: Optional location filter. If provided, only printers in this location are considered.
  449. filament_overrides: Optional list of override dicts. Each entry may include
  450. ``force_color_match: true`` to require an exact type+color match
  451. on the printer for that slot. Without the flag the existing
  452. colour-preference logic applies.
  453. Returns:
  454. Tuple of (printer_id, waiting_reason):
  455. - (printer_id, None) if a matching printer was found
  456. - (None, reason) if no printer is available, with explanation
  457. """
  458. # Normalize model name and use case-insensitive matching
  459. normalized_model = normalize_printer_model(model) or model
  460. query = (
  461. select(Printer)
  462. .where(func.lower(Printer.model) == normalized_model.lower())
  463. .where(Printer.is_active == True) # noqa: E712
  464. )
  465. # Add location filter if specified
  466. if target_location:
  467. query = query.where(Printer.location == target_location)
  468. result = await db.execute(query)
  469. printers = list(result.scalars().all())
  470. location_suffix = f" in {target_location}" if target_location else ""
  471. if not printers:
  472. return None, f"No active {normalized_model} printers{location_suffix} configured"
  473. # Separate force-matched overrides from preference-only overrides
  474. force_overrides = [o for o in (filament_overrides or []) if o.get("force_color_match")]
  475. pref_overrides = [o for o in (filament_overrides or []) if not o.get("force_color_match")]
  476. # Track reasons for skipping printers
  477. printers_busy = []
  478. printers_offline = []
  479. printers_missing_filament: list[tuple[str, list[str]]] = []
  480. candidates: list[tuple[int, int]] = [] # (printer_id, color_match_count)
  481. for printer in printers:
  482. if printer.id in exclude_ids:
  483. # Printer is already claimed by another job in this scheduling run.
  484. # For force-color jobs, still check if the color would match — if not,
  485. # report it as a color mismatch rather than plain "Busy" so the user
  486. # knows the job needs a filament change, not just to wait for availability.
  487. if force_overrides and not pref_overrides:
  488. missing_colors = self._get_missing_force_color_slots(printer.id, force_overrides)
  489. if missing_colors:
  490. printers_missing_filament.append((printer.name, missing_colors))
  491. continue
  492. printers_busy.append(printer.name)
  493. continue
  494. is_connected = printer_manager.is_connected(printer.id)
  495. is_idle = self._is_printer_idle(printer.id, require_plate_clear) if is_connected else False
  496. if not is_connected:
  497. printers_offline.append(printer.name)
  498. continue
  499. if not is_idle:
  500. # Printer is currently printing. For force-color jobs, check whether the
  501. # loaded color would satisfy the requirement — if not, surface it as a
  502. # color-mismatch reason rather than plain "Busy" so the user understands
  503. # that the job is waiting for a filament change, not just printer availability.
  504. if force_overrides and not pref_overrides:
  505. missing_colors = self._get_missing_force_color_slots(printer.id, force_overrides)
  506. if missing_colors:
  507. printers_missing_filament.append((printer.name, missing_colors))
  508. logger.debug(
  509. "Printer %s (%s) is busy but also has wrong force-color: %s",
  510. printer.id,
  511. printer.name,
  512. missing_colors,
  513. )
  514. continue
  515. printers_busy.append(printer.name)
  516. continue
  517. # Validate filament compatibility if required types are specified
  518. if required_filament_types:
  519. missing = self._get_missing_filament_types(printer.id, required_filament_types)
  520. if missing:
  521. # When force_overrides are present, enrich missing entries with color info
  522. # so the "Waiting on" message includes "TYPE (color)" instead of just "TYPE"
  523. if force_overrides:
  524. force_color_map = {
  525. (o.get("type") or "").upper(): o.get("color_name") or o.get("color", "?")
  526. for o in force_overrides
  527. }
  528. missing_enriched = [
  529. f"{t} ({force_color_map[t_upper]})" if (t_upper := t.upper()) in force_color_map else t
  530. for t in missing
  531. ]
  532. printers_missing_filament.append((printer.name, missing_enriched))
  533. else:
  534. printers_missing_filament.append((printer.name, missing))
  535. logger.debug("Skipping printer %s (%s) - missing filaments: %s", printer.id, printer.name, missing)
  536. continue
  537. # Force color match: ALL flagged slots must have an exact type+color match
  538. if force_overrides:
  539. missing_colors = self._get_missing_force_color_slots(printer.id, force_overrides)
  540. if missing_colors:
  541. printers_missing_filament.append((printer.name, missing_colors))
  542. logger.debug(
  543. "Skipping printer %s (%s) - missing force-matched colors: %s",
  544. printer.id,
  545. printer.name,
  546. missing_colors,
  547. )
  548. continue
  549. # If preference-only overrides exist, rank by color matches (existing behaviour)
  550. if pref_overrides:
  551. color_matches = self._count_override_color_matches(printer.id, pref_overrides)
  552. if color_matches > 0:
  553. candidates.append((printer.id, color_matches))
  554. else:
  555. override_colors = [f"{o.get('type', '?')} ({o.get('color', '?')})" for o in pref_overrides]
  556. printers_missing_filament.append((printer.name, override_colors))
  557. logger.debug("Skipping printer %s (%s) - no matching override colors", printer.id, printer.name)
  558. continue
  559. elif force_overrides:
  560. # Passed all force checks — immediately eligible (no preference ordering needed)
  561. return printer.id, None
  562. else:
  563. # No overrides at all - take first available (existing behavior)
  564. return printer.id, None
  565. # If we have candidates from preference override matching, pick the one with most color matches
  566. if candidates:
  567. candidates.sort(key=lambda c: c[1], reverse=True)
  568. return candidates[0][0], None
  569. # Build waiting reason from what we found
  570. reasons = []
  571. if printers_missing_filament:
  572. # Filament/color mismatch is most actionable - show first
  573. if force_overrides and not pref_overrides:
  574. # All mismatches are force-color failures — use descriptive message only;
  575. # but only if there are no busy printers that DO have the matching color.
  576. # If a printer has the right color but is busy, surface "Busy" instead so
  577. # the user knows the job will start automatically once that printer is free.
  578. if not printers_busy:
  579. all_missing = sorted({c for _, cols in printers_missing_filament for c in cols})
  580. return None, f"No matching material/color. Waiting on {', '.join(all_missing)}"
  581. # else: fall through — printers_busy will be appended below
  582. else:
  583. names_and_missing = [
  584. f"{name} (needs {', '.join(missing)})" for name, missing in printers_missing_filament
  585. ]
  586. reasons.append(f"Waiting for filament: {'; '.join(names_and_missing)}")
  587. if printers_busy:
  588. reasons.append(f"Busy: {', '.join(printers_busy)}")
  589. if printers_offline:
  590. reasons.append(f"Offline: {', '.join(printers_offline)}")
  591. return None, " | ".join(reasons) if reasons else f"No available {model} printers{location_suffix}"
  592. @staticmethod
  593. def _is_busy_only(waiting_reason: str) -> bool:
  594. """Check if the waiting reason only contains 'Busy' entries.
  595. When all matching printers are simply busy printing, the queued job
  596. will start automatically once a printer finishes — no user action
  597. is required, so we skip the notification.
  598. """
  599. parts = [p.strip() for p in waiting_reason.split(" | ")]
  600. return all(p.startswith("Busy:") for p in parts)
  def _get_missing_force_color_slots(self, printer_id: int, force_overrides: list[dict]) -> list[str]:
      """Return descriptive strings for force_color_match slots not satisfied by the printer.

      An override is satisfied when some loaded tray (AMS unit or external
      spool) has the same canonical filament type and the same normalized
      colour (``#`` removed, lower-cased, first six hex digits). Every entry
      of ``force_overrides`` must be satisfied for the result to be empty.

      Args:
          printer_id: Printer whose current status is inspected.
          force_overrides: Override dicts; ``type``, ``color`` and the optional
              ``color_name`` keys are read.

      Returns:
          List of ``"TYPE (color)"`` strings for unmatched slots (empty list
          means all match). When no status is available every override is
          reported as unmatched.
      """
      status = printer_manager.get_status(printer_id)
      if not status:
          return [f"{o.get('type', '?')} ({o.get('color_name') or o.get('color', '?')})" for o in force_overrides]

      def _norm_color(color) -> str:
          # Tolerate a missing or explicit-None colour the same way for every tray source.
          return (color or "").replace("#", "").lower()[:6]

      # Collect AMS trays and external spool(s) into one list so both sources
      # share the same None-safe handling ("ams"/"tray"/"vt_tray" may be absent or None).
      trays = [
          tray
          for ams_unit in status.raw_data.get("ams") or []
          for tray in ams_unit.get("tray") or []
      ]
      trays.extend(status.raw_data.get("vt_tray") or [])

      # Set of (canonical type, normalized colour) pairs currently loaded.
      loaded: set[tuple[str, str]] = {
          (_canonical_filament_type(tray["tray_type"]), _norm_color(tray.get("tray_color")))
          for tray in trays
          if tray.get("tray_type")
      }

      missing = []
      for o in force_overrides:
          o_type = _canonical_filament_type(o.get("type") or "")
          if (o_type, _norm_color(o.get("color"))) not in loaded:
              # Prefer the human-readable colour name in the message when present.
              color_label = o.get("color_name") or o.get("color", "?")
              missing.append(f"{o_type} ({color_label})")
      return missing
  def _get_missing_filament_types(self, printer_id: int, required_types: list[str]) -> list[str]:
      """List the required filament types that the printer does not have loaded.

      Loaded and required types are both passed through
      ``_canonical_filament_type`` before comparison, so types that share a
      canonical form count as the same material.

      Args:
          printer_id: The printer ID
          required_types: Filament types needed (e.g., ["PLA", "PETG"])

      Returns:
          The entries of ``required_types`` with no loaded counterpart (empty
          when everything is loaded). If the printer status is unavailable,
          ``required_types`` itself is returned, i.e. all are assumed missing.
      """
      status = printer_manager.get_status(printer_id)
      if not status:
          return required_types  # Can't determine, assume all missing

      raw = status.raw_data

      # Gather every tray record: AMS units first, then external spool(s).
      trays = []
      for ams_unit in raw.get("ams", []) or []:
          trays.extend(ams_unit.get("tray", []))
      trays.extend(raw.get("vt_tray") or [])

      # Trays without a tray_type are skipped.
      available = {_canonical_filament_type(t.get("tray_type")) for t in trays if t.get("tray_type")}

      return [wanted for wanted in required_types if _canonical_filament_type(wanted) not in available]
  def _count_override_color_matches(self, printer_id: int, overrides: list[dict]) -> int:
      """Count how many filament overrides have an exact type+colour match on the printer.

      Used to prefer printers that already have the desired override colours
      loaded. Types are compared by upper-cased string and colours by their
      normalized form (``#`` removed, lower-cased, first six hex digits).

      NOTE(review): unlike the force-colour check, types here are not passed
      through ``_canonical_filament_type`` — confirm that is intentional.

      Args:
          printer_id: Printer whose current status is inspected.
          overrides: Override dicts; the ``type`` and ``color`` keys are read.

      Returns:
          Number of overrides with a matching loaded tray; 0 when the printer
          status is unavailable.
      """
      status = printer_manager.get_status(printer_id)
      if not status:
          return 0

      def _norm_color(color) -> str:
          # Tolerate a missing or explicit-None colour the same way for every tray source.
          return (color or "").replace("#", "").lower()[:6]

      # AMS trays plus external spool(s); "ams"/"tray"/"vt_tray" may be absent or None.
      trays = [
          tray
          for ams_unit in status.raw_data.get("ams") or []
          for tray in ams_unit.get("tray") or []
      ]
      trays.extend(status.raw_data.get("vt_tray") or [])

      loaded: set[tuple[str, str]] = {
          (tray["tray_type"].upper(), _norm_color(tray.get("tray_color")))
          for tray in trays
          if tray.get("tray_type")
      }

      return sum(
          1
          for o in overrides
          if ((o.get("type") or "").upper(), _norm_color(o.get("color"))) in loaded
      )
  async def _compute_ams_mapping_for_printer(
      self, db: AsyncSession, printer_id: int, item: PrintQueueItem
  ) -> list[int] | None:
      """Compute AMS mapping for a printer based on filament requirements.

      Called when a queue item has no ams_mapping set — either for model-based
      items after printer assignment, or printer-specific items (e.g. from VP).

      Args:
          db: Database session
          printer_id: The assigned printer ID
          item: The queue item (contains archive_id or library_file_id)

      Returns:
          AMS mapping array or None if no mapping needed/possible
      """
      status = printer_manager.get_status(printer_id)
      if not status:
          logger.warning("Cannot compute AMS mapping: printer %s status unavailable", printer_id)
          return None

      # Get filament requirements from source file
      filament_reqs = await self._get_filament_requirements(db, item)
      if not filament_reqs:
          # When the 3MF can't be read but force-color overrides are present, build a
          # direct mapping from the overrides so the printer uses the correct AMS slot.
          if item.filament_overrides:
              try:
                  overrides = json.loads(item.filament_overrides)
                  force_overrides = [o for o in overrides if o.get("force_color_match")]
                  if force_overrides:
                      logger.info(
                          "Queue item %s: No filament reqs from 3MF; building AMS mapping from %d "
                          "force-color override(s)",
                          item.id,
                          len(force_overrides),
                      )
                      return self._build_override_direct_mapping(force_overrides, status)
              except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
                  # AttributeError covers valid JSON whose entries are not objects
                  # (``o.get`` on a str/int); treat it like any other malformed override.
                  logger.warning("Queue item %s: Force-color fallback mapping failed: %s", item.id, e)
          logger.debug("No filament requirements found for queue item %s", item.id)
          return None

      # Apply filament overrides if present
      if item.filament_overrides:
          try:
              overrides = json.loads(item.filament_overrides)
              override_map = {o["slot_id"]: o for o in overrides}
              for req in filament_reqs:
                  if req["slot_id"] not in override_map:
                      continue
                  override = override_map[req["slot_id"]]
                  # Read both fields before mutating so a malformed override can
                  # never leave a requirement half-updated.
                  new_type, new_color = override["type"], override["color"]
                  req["type"] = new_type
                  req["color"] = new_color
                  # Clear tray_info_idx so matching uses type+color instead of
                  # the original 3MF's tray_info_idx (which would match the old filament)
                  req["tray_info_idx"] = ""
                  logger.debug(
                      "Queue item %s: Override slot %d -> %s %s",
                      item.id,
                      req["slot_id"],
                      new_type,
                      new_color,
                  )
          except (json.JSONDecodeError, KeyError, TypeError) as e:
              logger.warning("Failed to apply filament overrides for queue item %s: %s", item.id, e)

      # Build loaded filaments from printer status
      loaded_filaments = self._build_loaded_filaments(status)
      if not loaded_filaments:
          logger.debug("No filaments loaded on printer %s", printer_id)
          return None

      # Check if user prefers lowest remaining filament when multiple spools match
      prefer_lowest = await self._get_bool_setting(db, "prefer_lowest_filament")

      # When the preference is on, surface the inventory-side remaining amount
      # for each slot bound to a tracked spool so the sort is not limited to the
      # MQTT ``remain`` value (#1508). The lookup is skipped entirely when the
      # preference is off — no behaviour change for users who haven't opted in.
      inventory_remain_overrides: dict[int, float] | None = None
      if prefer_lowest:
          inventory_remain_overrides = await self._build_inventory_remain_overrides(db, printer_id, loaded_filaments)

      # Compute mapping: match required filaments to available slots
      return self._match_filaments_to_slots(
          filament_reqs, loaded_filaments, prefer_lowest, inventory_remain_overrides
      )
  775. def _build_override_direct_mapping(self, force_overrides: list[dict], status) -> list[int] | None:
  776. """Build an AMS mapping directly from force-color overrides without a 3MF.
  777. Used when ``_get_filament_requirements`` returns nothing (e.g. the 3MF's
  778. slice_info is missing or unreadable) but ``force_color_match`` overrides
  779. are present. Each override's ``slot_id``, ``type``, and ``color`` are
  780. treated as the filament requirement for that slot and matched against the
  781. current AMS state of the printer.
  782. Returns the same format as ``_match_filaments_to_slots``, or None when
  783. the AMS has no loaded filaments.
  784. """
  785. loaded = self._build_loaded_filaments(status)
  786. if not loaded:
  787. return None
  788. reqs = [
  789. {
  790. "slot_id": o["slot_id"],
  791. "type": o.get("type", ""),
  792. "color": o.get("color", ""),
  793. "tray_info_idx": "",
  794. }
  795. for o in force_overrides
  796. ]
  797. return self._match_filaments_to_slots(reqs, loaded)
  async def _get_filament_requirements(self, db: AsyncSession, item: PrintQueueItem) -> list[dict] | None:
      """Locate the queue item's source file and parse its filament requirements.

      The file is resolved from the item's archive (``archive_id``) or, when
      no archive id is set, from its library file (``library_file_id``). The
      parsing itself is delegated to
      ``filament_requirements.extract_filament_requirements``, called with
      the item's ``plate_id``.

      Returns:
          The parsed requirements, or None when the source row or file cannot
          be found or the parser yields nothing.
      """
      from backend.app.services.filament_requirements import extract_filament_requirements

      source_path: Path | None = None
      if item.archive_id:
          lookup = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
          archive_row = lookup.scalar_one_or_none()
          if archive_row:
              source_path = settings.base_dir / archive_row.file_path
      elif item.library_file_id:
          lookup = await db.execute(LibraryFile.active().where(LibraryFile.id == item.library_file_id))
          library_row = lookup.scalar_one_or_none()
          if library_row:
              stored = Path(library_row.file_path)
              # Relative library paths are resolved against the base directory.
              if stored.is_absolute():
                  source_path = stored
              else:
                  source_path = settings.base_dir / library_row.file_path

      if not source_path or not source_path.exists():
          return None

      parsed = extract_filament_requirements(source_path, plate_id=item.plate_id)
      return parsed or None
  822. def _build_loaded_filaments(self, status) -> list[dict]:
  823. """Build list of loaded filaments from printer status.
  824. Args:
  825. status: PrinterState from printer_manager
  826. Returns:
  827. List of loaded filament dicts with type, color, ams_id, tray_id, global_tray_id
  828. """
  829. filaments = []
  830. # Get ams_extruder_map for dual-nozzle printers (H2D, H2D Pro)
  831. ams_extruder_map = status.raw_data.get("ams_extruder_map", {})
  832. # Parse AMS units from raw_data
  833. ams_data = status.raw_data.get("ams", [])
  834. for ams_unit in ams_data:
  835. ams_id = int(ams_unit.get("id", 0))
  836. trays = ams_unit.get("tray", [])
  837. is_ht = len(trays) == 1 # AMS-HT has single tray
  838. for tray in trays:
  839. tray_type = tray.get("tray_type")
  840. if tray_type:
  841. tray_id = int(tray.get("id", 0))
  842. tray_color = tray.get("tray_color", "")
  843. # tray_info_idx identifies the specific spool (e.g., "GFA00", "P4d64437")
  844. tray_info_idx = tray.get("tray_info_idx", "")
  845. # Normalize color: remove alpha, add hash
  846. color = self._normalize_color(tray_color)
  847. # Calculate global tray ID
  848. # AMS-HT units have IDs starting at 128 with a single tray
  849. global_tray_id = ams_id if ams_id >= 128 else ams_id * 4 + tray_id
  850. filaments.append(
  851. {
  852. "type": tray_type,
  853. "color": color,
  854. "tray_info_idx": tray_info_idx,
  855. "ams_id": ams_id,
  856. "tray_id": tray_id,
  857. "is_ht": is_ht,
  858. "is_external": False,
  859. "global_tray_id": global_tray_id,
  860. "extruder_id": ams_extruder_map.get(str(ams_id)),
  861. "remain": tray.get("remain", -1),
  862. }
  863. )
  864. # Check external spool(s) (vt_tray is a list)
  865. for idx, vt in enumerate(status.raw_data.get("vt_tray") or []):
  866. if vt.get("tray_type"):
  867. color = self._normalize_color(vt.get("tray_color", ""))
  868. tray_id = int(vt.get("id", 254))
  869. filaments.append(
  870. {
  871. "type": vt["tray_type"],
  872. "color": color,
  873. "tray_info_idx": vt.get("tray_info_idx", ""),
  874. "ams_id": -1,
  875. "tray_id": idx,
  876. "is_ht": False,
  877. "is_external": True,
  878. "global_tray_id": tray_id,
  879. "extruder_id": (255 - tray_id) if ams_extruder_map else None,
  880. "remain": vt.get("remain", -1),
  881. }
  882. )
  883. return filaments
  884. def _normalize_color(self, color: str | None) -> str:
  885. """Normalize color to #RRGGBB format."""
  886. if not color:
  887. return "#808080"
  888. hex_color = color.replace("#", "")[:6]
  889. return f"#{hex_color}"
  890. def _normalize_color_for_compare(self, color: str | None) -> str:
  891. """Normalize color for comparison (lowercase, no hash)."""
  892. if not color:
  893. return ""
  894. return color.replace("#", "").lower()[:6]
  895. def _colors_are_similar(self, color1: str | None, color2: str | None, threshold: int = 40) -> bool:
  896. """Check if two colors are visually similar within a threshold."""
  897. hex1 = self._normalize_color_for_compare(color1)
  898. hex2 = self._normalize_color_for_compare(color2)
  899. if not hex1 or not hex2 or len(hex1) < 6 or len(hex2) < 6:
  900. return False
  901. try:
  902. r1 = int(hex1[0:2], 16)
  903. g1 = int(hex1[2:4], 16)
  904. b1 = int(hex1[4:6], 16)
  905. r2 = int(hex2[0:2], 16)
  906. g2 = int(hex2[2:4], 16)
  907. b2 = int(hex2[4:6], 16)
  908. return abs(r1 - r2) <= threshold and abs(g1 - g2) <= threshold and abs(b1 - b2) <= threshold
  909. except ValueError:
  910. return False
  async def _build_inventory_remain_overrides(
      self, db: AsyncSession, printer_id: int, loaded: list[dict]
  ) -> dict[int, float]:
      """Return ``{global_tray_id: remaining_grams}`` for AMS slots bound to an
      inventory spool.

      Background (#1508): the MQTT ``remain`` value of a tray is the printer's
      own counter, so the "Prefer Lowest Remaining Filament" sort cannot tell
      apart trays for which the printer reports nothing, and the value can
      differ from what the user tracks in their inventory. When a slot is
      bound to a spool, the inventory figure is surfaced here so the sort can
      prefer it; slots without a binding are simply absent from the result
      and the caller keeps using MQTT ``remain`` for them.

      Two sources are supported, selected by ``_is_spoolman_mode``:

      - Spoolman mode: ``SpoolmanSlotAssignment`` rows for the printer, with
        the amount fetched through ``_spoolman_remaining_grams``; a ``None``
        amount leaves the slot out.
      - Internal inventory (default): ``SpoolAssignment`` rows with their
        spool eagerly loaded; the amount is ``label_weight - weight_used``,
        floored at zero.

      Args:
          db: Database session used for the assignment queries.
          printer_id: Printer whose slot assignments are looked up.
          loaded: Loaded-filament dicts; ``ams_id``, ``tray_id``,
              ``global_tray_id`` and ``is_external`` are read.

      Returns:
          Mapping of global tray id to remaining grams; empty when ``loaded``
          is empty, contains only external trays, or no slot has a binding.

      NOTE(review): this method contains no exception handling of its own,
      so query errors propagate to the caller — confirm that is acceptable
      for a best-effort preference.
      """
      if not loaded:
          return {}

      # External / virtual-tray entries are skipped so a VT-loaded spool can
      # never pick up an AMS slot binding.
      ams_slots = [
          (entry["ams_id"], entry["tray_id"], entry["global_tray_id"])
          for entry in loaded
          if not entry.get("is_external")
      ]
      if not ams_slots:
          return {}

      spoolman_mode = await self._is_spoolman_mode(db)
      remaining: dict[int, float] = {}

      if spoolman_mode:
          rows = await db.execute(
              select(SpoolmanSlotAssignment).where(SpoolmanSlotAssignment.printer_id == printer_id)
          )
          spool_id_for = {(row.ams_id, row.tray_id): row.spoolman_spool_id for row in rows.scalars().all()}

          from backend.app.services.filament_deficit import _spoolman_remaining_grams

          for ams_id, tray_id, gtid in ams_slots:
              bound_id = spool_id_for.get((ams_id, tray_id))
              if bound_id is not None:
                  grams = await _spoolman_remaining_grams(bound_id)
                  if grams is not None:
                      remaining[gtid] = grams
          return remaining

      # Internal inventory mode: one query with the spool relationship
      # eager-loaded via selectinload.
      rows = await db.execute(
          select(SpoolAssignment)
          .options(selectinload(SpoolAssignment.spool))
          .where(SpoolAssignment.printer_id == printer_id)
      )
      spool_for = {(row.ams_id, row.tray_id): row.spool for row in rows.scalars().all()}

      for ams_id, tray_id, gtid in ams_slots:
          bound_spool = spool_for.get((ams_id, tray_id))
          if bound_spool is not None:
              label_weight = float(bound_spool.label_weight or 0)
              consumed = float(bound_spool.weight_used or 0)
              remaining[gtid] = max(0.0, label_weight - consumed)
      return remaining
  @staticmethod
  async def _is_spoolman_mode(db: AsyncSession) -> bool:
      """Report whether the ``spoolman_enabled`` setting is switched on.

      Kept private to this module, per the original note, as a counterpart of
      ``filament_deficit._is_spoolman_mode`` so that this module does not
      depend on that helper's signature.

      Returns:
          True only when the stored setting is a non-empty value equal to
          ``"true"`` ignoring case; any exception while importing or reading
          the setting results in False.
      """
      try:
          from backend.app.api.routes.settings import get_setting

          raw_value = await get_setting(db, "spoolman_enabled")
          if not raw_value:
              return False
          return raw_value.lower() == "true"
      except Exception:
          return False
  993. @staticmethod
  994. def _slot_priority(ams_id: int | None, tray_id: int | None) -> int:
  995. """Deterministic slot-position tie-breaker for the prefer-lowest sort.
  996. Three bands, matched to the emission order in ``_build_loaded_filaments``
  997. so a tied sort produces the same physical-position order the pre-#1508
  998. stable sort did (preserves the regression-free baseline):
  999. - Regular AMS (``ams_id`` 0..7): ``ams_id * 4 + tray_id`` → 0..31
  1000. - AMS-HT (``ams_id`` >= 128, single tray): ``1000 + (ams_id - 128) * 4``
  1001. - External / VT (``ams_id`` < 0, or ``None``): ``10_000``
  1002. Banding ensures regular AMS < AMS-HT < external on ties, regardless of
  1003. what the raw ``ams_id`` happens to be (in particular, ``ams_id = -1``
  1004. for VT must NOT sort to a negative number or it would beat AMS slot 0).
  1005. """
  1006. if ams_id is None or ams_id < 0:
  1007. return 10_000
  1008. if ams_id >= 128:
  1009. return 1_000 + (ams_id - 128) * 4 + (tray_id or 0)
  1010. return ams_id * 4 + (tray_id or 0)
  1011. @staticmethod
  1012. def _prefer_lowest_sort_key(f: dict, overrides: dict[int, float] | None) -> tuple[int, float, int]:
  1013. """Sort key for the "Prefer Lowest Remaining Filament" preference.
  1014. Two-tier ordering: inventory-tracked spools always sort BEFORE
  1015. non-tracked spools (the user has told us they care about these
  1016. specifically), then ascending by remaining within each tier, then
  1017. ascending by AMS slot position as the deterministic tie-breaker.
  1018. Tiers are flagged by the first tuple element (0 = inventory-tracked,
  1019. 1 = MQTT-only / unknown). Cross-tier value comparisons never run
  1020. because the tier flag dominates — which is what lets us mix grams
  1021. (inventory) and percent (MQTT) without a unit conversion.
  1022. Within the MQTT tier ``remain = -1`` (unknown) is mapped to 101 so
  1023. spools the printer DOES know something about sort ahead of those
  1024. it knows nothing about — preserves pre-#1508 behaviour for the
  1025. no-inventory-binding case.
  1026. Slot tie-breaker via ``_slot_priority`` so regular AMS < AMS-HT <
  1027. external on ties, matching the legacy emission-order stable sort.
  1028. """
  1029. gtid = f.get("global_tray_id")
  1030. slot_order = PrintScheduler._slot_priority(f.get("ams_id"), f.get("tray_id"))
  1031. if overrides and gtid in overrides:
  1032. return (0, overrides[gtid], slot_order)
  1033. remain = f.get("remain", -1)
  1034. return (1, float(remain) if remain is not None and remain >= 0 else 101.0, slot_order)
  1035. def _match_filaments_to_slots(
  1036. self,
  1037. required: list[dict],
  1038. loaded: list[dict],
  1039. prefer_lowest: bool = False,
  1040. inventory_remain_overrides: dict[int, float] | None = None,
  1041. ) -> list[int] | None:
  1042. """Match required filaments to loaded filaments and build AMS mapping.
  1043. Priority: unique tray_info_idx match > exact color match > similar color match > type-only match
  1044. The tray_info_idx is a filament type identifier stored in the 3MF file when the user
  1045. slices (e.g., "GFA00" for generic PLA, "P4d64437" for custom presets). If the same
  1046. tray_info_idx appears in only ONE available tray, we use that tray. If multiple trays
  1047. have the same tray_info_idx (e.g., two spools of generic PLA), we fall back to color
  1048. matching among those trays.
  1049. Args:
  1050. required: List of required filaments with slot_id, type, color, tray_info_idx
  1051. loaded: List of loaded filaments with type, color, tray_info_idx, global_tray_id
  1052. Returns:
  1053. AMS mapping array (position = slot_id - 1, value = global_tray_id or -1)
  1054. """
  1055. if not required:
  1056. return None
  1057. # Track used trays to avoid duplicate assignment
  1058. used_tray_ids: set[int] = set()
  1059. comparisons = []
  1060. for req in required:
  1061. req_type = (req.get("type") or "").upper()
  1062. req_color = req.get("color", "")
  1063. req_tray_info_idx = req.get("tray_info_idx", "")
  1064. # Find best match: unique tray_info_idx > exact color > similar color > type-only
  1065. idx_match = None
  1066. exact_match = None
  1067. similar_match = None
  1068. type_only_match = None
  1069. # Get available trays (not already used)
  1070. available = [f for f in loaded if f["global_tray_id"] not in used_tray_ids]
  1071. # Nozzle-aware filtering: restrict to trays on the correct nozzle.
  1072. # Hard filter — cross-nozzle assignment causes print failures
  1073. # ("position of left hotend is abnormal"), so never fall back.
  1074. req_nozzle_id = req.get("nozzle_id")
  1075. if req_nozzle_id is not None:
  1076. available = [f for f in available if f.get("extruder_id") == req_nozzle_id]
  1077. # Sort by remaining filament (ascending) so lowest-remain spool wins .find().
  1078. # Inventory-tracked spools sort before MQTT-only ones (#1508); see
  1079. # _prefer_lowest_sort_key for the full rationale.
  1080. if prefer_lowest:
  1081. available.sort(key=lambda f: self._prefer_lowest_sort_key(f, inventory_remain_overrides))
  1082. # Check if tray_info_idx is unique among available trays
  1083. if req_tray_info_idx:
  1084. idx_matches = [f for f in available if f.get("tray_info_idx") == req_tray_info_idx]
  1085. if len(idx_matches) == 1:
  1086. # Unique tray_info_idx - use it as definitive match
  1087. idx_match = idx_matches[0]
  1088. logger.debug(
  1089. f"Matched filament slot {req.get('slot_id')} by unique tray_info_idx={req_tray_info_idx} "
  1090. f"-> tray {idx_match['global_tray_id']}"
  1091. )
  1092. elif len(idx_matches) > 1:
  1093. # Multiple trays with same tray_info_idx - use color matching among them
  1094. logger.debug(
  1095. f"Non-unique tray_info_idx={req_tray_info_idx} found in {len(idx_matches)} trays, "
  1096. f"using color matching among trays: {[f['global_tray_id'] for f in idx_matches]}"
  1097. )
  1098. if prefer_lowest:
  1099. idx_matches.sort(key=lambda f: self._prefer_lowest_sort_key(f, inventory_remain_overrides))
  1100. # Use color matching within this subset
  1101. for f in idx_matches:
  1102. f_color = f.get("color", "")
  1103. if self._normalize_color_for_compare(f_color) == self._normalize_color_for_compare(req_color):
  1104. if not exact_match:
  1105. exact_match = f
  1106. elif self._colors_are_similar(f_color, req_color):
  1107. if not similar_match:
  1108. similar_match = f
  1109. elif not type_only_match:
  1110. type_only_match = f
  1111. # If no idx_match yet, do standard type/color matching on all available trays
  1112. if not idx_match and not exact_match and not similar_match and not type_only_match:
  1113. for f in available:
  1114. f_type = (f.get("type") or "").upper()
  1115. if _canonical_filament_type(f_type) != _canonical_filament_type(req_type):
  1116. continue
  1117. # Type matches - check color
  1118. f_color = f.get("color", "")
  1119. if self._normalize_color_for_compare(f_color) == self._normalize_color_for_compare(req_color):
  1120. if not exact_match:
  1121. exact_match = f
  1122. elif self._colors_are_similar(f_color, req_color):
  1123. if not similar_match:
  1124. similar_match = f
  1125. elif not type_only_match:
  1126. type_only_match = f
  1127. match = idx_match or exact_match or similar_match or type_only_match
  1128. if match:
  1129. used_tray_ids.add(match["global_tray_id"])
  1130. comparisons.append({"slot_id": req.get("slot_id", 0), "global_tray_id": match["global_tray_id"]})
  1131. else:
  1132. comparisons.append({"slot_id": req.get("slot_id", 0), "global_tray_id": -1})
  1133. # Build mapping array
  1134. if not comparisons:
  1135. return None
  1136. max_slot_id = max(c["slot_id"] for c in comparisons)
  1137. if max_slot_id <= 0:
  1138. return None
  1139. mapping = [-1] * max_slot_id
  1140. for c in comparisons:
  1141. slot_id = c["slot_id"]
  1142. if slot_id and slot_id > 0:
  1143. mapping[slot_id - 1] = c["global_tray_id"]
  1144. return mapping
  1145. def _mark_printer_dispatched(
  1146. self,
  1147. printer_id: int,
  1148. pre_state: str | None,
  1149. pre_subtask_id: str | None,
  1150. ) -> None:
  1151. """Record that a print command was just sent to ``printer_id``.
  1152. Held until either the watchdog observes a state/subtask transition
  1153. (success path) or the hard timeout expires. See ``_dispatch_holds``.
  1154. """
  1155. if not pre_state:
  1156. # No pre_state means we can't detect a transition — fall back to a
  1157. # pure time-based hold using empty string as a sentinel that won't
  1158. # match any real printer state.
  1159. pre_state = ""
  1160. self._dispatch_holds[printer_id] = (time.monotonic(), pre_state, pre_subtask_id)
  1161. def _release_dispatch_hold(self, printer_id: int) -> None:
  1162. """Drop the dispatch hold for ``printer_id`` (called by the watchdog)."""
  1163. self._dispatch_holds.pop(printer_id, None)
  1164. def _printer_in_dispatch_hold(self, printer_id: int) -> bool:
  1165. """True if ``printer_id`` is still inside its post-dispatch hold window.
  1166. Returns False (and clears the hold) once any of these are true:
  1167. - hard timeout (``_dispatch_max_hold``) has elapsed
  1168. - the printer has transitioned out of pre_state and we're past the
  1169. minimum cooldown
  1170. - the printer's subtask_id has advanced past pre_subtask_id and we're
  1171. past the minimum cooldown
  1172. Otherwise the printer is held — caller should treat it as busy.
  1173. """
  1174. entry = self._dispatch_holds.get(printer_id)
  1175. if not entry:
  1176. return False
  1177. started_at, pre_state, pre_subtask_id = entry
  1178. elapsed = time.monotonic() - started_at
  1179. if elapsed >= self._dispatch_max_hold:
  1180. self._dispatch_holds.pop(printer_id, None)
  1181. return False
  1182. # Without a pre_state we can't detect a transition — fall back to the
  1183. # min cooldown alone, then drop the hold.
  1184. if not pre_state:
  1185. if elapsed >= self._dispatch_min_cooldown:
  1186. self._dispatch_holds.pop(printer_id, None)
  1187. return False
  1188. return True
  1189. status = printer_manager.get_status(printer_id)
  1190. current_state = getattr(status, "state", None) if status else None
  1191. current_subtask_id = getattr(status, "subtask_id", None) if status else None
  1192. transitioned = (current_state is not None and current_state != pre_state) or (
  1193. pre_subtask_id is not None and current_subtask_id is not None and current_subtask_id != pre_subtask_id
  1194. )
  1195. if transitioned and elapsed >= self._dispatch_min_cooldown:
  1196. self._dispatch_holds.pop(printer_id, None)
  1197. return False
  1198. return True
  def _is_printer_idle(self, printer_id: int, require_plate_clear: bool = True) -> bool:
      """Return True when the printer is connected and in an idle-like state.

      A printer counts as idle when its reported state is ``IDLE``, ``FINISH``
      or ``FAILED``. With ``require_plate_clear`` set, a printer that is still
      awaiting a plate-clear acknowledgment is reported as not idle whatever
      state it shows.
      """
      if not printer_manager.is_connected(printer_id):
          logger.debug("Printer %d: not connected", printer_id)
          return False

      status = printer_manager.get_status(printer_id)
      if not status:
          logger.debug("Printer %d: no status available", printer_id)
          return False

      # Plate-clear gate (#961): after an Auto Off power cycle the printer
      # boots straight into IDLE with no memory of the finished print, so the
      # persisted "awaiting" flag is the only thing preventing a dispatch onto
      # an uncleared plate.
      if require_plate_clear and printer_manager.is_awaiting_plate_clear(printer_id):
          logger.debug(
              "Printer %d: not idle — awaiting plate-clear acknowledgment (state=%s)",
              printer_id,
              status.state,
          )
          return False

      if status.state in ("IDLE", "FINISH", "FAILED"):
          return True

      logger.debug("Printer %d: not idle — state=%s", printer_id, status.state)
      return False
  async def _get_setting(self, db: AsyncSession, key: str) -> str | None:
      """Fetch the stored value for ``key`` from the settings table.

      Returns:
          The row's ``value``, or ``None`` when no row matches ``key``.
      """
      query = select(Settings).where(Settings.key == key)
      row = (await db.execute(query)).scalar_one_or_none()
      if not row:
          return None
      return row.value
  1229. async def _get_bool_setting(self, db: AsyncSession, key: str, default: bool = False) -> bool:
  1230. """Read a boolean setting from the database."""
  1231. result = await db.execute(select(Settings).where(Settings.key == key))
  1232. setting = result.scalar_one_or_none()
  1233. if setting:
  1234. return setting.value.lower() == "true"
  1235. return default
  async def _get_drying_presets(self, db: AsyncSession) -> dict[str, dict[str, int]]:
      """Return the drying presets to use for auto-drying.

      The ``drying_presets`` setting wins when it holds a JSON document that
      decodes to a non-empty dict. In every other case (no row, empty value,
      invalid JSON, wrong JSON type) ``DEFAULT_DRYING_PRESETS`` is returned.
      """
      query = select(Settings).where(Settings.key == "drying_presets")
      row = (await db.execute(query)).scalar_one_or_none()
      stored = row.value if row else None

      if stored:
          try:
              decoded = json.loads(stored)
          except json.JSONDecodeError:
              # Malformed user configuration: fall through to the defaults.
              decoded = None
          if isinstance(decoded, dict) and decoded:
              return decoded

      return self.DEFAULT_DRYING_PRESETS
  1248. def _get_conservative_drying_params(
  1249. self, trays: list[dict], module_type: str, presets: dict[str, dict[str, int]]
  1250. ) -> tuple[int, int, str] | None:
  1251. """Get the most conservative drying params for mixed filament types in an AMS unit.
  1252. Returns (temp, duration_hours, filament_type) or None if no drying-eligible filaments.
  1253. """
  1254. temp_key = module_type if module_type in ("n3f", "n3s") else "n3f"
  1255. hours_key = f"{temp_key}_hours"
  1256. min_temp = None
  1257. max_hours = None
  1258. filament_type = ""
  1259. for tray in trays:
  1260. tray_type = tray.get("tray_type", "")
  1261. if not tray_type:
  1262. continue
  1263. # Normalize filament type for preset lookup (e.g., "PLA Basic" -> "PLA")
  1264. base_type = tray_type.split()[0].upper()
  1265. preset = presets.get(base_type)
  1266. if not preset:
  1267. continue
  1268. temp = preset.get(temp_key, 55)
  1269. hours = preset.get(hours_key, 12)
  1270. # Conservative: lowest temp, longest duration
  1271. if min_temp is None or temp < min_temp:
  1272. min_temp = temp
  1273. if max_hours is None or hours > max_hours:
  1274. max_hours = hours
  1275. if not filament_type:
  1276. filament_type = base_type
  1277. if min_temp is None:
  1278. return None
  1279. return (min_temp, max_hours or 12, filament_type)
  async def _check_auto_drying(
      self,
      db: AsyncSession,
      queue_items: list[PrintQueueItem],
      busy_printers: set[int],
      *,
      require_plate_clear: bool = True,
  ):
      """Start or stop AMS drying on idle printers based on humidity.

      Two modes (can both be enabled):

      - ``queue_drying_enabled``: dry between scheduled queue prints
      - ``ambient_drying_enabled``: dry any idle printer when humidity is
        high, regardless of queue

      Args:
          db: Session used to read settings and the active printers.
          queue_items: Queue items inspected for ``printer_id``,
              ``scheduled_time`` and ``manual_start``.
          busy_printers: Printer ids to leave untouched in this pass.
          require_plate_clear: Forwarded to ``_is_printer_idle``.
      """
      queue_drying_enabled = await self._get_bool_setting(db, "queue_drying_enabled")
      ambient_drying_enabled = await self._get_bool_setting(db, "ambient_drying_enabled")

      if not queue_drying_enabled and not ambient_drying_enabled:
          # Stop active drying on all printers if both features disabled
          for pid in list(self._drying_in_progress):
              logger.info("Auto-drying: printer %d — stopping, auto-drying disabled", pid)
              await self._stop_drying(pid)
          return

      # Update drying state from printer status (handles backend restart)
      self._sync_drying_state()

      # Find printers with scheduled items (for queue drying mode)
      printers_with_scheduled: set[int] = set()
      printers_with_items: set[int] = set()
      for item in queue_items:
          if item.printer_id:
              printers_with_items.add(item.printer_id)
              if item.scheduled_time and not item.manual_start:
                  printers_with_scheduled.add(item.printer_id)

      # If only queue mode is on and no printers have scheduled items, stop drying
      if not ambient_drying_enabled and not printers_with_scheduled:
          for pid in list(self._drying_in_progress):
              logger.info("Auto-drying: printer %d — stopping, no scheduled prints in queue", pid)
              await self._stop_drying(pid)
          return

      # Get humidity threshold. A missing, empty or non-numeric setting must
      # not abort the whole drying pass, so fall back to the default of 60.
      humidity_threshold = 60
      raw_threshold = await self._get_setting(db, "ams_humidity_fair")
      if raw_threshold is not None:
          try:
              humidity_threshold = int(raw_threshold)
          except (TypeError, ValueError):
              logger.warning(
                  "Auto-drying: invalid ams_humidity_fair setting %r, using default %d",
                  raw_threshold,
                  humidity_threshold,
              )

      # Get drying presets
      presets = await self._get_drying_presets(db)

      # Determine if drying should be skipped for printers with pending items
      block_for_drying = await self._get_bool_setting(db, "queue_drying_block")

      # Get all active printers
      all_printers = await db.execute(select(Printer).where(Printer.is_active.is_(True)))
      for printer in all_printers.scalars():
          pid = printer.id
          if pid in busy_printers:
              logger.debug("Auto-drying: printer %d skipped — busy", pid)
              continue

          # In queue-only mode, only dry printers that have scheduled prints
          if not ambient_drying_enabled and pid not in printers_with_scheduled:
              if self._drying_in_progress.get(pid):
                  logger.info("Auto-drying: printer %d — stopping, no scheduled prints for this printer", pid)
                  await self._stop_drying(pid)
              logger.debug("Auto-drying: printer %d skipped — no scheduled prints", pid)
              continue

          # When block mode is on, don't START new drying on printers with pending items.
          # But allow already-drying printers through so humidity auto-stop logic still runs.
          if block_for_drying and pid in printers_with_items and not self._drying_in_progress.get(pid):
              logger.debug("Auto-drying: printer %d skipped — has pending items (block mode)", pid)
              continue

          if not printer_manager.is_connected(pid):
              logger.debug("Auto-drying: printer %d skipped — not connected", pid)
              continue

          if not self._is_printer_idle(pid, require_plate_clear):
              logger.debug("Auto-drying: printer %d skipped — not idle", pid)
              continue

          # Check if this printer supports drying
          state = printer_manager.get_status(pid)
          if not state:
              logger.debug("Auto-drying: printer %d skipped — no state", pid)
              continue

          model = printer_manager.get_model(pid)
          firmware = state.firmware_version
          if not supports_drying(model, firmware):
              logger.debug("Auto-drying: printer %d skipped — model %s does not support drying", pid, model)
              continue

          # Check each AMS unit from raw_data
          ams_list = state.raw_data.get("ams", [])
          logger.debug("Auto-drying: printer %d — checking %d AMS units", pid, len(ams_list))

          for ams_data in ams_list:
              module_type = str(ams_data.get("module_type") or "")
              ams_id = int(ams_data.get("id", 0))

              # Only n3f/n3s support drying
              if module_type not in ("n3f", "n3s"):
                  logger.debug("Auto-drying: printer %d AMS %d skipped — module_type=%s", pid, ams_id, module_type)
                  continue

              dry_time = int(ams_data.get("dry_time") or 0)

              # Read humidity — prefer humidity_raw (actual %) over humidity (index 1-5).
              # The first key that is present and parses as an int wins.
              humidity = None
              for humidity_key in ("humidity_raw", "humidity"):
                  reading = ams_data.get(humidity_key)
                  if reading is None:
                      continue
                  try:
                      humidity = int(reading)
                  except (ValueError, TypeError):
                      continue
                  break

              # Already drying — check if humidity dropped below threshold (with minimum drying time)
              if dry_time > 0:
                  if pid not in self._drying_in_progress:
                      # Drying we didn't start (manual or from before restart): start
                      # tracking it now, so the minimum drying time counts from here.
                      self._drying_in_progress[pid] = time.monotonic()

                  started_at = self._drying_in_progress[pid]
                  elapsed = time.monotonic() - started_at

                  if humidity is not None and humidity <= humidity_threshold and elapsed >= self._min_drying_seconds:
                      logger.info(
                          "Auto-drying: printer %d AMS %d — humidity %d%% <= threshold %d%% after %dm, stopping drying",
                          pid,
                          ams_id,
                          humidity,
                          humidity_threshold,
                          int(elapsed / 60),
                      )
                      printer_manager.send_drying_command(pid, ams_id, temp=0, duration=0, mode=0)
                  else:
                      logger.debug(
                          "Auto-drying: printer %d AMS %d — drying (%dm left, humidity %s%%, elapsed %dm/%dm min)",
                          pid,
                          ams_id,
                          dry_time,
                          humidity,
                          int(elapsed / 60),
                          self._min_drying_seconds // 60,
                      )
                  continue

              # Humidity below threshold — no need to start drying
              if humidity is None or humidity <= humidity_threshold:
                  logger.debug(
                      "Auto-drying: printer %d AMS %d skipped — humidity %s <= threshold %d",
                      pid,
                      ams_id,
                      humidity,
                      humidity_threshold,
                  )
                  continue

              # Check cannot-dry reasons (power constraints etc.)
              sf_reasons = ams_data.get("dry_sf_reason", [])
              if sf_reasons:
                  logger.debug(
                      "Auto-drying: printer %d AMS %d skipped — cannot dry reasons: %s",
                      pid,
                      ams_id,
                      sf_reasons,
                  )
                  continue

              # Get conservative drying params for mixed filaments
              trays = ams_data.get("tray", [])
              params = self._get_conservative_drying_params(trays, module_type, presets)
              if not params:
                  logger.debug(
                      "Auto-drying: printer %d AMS %d skipped — no drying-eligible filaments in trays", pid, ams_id
                  )
                  continue

              temp, duration_hours, filament_type = params

              # Start drying
              logger.info(
                  "Auto-drying: printer %d AMS %d — humidity %d%% > threshold %d%%, "
                  "starting %s drying at %d°C for %dh",
                  pid,
                  ams_id,
                  humidity,
                  humidity_threshold,
                  filament_type,
                  temp,
                  duration_hours,
              )
              success = printer_manager.send_drying_command(
                  pid, ams_id, temp, duration_hours, mode=1, filament=filament_type
              )
              if success:
                  self._drying_in_progress[pid] = time.monotonic()
  1461. def _sync_drying_state(self):
  1462. """Sync in-memory drying state with actual printer status.
  1463. Handles backend restart — if a printer is drying but we don't know about it,
  1464. update our state. If we think it's drying but it's not, clear it.
  1465. """
  1466. to_remove = []
  1467. for pid in self._drying_in_progress:
  1468. state = printer_manager.get_status(pid)
  1469. if not state:
  1470. to_remove.append(pid)
  1471. continue
  1472. # Check if any AMS unit is still drying
  1473. ams_list = state.raw_data.get("ams", [])
  1474. any_drying = any(int(a.get("dry_time") or 0) > 0 for a in ams_list)
  1475. if not any_drying:
  1476. to_remove.append(pid)
  1477. for pid in to_remove:
  1478. self._drying_in_progress.pop(pid, None)
  async def _stop_drying(self, printer_id: int):
      """Send a stop-drying command to every AMS unit that is currently drying.

      Units with a positive ``dry_time`` receive a drying command with
      temperature 0, duration 0 and ``mode=0``. The printer is then removed
      from ``_drying_in_progress``; this also happens when no status is
      available for it.
      """
      status = printer_manager.get_status(printer_id)
      if status:
          for unit in status.raw_data.get("ams", []):
              if int(unit.get("dry_time") or 0) <= 0:
                  continue
              unit_id = int(unit.get("id", 0))
              logger.info(
                  "Auto-drying: stopping drying on printer %d AMS %d — print takes priority",
                  printer_id,
                  unit_id,
              )
              printer_manager.send_drying_command(printer_id, unit_id, 0, 0, mode=0)
      self._drying_in_progress.pop(printer_id, None)
  async def _get_smart_plugs(self, db: AsyncSession, printer_id: int) -> list[SmartPlug]:
      """Return every smart plug whose ``printer_id`` matches the given printer."""
      query = select(SmartPlug).where(SmartPlug.printer_id == printer_id)
      rows = await db.execute(query)
      return [*rows.scalars().all()]
  async def _power_on_and_wait(self, plug: SmartPlug, printer_id: int, db: AsyncSession) -> bool:
      """Switch the smart plug on and wait until the printer connects.

      After a fixed 30 s boot delay, a connection is attempted every
      ``_power_on_check_interval`` seconds until the accumulated wait reaches
      ``_power_on_wait_time``.

      Returns:
          True once a connection attempt succeeds. False when the plug is
          unreachable, cannot be switched on, the printer row is missing, or
          the wait budget runs out.
      """
      # Resolve the backend that handles this plug.
      service = await smart_plug_manager.get_service_for_plug(plug, db)

      plug_status = await service.get_status(plug)
      if not plug_status.get("reachable"):
          logger.warning("Smart plug '%s' is not reachable", plug.name)
          return False

      # Only switch the plug when it is not already reported as ON.
      if plug_status.get("state") != "ON":
          if not await service.turn_on(plug):
              logger.warning("Failed to turn on smart plug '%s'", plug.name)
              return False
          logger.info("Powered on smart plug '%s' for printer %s", plug.name, printer_id)

      # The printer row is what connect_printer() is called with.
      lookup = await db.execute(select(Printer).where(Printer.id == printer_id))
      printer = lookup.scalar_one_or_none()
      if not printer:
          logger.error("Printer %s not found in database", printer_id)
          return False

      # Give the printer time to boot before the first connection attempt.
      logger.info("Waiting 30s for printer %s to boot...", printer_id)
      await asyncio.sleep(30)

      waited = 30  # the boot delay counts against the wait budget
      while waited < self._power_on_wait_time:
          logger.info("Attempting to connect to printer %s...", printer_id)
          try:
              if await printer_manager.connect_printer(printer):
                  logger.info("Printer %s connected after %ss", printer_id, waited)
                  # Short pause so the first status report can arrive.
                  await asyncio.sleep(5)
                  return True
          except Exception as e:
              logger.debug("Connection attempt failed: %s", e)

          await asyncio.sleep(self._power_on_check_interval)
          waited += self._power_on_check_interval
          logger.debug("Waiting for printer %s to connect... (%ss)", printer_id, waited)

      logger.warning("Printer %s did not connect within %ss after power on", printer_id, self._power_on_wait_time)
      return False
  async def _check_previous_success(self, db: AsyncSession, item: PrintQueueItem) -> bool:
      """Tell whether the last finished queue item on this printer completed.

      Looks at the most recent other item (by ``completed_at``) on the same
      printer whose status is completed, failed, skipped or aborted.

      Returns:
          True when that item's status is ``"completed"``, or when there is
          no such item at all.
      """
      finished_statuses = ["completed", "failed", "skipped", "aborted"]
      query = (
          select(PrintQueueItem)
          .where(PrintQueueItem.printer_id == item.printer_id)
          .where(PrintQueueItem.id != item.id)
          .where(PrintQueueItem.status.in_(finished_statuses))
          .order_by(PrintQueueItem.completed_at.desc())
          .limit(1)
      )
      previous = (await db.execute(query)).scalar_one_or_none()

      # Nothing ran before this item: treat that as success.
      return (not previous) or previous.status == "completed"
  async def _power_off_if_needed(self, db: AsyncSession, item: PrintQueueItem):
      """Switch off the printer's enabled smart plugs when the item asks for it.

      Does nothing unless ``item.auto_off_after`` is set and the printer has
      at least one enabled plug. Otherwise it waits for the printer to cool
      down (target 50.0, timeout 600 s) and then turns each plug off. A
      failure on one plug is logged and does not stop the others.
      """
      if not item.auto_off_after:
          return

      plugs = await self._get_smart_plugs(db, item.printer_id)
      enabled_plug_ids = [plug.id for plug in plugs if plug.enabled]
      if not enabled_plug_ids:
          return

      logger.info("Auto-off: Waiting for printer %s to cool down before power off...", item.printer_id)
      # Wait for cooldown (up to 10 minutes)
      await printer_manager.wait_for_cooldown(item.printer_id, target_temp=50.0, timeout=600)

      # The cooldown wait can be long, so the plugs are re-read through a
      # fresh session rather than reusing the rows loaded before it.
      async with async_session() as fresh_db:
          for plug_id in enabled_plug_ids:
              try:
                  lookup = await fresh_db.execute(select(SmartPlug).where(SmartPlug.id == plug_id))
                  plug = lookup.scalar_one_or_none()
                  if not (plug and plug.enabled):
                      continue
                  logger.info("Auto-off: Powering off plug '%s' for printer %s", plug.name, item.printer_id)
                  service = await smart_plug_manager.get_service_for_plug(plug, fresh_db)
                  await service.turn_off(plug)
              except Exception as e:
                  logger.warning(
                      "Auto-off: Failed to power off plug %s for printer %s: %s", plug_id, item.printer_id, e
                  )
  1587. async def _get_job_name(self, db: AsyncSession, item: PrintQueueItem) -> str:
  1588. """Get a human-readable name for a queue item."""
  1589. if item.archive_id:
  1590. result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
  1591. archive = result.scalar_one_or_none()
  1592. if archive:
  1593. return archive.filename.replace(".gcode.3mf", "").replace(".3mf", "")
  1594. if item.library_file_id:
  1595. result = await db.execute(LibraryFile.active().where(LibraryFile.id == item.library_file_id))
  1596. library_file = result.scalar_one_or_none()
  1597. if library_file:
  1598. return library_file.filename.replace(".gcode.3mf", "").replace(".3mf", "")
  1599. return f"Job #{item.id}"
  async def _get_printer(self, db: AsyncSession, printer_id: int) -> Printer | None:
      """Load the printer row with the given id, or ``None`` when it does not exist."""
      query = select(Printer).where(Printer.id == printer_id)
      rows = await db.execute(query)
      return rows.scalar_one_or_none()
  async def _block_on_filament_deficit(
      self,
      db: AsyncSession,
      item: PrintQueueItem,
  ) -> bool:
      """Hold back a queue item whose assigned spool is short on filament (#1496).

      When a deficit is reported the item is flagged ``filament_short``,
      promoted to ``manual_start``, committed, and a "job waiting"
      notification is attempted. When no deficit is reported, a previously
      set ``filament_short`` flag is cleared and committed.

      Returns:
          True when this dispatch attempt was blocked, False when the item is
          clear to start (including when the deficit check itself failed).
      """
      try:
          shortfall = await compute_deficit_for_queue_item(db, item)
      except Exception as e:
          # A flaky deficit check must never wedge the queue: log it and let
          # dispatch proceed. The PrintModal-side check still runs on the
          # manual paths.
          logger.warning("Filament deficit check failed for item %s: %s", item.id, e)
          return False

      if not shortfall:
          # No deficit: clear any stale flag left over from a previous tick.
          if item.filament_short:
              item.filament_short = False
              await db.commit()
          return False

      item.filament_short = True
      item.manual_start = True
      await db.commit()

      job_name = await self._get_job_name(db, item)
      printer = await self._get_printer(db, item.printer_id) if item.printer_id else None
      logger.info(
          "Queue item %s blocked on filament deficit (%d slot(s)) — promoted to manual_start",
          item.id,
          len(shortfall),
      )

      # The notification is best-effort; the item stays blocked either way.
      try:
          await notification_service.on_queue_job_waiting(
              job_name=job_name,
              target_model=(printer.model if printer else "") or "",
              waiting_reason="filament_short",
              db=db,
          )
      except Exception as e:
          logger.debug("filament_short notification failed for item %s: %s", item.id, e)
      return True
  1649. async def _start_print(self, db: AsyncSession, item: PrintQueueItem):
  1650. """Upload file and start print for a queue item.
  1651. Supports two sources:
  1652. - archive_id: Print from an existing archive
  1653. - library_file_id: Print from a library file (file manager)
  1654. """
  1655. logger.info("Starting queue item %s", item.id)
  1656. # Get printer first (needed for both paths)
  1657. result = await db.execute(select(Printer).where(Printer.id == item.printer_id))
  1658. printer = result.scalar_one_or_none()
  1659. if not printer:
  1660. item.status = "failed"
  1661. item.error_message = "Printer not found"
  1662. item.completed_at = datetime.now(timezone.utc)
  1663. await db.commit()
  1664. logger.error("Queue item %s: Printer %s not found", item.id, item.printer_id)
  1665. await self._power_off_if_needed(db, item)
  1666. return
  1667. # Check printer is connected
  1668. if not printer_manager.is_connected(item.printer_id):
  1669. item.status = "failed"
  1670. item.error_message = "Printer not connected"
  1671. item.completed_at = datetime.now(timezone.utc)
  1672. await db.commit()
  1673. logger.error("Queue item %s: Printer %s not connected", item.id, item.printer_id)
  1674. await self._power_off_if_needed(db, item)
  1675. return
  1676. # Determine source: archive or library file
  1677. archive = None
  1678. library_file = None
  1679. file_path = None
  1680. filename = None
  1681. if item.archive_id:
  1682. # Print from archive
  1683. result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
  1684. archive = result.scalar_one_or_none()
  1685. if not archive:
  1686. item.status = "failed"
  1687. item.error_message = "Archive not found"
  1688. item.completed_at = datetime.now(timezone.utc)
  1689. await db.commit()
  1690. logger.error("Queue item %s: Archive %s not found", item.id, item.archive_id)
  1691. await self._power_off_if_needed(db, item)
  1692. return
  1693. file_path = settings.base_dir / archive.file_path
  1694. filename = archive.filename
  1695. elif item.library_file_id:
  1696. # Print from library file (file manager)
  1697. result = await db.execute(LibraryFile.active().where(LibraryFile.id == item.library_file_id))
  1698. library_file = result.scalar_one_or_none()
  1699. if not library_file:
  1700. item.status = "failed"
  1701. item.error_message = "Library file not found"
  1702. item.completed_at = datetime.now(timezone.utc)
  1703. await db.commit()
  1704. logger.error("Queue item %s: Library file %s not found", item.id, item.library_file_id)
  1705. await self._power_off_if_needed(db, item)
  1706. return
  1707. # Library files store absolute paths
  1708. lib_path = Path(library_file.file_path)
  1709. file_path = lib_path if lib_path.is_absolute() else settings.base_dir / library_file.file_path
  1710. filename = library_file.filename
  1711. # Create archive from library file so usage tracking has access to the 3MF
  1712. try:
  1713. from backend.app.services.archive import ArchiveService
  1714. archive_service = ArchiveService(db)
  1715. archive = await archive_service.archive_print(
  1716. printer_id=item.printer_id,
  1717. source_file=file_path,
  1718. original_filename=filename,
  1719. created_by_id=item.created_by_id,
  1720. project_id=item.project_id,
  1721. )
  1722. if archive:
  1723. item.archive_id = archive.id
  1724. await db.flush()
  1725. logger.info(
  1726. "Queue item %s: Created archive %s from library file %s",
  1727. item.id,
  1728. archive.id,
  1729. item.library_file_id,
  1730. )
  1731. except Exception as e:
  1732. logger.warning("Queue item %s: Failed to create archive from library file: %s", item.id, e)
  1733. else:
  1734. # Neither archive nor library file specified
  1735. item.status = "failed"
  1736. item.error_message = "No source file specified"
  1737. item.completed_at = datetime.now(timezone.utc)
  1738. await db.commit()
  1739. logger.error("Queue item %s: No archive_id or library_file_id specified", item.id)
  1740. await self._power_off_if_needed(db, item)
  1741. return
  1742. # Check file exists on disk
  1743. if not file_path.exists():
  1744. item.status = "failed"
  1745. item.error_message = "Source file not found on disk"
  1746. item.completed_at = datetime.now(timezone.utc)
  1747. await db.commit()
  1748. logger.error("Queue item %s: File not found: %s", item.id, file_path)
  1749. await self._power_off_if_needed(db, item)
  1750. return
  1751. # G-code injection for auto-print systems (#422)
  1752. injected_path = None
  1753. if item.gcode_injection:
  1754. try:
  1755. snippets_raw = await self._get_setting(db, "gcode_snippets")
  1756. if snippets_raw:
  1757. snippets = json.loads(snippets_raw)
  1758. model_snippets = snippets.get(printer.model, {})
  1759. start_gc = (model_snippets.get("start_gcode") or "").strip()
  1760. end_gc = (model_snippets.get("end_gcode") or "").strip()
  1761. if start_gc or end_gc:
  1762. from backend.app.utils.threemf_tools import inject_gcode_into_3mf
  1763. injected_path = inject_gcode_into_3mf(
  1764. file_path, item.plate_id or 1, start_gc or None, end_gc or None
  1765. )
  1766. if injected_path:
  1767. file_path = injected_path
  1768. logger.info("Queue item %s: G-code injected for model %s", item.id, printer.model)
  1769. else:
  1770. logger.warning(
  1771. "Queue item %s: G-code injection returned no result, using original", item.id
  1772. )
  1773. except Exception as e:
  1774. logger.warning("Queue item %s: G-code injection failed, using original: %s", item.id, e)
  1775. # Upload file to printer via FTP
  1776. # Use a clean filename to avoid issues with double extensions like .gcode.3mf
  1777. base_name = filename
  1778. if base_name.endswith(".gcode.3mf"):
  1779. base_name = base_name[:-10] # Remove .gcode.3mf
  1780. elif base_name.endswith(".3mf"):
  1781. base_name = base_name[:-4] # Remove .3mf
  1782. remote_filename = f"{base_name}.3mf"
  1783. # Sanitize: firmware parses ftp://{filename} as a URL, spaces break it
  1784. remote_filename = remote_filename.replace(" ", "_")
  1785. # Upload to root directory (not /cache/) - the start_print command references
  1786. # files by name only (ftp://{filename}), so they must be in the root
  1787. remote_path = f"/{remote_filename}"
  1788. # Get FTP retry settings
  1789. ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
  1790. logger.info(
  1791. f"Queue item {item.id}: FTP upload starting - printer={printer.name} ({printer.model}), "
  1792. f"ip={printer.ip_address}, file={remote_filename}, local_path={file_path}, "
  1793. f"retry_enabled={ftp_retry_enabled}, retry_count={ftp_retry_count}, timeout={ftp_timeout}"
  1794. )
  1795. # Delete existing file if present (avoids 553 error on overwrite)
  1796. try:
  1797. logger.debug("Queue item %s: Deleting existing file %s if present...", item.id, remote_path)
  1798. delete_result = await delete_file_async(
  1799. printer.ip_address,
  1800. printer.access_code,
  1801. remote_path,
  1802. socket_timeout=ftp_timeout,
  1803. printer_model=printer.model,
  1804. )
  1805. logger.debug("Queue item %s: Delete result: %s", item.id, delete_result)
  1806. except Exception as e:
  1807. logger.debug("Queue item %s: Delete failed (may not exist): %s", item.id, e)
  1808. try:
  1809. if ftp_retry_enabled:
  1810. uploaded = await with_ftp_retry(
  1811. upload_file_async,
  1812. printer.ip_address,
  1813. printer.access_code,
  1814. file_path,
  1815. remote_path,
  1816. socket_timeout=ftp_timeout,
  1817. printer_model=printer.model,
  1818. max_retries=ftp_retry_count,
  1819. retry_delay=ftp_retry_delay,
  1820. operation_name=f"Upload print to {printer.name}",
  1821. )
  1822. else:
  1823. uploaded = await upload_file_async(
  1824. printer.ip_address,
  1825. printer.access_code,
  1826. file_path,
  1827. remote_path,
  1828. socket_timeout=ftp_timeout,
  1829. printer_model=printer.model,
  1830. )
  1831. except Exception as e:
  1832. uploaded = False
  1833. logger.error("Queue item %s: FTP error: %s (type: %s)", item.id, e, type(e).__name__)
  1834. # Clean up injected temp file after upload attempt
  1835. if injected_path and injected_path.exists():
  1836. injected_path.unlink(missing_ok=True)
  1837. if not uploaded:
  1838. error_msg = (
  1839. "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT). "
  1840. "See server logs for detailed diagnostics."
  1841. )
  1842. item.status = "failed"
  1843. item.error_message = error_msg
  1844. item.completed_at = datetime.now(timezone.utc)
  1845. await db.commit()
  1846. logger.error(
  1847. f"Queue item {item.id}: FTP upload failed - printer={printer.name}, model={printer.model}, "
  1848. f"ip={printer.ip_address}. Check logs above for storage diagnostics and specific error codes."
  1849. )
  1850. # Send failure notification
  1851. await notification_service.on_queue_job_failed(
  1852. job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
  1853. printer_id=printer.id,
  1854. printer_name=printer.name,
  1855. reason="Failed to upload file to printer",
  1856. db=db,
  1857. )
  1858. await self._power_off_if_needed(db, item)
  1859. return
  1860. # Parse AMS mapping if stored
  1861. ams_mapping = None
  1862. if item.ams_mapping:
  1863. try:
  1864. ams_mapping = json.loads(item.ams_mapping)
  1865. except json.JSONDecodeError:
  1866. logger.warning("Queue item %s: Invalid AMS mapping JSON, ignoring", item.id)
  1867. # Register as expected print so we don't create a duplicate archive
  1868. # Only applicable for archive-based prints
  1869. if archive:
  1870. from backend.app.main import register_expected_print
  1871. register_expected_print(
  1872. item.printer_id,
  1873. remote_filename,
  1874. archive.id,
  1875. ams_mapping=ams_mapping,
  1876. created_by_id=item.created_by_id,
  1877. )
  1878. # IMPORTANT: Set status to "printing" BEFORE sending the print command.
  1879. # This prevents phantom reprints if the backend crashes/restarts after the
  1880. # print command is sent but before the status update is committed.
  1881. # If we crash after this commit but before start_print(), the item will be
  1882. # in "printing" status without actually printing - but that's safer than
  1883. # accidentally reprinting the same file hours later.
  1884. item.status = "printing"
  1885. item.started_at = datetime.now(timezone.utc)
  1886. await db.commit()
  1887. # Clear the awaiting-plate-clear flag now that we're starting a new print
  1888. printer_manager.set_awaiting_plate_clear(item.printer_id, False)
  1889. logger.info("Queue item %s: Status set to 'printing', sending print command...", item.id)
  1890. # Capture state before dispatch so the watchdog can detect whether the
  1891. # printer actually transitioned (#967). Also capture subtask_id so the
  1892. # watchdog can recognise "command landed but state hasn't flipped yet"
  1893. # on slow H2D transitions (#1078).
  1894. pre_status = printer_manager.get_status(item.printer_id)
  1895. pre_state = getattr(pre_status, "state", None) if pre_status else None
  1896. pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
  1897. pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None
  1898. # Start the print with AMS mapping, plate_id and print options
  1899. started = printer_manager.start_print(
  1900. item.printer_id,
  1901. remote_filename,
  1902. plate_id=item.plate_id or 1,
  1903. ams_mapping=ams_mapping,
  1904. bed_levelling=item.bed_levelling,
  1905. flow_cali=item.flow_cali,
  1906. vibration_cali=item.vibration_cali,
  1907. layer_inspect=item.layer_inspect,
  1908. timelapse=item.timelapse,
  1909. use_ams=item.use_ams,
  1910. )
  1911. if started:
  1912. logger.info("Queue item %s: Print started successfully - %s", item.id, filename)
  1913. # Register the local 3MF in the cover-cache so /cover skips FTP
  1914. # (#1166 follow-up). file_path was resolved earlier from either the
  1915. # archive or the library file row.
  1916. if file_path is not None:
  1917. cache_3mf_download(item.printer_id, remote_filename, file_path)
  1918. # Hold the printer against further dispatches until the watchdog
  1919. # confirms the printer transitioned (or until the hard timeout).
  1920. # Prevents multi-plate batches from triple-dispatching onto the
  1921. # same H2D Pro while it digests the first project_file (#1157).
  1922. self._mark_printer_dispatched(item.printer_id, pre_state, pre_subtask_id)
  1923. # Watchdog: if the printer never transitions out of pre_state AND
  1924. # never advances subtask_id, the MQTT publish was accepted locally but
  1925. # didn't reach the printer (half-broken session — same shape as
  1926. # #887/#936). Revert the queue item so the next dispatch can pick it
  1927. # up instead of leaving it stuck in "printing" (#967). subtask_id
  1928. # check avoids false reverts on slow H2D FINISH→PREPARE transitions
  1929. # that would otherwise cause the item to re-dispatch as a reprint
  1930. # of the just-finished job (#1078).
  1931. if pre_state:
  1932. asyncio.create_task(
  1933. self._watchdog_print_start(
  1934. item.id,
  1935. item.printer_id,
  1936. pre_state,
  1937. pre_subtask_id,
  1938. pre_gcode_file,
  1939. )
  1940. )
  1941. # Get estimated time for notification
  1942. estimated_time = None
  1943. if archive and archive.print_time_seconds:
  1944. estimated_time = archive.print_time_seconds
  1945. elif library_file and library_file.print_time_seconds:
  1946. estimated_time = library_file.print_time_seconds
  1947. # Send job started notification
  1948. await notification_service.on_queue_job_started(
  1949. job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
  1950. printer_id=printer.id,
  1951. printer_name=printer.name,
  1952. db=db,
  1953. estimated_time=estimated_time,
  1954. )
  1955. # MQTT relay - publish queue job started
  1956. try:
  1957. from backend.app.services.mqtt_relay import mqtt_relay
  1958. await mqtt_relay.on_queue_job_started(
  1959. job_id=item.id,
  1960. filename=filename,
  1961. printer_id=printer.id,
  1962. printer_name=printer.name,
  1963. printer_serial=printer.serial_number,
  1964. )
  1965. except Exception:
  1966. pass # Don't fail if MQTT fails
  1967. else:
  1968. # Clean up uploaded file from SD card to prevent phantom prints
  1969. try:
  1970. await delete_file_async(
  1971. printer.ip_address,
  1972. printer.access_code,
  1973. remote_path,
  1974. printer_model=printer.model,
  1975. )
  1976. except Exception:
  1977. pass # Best-effort — don't fail the error handler
  1978. # Print command failed - revert status
  1979. item.status = "failed"
  1980. item.error_message = "Failed to send print command to printer"
  1981. item.completed_at = datetime.now(timezone.utc)
  1982. await db.commit()
  1983. logger.error(
  1984. f"Queue item {item.id}: Failed to start print on {printer.name} ({printer.model}) - "
  1985. f"printer_manager.start_print() returned False. "
  1986. f"This may indicate: printer not connected, MQTT error, unsupported model configuration, or firmware issue. "
  1987. f"Check printer status and backend logs for details."
  1988. )
  1989. # Send failure notification
  1990. await notification_service.on_queue_job_failed(
  1991. job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
  1992. printer_id=printer.id,
  1993. printer_name=printer.name,
  1994. reason="Failed to send print command to printer - check printer connection and status",
  1995. db=db,
  1996. )
  1997. await self._power_off_if_needed(db, item)
  1998. @staticmethod
  1999. async def _watchdog_print_start(
  2000. queue_item_id: int,
  2001. printer_id: int,
  2002. pre_state: str,
  2003. pre_subtask_id: str | None = None,
  2004. pre_gcode_file: str | None = None,
  2005. timeout: float = 90.0,
  2006. poll_interval: float = 3.0,
  2007. ) -> None:
  2008. """Revert a queue item if the printer never acknowledges the start command.
  2009. Bambuddy optimistically marks the queue item as "printing" right after the
  2010. MQTT project_file publish succeeds locally. If the printer drops/ignores the
  2011. command (half-broken MQTT session — #887/#936), the state never transitions
  2012. and the item would otherwise stay stuck in "printing" forever (#967).
  2013. Exit paths (printer picked up the job — no revert):
  2014. - gcode_state changed from pre_state, OR
  2015. - subtask_id advanced past pre_subtask_id — the printer echoes our
  2016. per-dispatch identity back on push_status, so a subtask_id change is
  2017. a definitive "command landed" signal even while state is still FINISH.
  2018. H2D can sit at FINISH for ~50 s after accepting project_file before
  2019. transitioning to PREPARE, which used to trip the state-only watchdog
  2020. and caused the scheduler to revert + re-dispatch the item; the next
  2021. successful dispatch then looked like a reprint of the just-finished
  2022. job (#1078).
  2023. Timeout raised from 45 s → 90 s as belt-and-braces for slow transitions
  2024. that also don't emit an early subtask_id tick.
  2025. """
  2026. deadline = time.monotonic() + timeout
  2027. last_status = None
  2028. while time.monotonic() < deadline:
  2029. await asyncio.sleep(poll_interval)
  2030. status = printer_manager.get_status(printer_id)
  2031. if not status:
  2032. # Printer disconnected — don't mess with the DB. Drop the
  2033. # in-memory dispatch hold too so a fresh dispatch can retry
  2034. # once the printer comes back; the hard timeout would
  2035. # otherwise hold the printer unnecessarily.
  2036. scheduler._release_dispatch_hold(printer_id)
  2037. return
  2038. last_status = status
  2039. if status.state in _ACTIVE_PRINT_STATES:
  2040. # Printer is actively processing the job — release the
  2041. # post-dispatch hold so the next pending item for this printer
  2042. # can be evaluated normally. We do NOT accept arbitrary state
  2043. # transitions: a printer going FINISH -> IDLE (user dismissed
  2044. # the post-print prompt without accepting our project_file)
  2045. # would otherwise look like "command landed" and leave the
  2046. # queue item stuck in 'printing' forever (#1370).
  2047. scheduler._release_dispatch_hold(printer_id)
  2048. return
  2049. if pre_subtask_id is not None and status.subtask_id is not None and status.subtask_id != pre_subtask_id:
  2050. # Printer picked up the job (subtask_id advanced). H2D can
  2051. # sit at FINISH for ~50 s after accepting project_file
  2052. # before transitioning to PREPARE, but the subtask_id flips
  2053. # to our submission_id almost immediately (#1078).
  2054. scheduler._release_dispatch_hold(printer_id)
  2055. return
  2056. # No transition. Revert the item so the scheduler can retry.
  2057. # Drop the in-memory hold so the retry isn't blocked by it.
  2058. scheduler._release_dispatch_hold(printer_id)
  2059. # Three outcomes from the revert attempt, each routed differently:
  2060. # "reverted": row flipped from printing -> pending, run recovery
  2061. # "already_moved_on": item.status != 'printing' (completed/cancelled by
  2062. # on_print_complete or user). Skip recovery entirely
  2063. # — the print clearly landed somewhere even if the
  2064. # watchdog didn't see the active-state transition.
  2065. # "revert_failed": SQLite contention exhausted retries. Still run
  2066. # recovery so the MQTT session gets a fresh client_id
  2067. # on the half-broken-session path.
  2068. async def _do_revert(db):
  2069. item = await db.get(PrintQueueItem, queue_item_id)
  2070. if not item or item.status != "printing":
  2071. return "already_moved_on"
  2072. item.status = "pending"
  2073. item.started_at = None
  2074. await db.commit()
  2075. return "reverted"
  2076. try:
  2077. revert_outcome = await run_with_retry(_do_revert, label=f"watchdog revert item={queue_item_id}")
  2078. except Exception as e:
  2079. logger.warning(
  2080. "Queue item %s: failed to revert to 'pending' (printer %d): %s — "
  2081. "scheduler may keep treating this item as in-flight",
  2082. queue_item_id,
  2083. printer_id,
  2084. e,
  2085. )
  2086. revert_outcome = "revert_failed"
  2087. if revert_outcome == "already_moved_on":
  2088. # Preserves the pre-#1370 early-return: if on_print_complete (or any
  2089. # other path) already moved the item past 'printing', don't run the
  2090. # MQTT session-recovery logic below — a forced reconnect on a healthy
  2091. # session breaks ongoing prints on the same printer.
  2092. return
  2093. if revert_outcome == "reverted":
  2094. logger.warning(
  2095. "Queue item %s: printer %d did not respond to print command within "
  2096. "%.0fs (state still %s, subtask_id still %s) — reverted to 'pending' "
  2097. "for retry (#967)",
  2098. queue_item_id,
  2099. printer_id,
  2100. timeout,
  2101. pre_state,
  2102. pre_subtask_id,
  2103. )
  2104. # Same #1150 / #887/#936 discriminator as background_dispatch: if the
  2105. # printer's gcode_file changed since pre-dispatch, the project_file
  2106. # command landed and the printer is parsing — a forced reconnect
  2107. # mid-parse triggers 0500_4003. If gcode_file is unchanged, the
  2108. # publish was silently swallowed (#887/#936) and the original
  2109. # force_reconnect recovery is what we want.
  2110. client = printer_manager.get_client(printer_id)
  2111. current_gcode_file = getattr(last_status, "gcode_file", None) if last_status else None
  2112. publish_landed = current_gcode_file is not None and current_gcode_file != pre_gcode_file
  2113. if publish_landed:
  2114. logger.warning(
  2115. "Queue item %s: gcode_file changed to %r (was %r) — printer "
  2116. "received the command and is parsing slowly. Skipping forced "
  2117. "MQTT reconnect to avoid 0500_4003 mid-parse (#1150).",
  2118. queue_item_id,
  2119. current_gcode_file,
  2120. pre_gcode_file,
  2121. )
  2122. elif client and hasattr(client, "force_reconnect_stale_session"):
  2123. client.force_reconnect_stale_session(
  2124. f"queue print command unacknowledged after {timeout:.0f}s "
  2125. f"(state still {pre_state}, gcode_file {current_gcode_file!r})"
  2126. )
# Global scheduler instance.
# _watchdog_print_start is a staticmethod (no self), so it reaches the dispatch
# hold through this module-level name (scheduler._release_dispatch_hold); the
# name must therefore stay bound at module scope.
scheduler = PrintScheduler()