print_scheduler.py 43 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002
  1. """Print scheduler service - processes the print queue."""
  2. import asyncio
  3. import json
  4. import logging
  5. import xml.etree.ElementTree as ET
  6. import zipfile
  7. from datetime import datetime
  8. from pathlib import Path
  9. from sqlalchemy import func, select
  10. from sqlalchemy.ext.asyncio import AsyncSession
  11. from backend.app.core.config import settings
  12. from backend.app.core.database import async_session
  13. from backend.app.models.archive import PrintArchive
  14. from backend.app.models.library import LibraryFile
  15. from backend.app.models.print_queue import PrintQueueItem
  16. from backend.app.models.printer import Printer
  17. from backend.app.models.smart_plug import SmartPlug
  18. from backend.app.services.bambu_ftp import delete_file_async, get_ftp_retry_settings, upload_file_async, with_ftp_retry
  19. from backend.app.services.notification_service import notification_service
  20. from backend.app.services.printer_manager import printer_manager
  21. from backend.app.services.smart_plug_manager import smart_plug_manager
  22. from backend.app.utils.printer_models import normalize_printer_model
  23. logger = logging.getLogger(__name__)
  24. class PrintScheduler:
  25. """Background scheduler that processes the print queue."""
  26. def __init__(self):
  27. self._running = False
  28. self._check_interval = 30 # seconds
  29. self._power_on_wait_time = 180 # seconds to wait for printer after power on (3 min)
  30. self._power_on_check_interval = 10 # seconds between connection checks
  31. async def run(self):
  32. """Main loop - check queue every interval."""
  33. self._running = True
  34. logger.info("Print scheduler started")
  35. while self._running:
  36. try:
  37. await self.check_queue()
  38. except Exception as e:
  39. logger.error(f"Scheduler error: {e}")
  40. await asyncio.sleep(self._check_interval)
  41. def stop(self):
  42. """Stop the scheduler."""
  43. self._running = False
  44. logger.info("Print scheduler stopped")
  45. async def check_queue(self):
  46. """Check for prints ready to start."""
  47. async with async_session() as db:
  48. # Get all pending items, ordered by printer and position
  49. result = await db.execute(
  50. select(PrintQueueItem)
  51. .where(PrintQueueItem.status == "pending")
  52. .order_by(PrintQueueItem.printer_id, PrintQueueItem.position)
  53. )
  54. items = list(result.scalars().all())
  55. if not items:
  56. return
  57. # Track busy printers to avoid assigning multiple items to same printer
  58. busy_printers: set[int] = set()
  59. for item in items:
  60. # Check scheduled time first (scheduled_time is stored in UTC from ISO string)
  61. if item.scheduled_time and item.scheduled_time > datetime.utcnow():
  62. continue
  63. # Skip items that require manual start
  64. if item.manual_start:
  65. continue
  66. if item.printer_id:
  67. # Specific printer assignment (existing behavior)
  68. if item.printer_id in busy_printers:
  69. continue
  70. # Check if printer is idle
  71. printer_idle = self._is_printer_idle(item.printer_id)
  72. printer_connected = printer_manager.is_connected(item.printer_id)
  73. # If printer not connected, try to power on via smart plug
  74. if not printer_connected:
  75. plug = await self._get_smart_plug(db, item.printer_id)
  76. if plug and plug.auto_on and plug.enabled:
  77. logger.info(f"Printer {item.printer_id} offline, attempting to power on via smart plug")
  78. powered_on = await self._power_on_and_wait(plug, item.printer_id, db)
  79. if powered_on:
  80. printer_connected = True
  81. printer_idle = self._is_printer_idle(item.printer_id)
  82. else:
  83. logger.warning(f"Could not power on printer {item.printer_id} via smart plug")
  84. busy_printers.add(item.printer_id)
  85. continue
  86. else:
  87. # No plug or auto_on disabled
  88. busy_printers.add(item.printer_id)
  89. continue
  90. # Check if printer is idle (busy with another print)
  91. if not printer_idle:
  92. busy_printers.add(item.printer_id)
  93. continue
  94. # Check condition (previous print success)
  95. if item.require_previous_success:
  96. if not await self._check_previous_success(db, item):
  97. item.status = "skipped"
  98. item.error_message = "Previous print failed or was aborted"
  99. item.completed_at = datetime.now()
  100. await db.commit()
  101. logger.info(f"Skipped queue item {item.id} - previous print failed")
  102. # Send notification
  103. job_name = await self._get_job_name(db, item)
  104. printer = await self._get_printer(db, item.printer_id)
  105. await notification_service.on_queue_job_skipped(
  106. job_name=job_name,
  107. printer_id=item.printer_id,
  108. printer_name=printer.name if printer else "Unknown",
  109. reason="Previous print failed or was aborted",
  110. db=db,
  111. )
  112. continue
  113. # Start the print
  114. await self._start_print(db, item)
  115. busy_printers.add(item.printer_id)
  116. elif item.target_model:
  117. # Model-based assignment - find any idle printer of matching model
  118. # Parse required filament types if present
  119. required_types = None
  120. if item.required_filament_types:
  121. try:
  122. required_types = json.loads(item.required_filament_types)
  123. except json.JSONDecodeError:
  124. pass
  125. printer_id, waiting_reason = await self._find_idle_printer_for_model(
  126. db, item.target_model, busy_printers, required_types
  127. )
  128. # Update waiting_reason if changed and send notification when first waiting
  129. if item.waiting_reason != waiting_reason:
  130. was_waiting = item.waiting_reason is not None
  131. item.waiting_reason = waiting_reason
  132. await db.commit()
  133. # Send waiting notification only when transitioning to waiting state
  134. if waiting_reason and not was_waiting:
  135. job_name = await self._get_job_name(db, item)
  136. await notification_service.on_queue_job_waiting(
  137. job_name=job_name,
  138. target_model=item.target_model,
  139. waiting_reason=waiting_reason,
  140. db=db,
  141. )
  142. if printer_id:
  143. # Check condition (previous print success) before assigning
  144. if item.require_previous_success:
  145. if not await self._check_previous_success(db, item):
  146. item.status = "skipped"
  147. item.error_message = "Previous print failed or was aborted"
  148. item.completed_at = datetime.now()
  149. await db.commit()
  150. logger.info(f"Skipped queue item {item.id} - previous print failed")
  151. # Send notification
  152. job_name = await self._get_job_name(db, item)
  153. printer = await self._get_printer(db, printer_id)
  154. await notification_service.on_queue_job_skipped(
  155. job_name=job_name,
  156. printer_id=printer_id,
  157. printer_name=printer.name if printer else "Unknown",
  158. reason="Previous print failed or was aborted",
  159. db=db,
  160. )
  161. continue
  162. # Assign printer and start - clear waiting reason
  163. item.printer_id = printer_id
  164. item.waiting_reason = None
  165. logger.info(f"Model-based assignment: queue item {item.id} assigned to printer {printer_id}")
  166. # Send assignment notification
  167. job_name = await self._get_job_name(db, item)
  168. printer = await self._get_printer(db, printer_id)
  169. await notification_service.on_queue_job_assigned(
  170. job_name=job_name,
  171. printer_id=printer_id,
  172. printer_name=printer.name if printer else "Unknown",
  173. target_model=item.target_model,
  174. db=db,
  175. )
  176. # Compute AMS mapping for the assigned printer if not already set
  177. # This is critical for model-based jobs where mapping wasn't computed upfront
  178. if not item.ams_mapping:
  179. computed_mapping = await self._compute_ams_mapping_for_printer(db, printer_id, item)
  180. if computed_mapping:
  181. item.ams_mapping = json.dumps(computed_mapping)
  182. logger.info(
  183. f"Queue item {item.id}: Computed AMS mapping for printer {printer_id}: {computed_mapping}"
  184. )
  185. await db.commit()
  186. await self._start_print(db, item)
  187. busy_printers.add(printer_id)
  188. async def _find_idle_printer_for_model(
  189. self,
  190. db: AsyncSession,
  191. model: str,
  192. exclude_ids: set[int],
  193. required_filament_types: list[str] | None = None,
  194. ) -> tuple[int | None, str | None]:
  195. """Find an idle, connected printer matching the model with compatible filaments.
  196. Args:
  197. db: Database session
  198. model: Printer model to match (e.g., "X1C", "P1S")
  199. exclude_ids: Printer IDs to exclude (already busy)
  200. required_filament_types: Optional list of filament types needed (e.g., ["PLA", "PETG"])
  201. If provided, only printers with all required types loaded will match.
  202. Returns:
  203. Tuple of (printer_id, waiting_reason):
  204. - (printer_id, None) if a matching printer was found
  205. - (None, reason) if no printer is available, with explanation
  206. """
  207. # Normalize model name and use case-insensitive matching
  208. normalized_model = normalize_printer_model(model) or model
  209. result = await db.execute(
  210. select(Printer)
  211. .where(func.lower(Printer.model) == normalized_model.lower())
  212. .where(Printer.is_active == True) # noqa: E712
  213. )
  214. printers = list(result.scalars().all())
  215. if not printers:
  216. return None, f"No active {normalized_model} printers configured"
  217. # Track reasons for skipping printers
  218. printers_busy = []
  219. printers_offline = []
  220. printers_missing_filament = []
  221. for printer in printers:
  222. if printer.id in exclude_ids:
  223. printers_busy.append(printer.name)
  224. continue
  225. is_connected = printer_manager.is_connected(printer.id)
  226. is_idle = self._is_printer_idle(printer.id) if is_connected else False
  227. if not is_connected:
  228. printers_offline.append(printer.name)
  229. continue
  230. if not is_idle:
  231. printers_busy.append(printer.name)
  232. continue
  233. # Validate filament compatibility if required types are specified
  234. if required_filament_types:
  235. missing = self._get_missing_filament_types(printer.id, required_filament_types)
  236. if missing:
  237. printers_missing_filament.append((printer.name, missing))
  238. logger.debug(f"Skipping printer {printer.id} ({printer.name}) - missing filaments: {missing}")
  239. continue
  240. # Found a matching printer - clear waiting reason
  241. return printer.id, None
  242. # Build waiting reason from what we found
  243. reasons = []
  244. if printers_missing_filament:
  245. # Filament mismatch is most actionable - show first
  246. names_and_missing = [f"{name} (needs {', '.join(missing)})" for name, missing in printers_missing_filament]
  247. reasons.append(f"Waiting for filament: {'; '.join(names_and_missing)}")
  248. if printers_busy:
  249. reasons.append(f"Busy: {', '.join(printers_busy)}")
  250. if printers_offline:
  251. reasons.append(f"Offline: {', '.join(printers_offline)}")
  252. return None, " | ".join(reasons) if reasons else f"No available {model} printers"
  253. def _get_missing_filament_types(self, printer_id: int, required_types: list[str]) -> list[str]:
  254. """Get the list of required filament types that are not loaded on the printer.
  255. Args:
  256. printer_id: The printer ID
  257. required_types: List of filament types needed (e.g., ["PLA", "PETG"])
  258. Returns:
  259. List of missing filament types (empty if all are loaded)
  260. """
  261. status = printer_manager.get_status(printer_id)
  262. if not status:
  263. return required_types # Can't determine, assume all missing
  264. # Collect all filament types loaded on this printer (AMS units + external spool)
  265. loaded_types: set[str] = set()
  266. # Check AMS units (stored in raw_data["ams"])
  267. ams_data = status.raw_data.get("ams", [])
  268. if ams_data:
  269. for ams_unit in ams_data:
  270. for tray in ams_unit.get("tray", []):
  271. tray_type = tray.get("tray_type")
  272. if tray_type:
  273. loaded_types.add(tray_type.upper())
  274. # Check external spool (virtual tray, stored in raw_data["vt_tray"])
  275. vt_tray = status.raw_data.get("vt_tray")
  276. if vt_tray:
  277. vt_type = vt_tray.get("tray_type")
  278. if vt_type:
  279. loaded_types.add(vt_type.upper())
  280. # Find which required types are missing (case-insensitive comparison)
  281. missing = []
  282. for req_type in required_types:
  283. if req_type.upper() not in loaded_types:
  284. missing.append(req_type)
  285. return missing
  286. async def _compute_ams_mapping_for_printer(
  287. self, db: AsyncSession, printer_id: int, item: PrintQueueItem
  288. ) -> list[int] | None:
  289. """Compute AMS mapping for a printer based on filament requirements.
  290. This is called for model-based queue items after a printer is assigned,
  291. to compute the correct AMS slot mapping for that specific printer's hardware.
  292. Args:
  293. db: Database session
  294. printer_id: The assigned printer ID
  295. item: The queue item (contains archive_id or library_file_id)
  296. Returns:
  297. AMS mapping array or None if no mapping needed/possible
  298. """
  299. # Get printer status
  300. status = printer_manager.get_status(printer_id)
  301. if not status:
  302. logger.warning(f"Cannot compute AMS mapping: printer {printer_id} status unavailable")
  303. return None
  304. # Get filament requirements from source file
  305. filament_reqs = await self._get_filament_requirements(db, item)
  306. if not filament_reqs:
  307. logger.debug(f"No filament requirements found for queue item {item.id}")
  308. return None
  309. # Build loaded filaments from printer status
  310. loaded_filaments = self._build_loaded_filaments(status)
  311. if not loaded_filaments:
  312. logger.debug(f"No filaments loaded on printer {printer_id}")
  313. return None
  314. # Compute mapping: match required filaments to available slots
  315. return self._match_filaments_to_slots(filament_reqs, loaded_filaments)
  316. async def _get_filament_requirements(self, db: AsyncSession, item: PrintQueueItem) -> list[dict] | None:
  317. """Extract filament requirements from the source 3MF file.
  318. Args:
  319. db: Database session
  320. item: Queue item with archive_id or library_file_id
  321. Returns:
  322. List of filament requirement dicts with slot_id, type, color, used_grams
  323. """
  324. file_path: Path | None = None
  325. if item.archive_id:
  326. result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
  327. archive = result.scalar_one_or_none()
  328. if archive:
  329. file_path = settings.base_dir / archive.file_path
  330. elif item.library_file_id:
  331. result = await db.execute(select(LibraryFile).where(LibraryFile.id == item.library_file_id))
  332. library_file = result.scalar_one_or_none()
  333. if library_file:
  334. lib_path = Path(library_file.file_path)
  335. file_path = lib_path if lib_path.is_absolute() else settings.base_dir / library_file.file_path
  336. if not file_path or not file_path.exists():
  337. return None
  338. filaments = []
  339. try:
  340. with zipfile.ZipFile(file_path, "r") as zf:
  341. if "Metadata/slice_info.config" not in zf.namelist():
  342. return None
  343. content = zf.read("Metadata/slice_info.config").decode()
  344. root = ET.fromstring(content)
  345. # Check if plate_id is specified - use that plate's filaments
  346. plate_id = item.plate_id
  347. if plate_id:
  348. for plate_elem in root.findall("./plate"):
  349. plate_index = None
  350. for meta in plate_elem.findall("metadata"):
  351. if meta.get("key") == "index":
  352. plate_index = int(meta.get("value", "0"))
  353. break
  354. if plate_index == plate_id:
  355. for filament_elem in plate_elem.findall("./filament"):
  356. filament_id = filament_elem.get("id")
  357. filament_type = filament_elem.get("type", "")
  358. filament_color = filament_elem.get("color", "")
  359. used_g = filament_elem.get("used_g", "0")
  360. try:
  361. used_grams = float(used_g)
  362. if used_grams > 0 and filament_id:
  363. filaments.append(
  364. {
  365. "slot_id": int(filament_id),
  366. "type": filament_type,
  367. "color": filament_color,
  368. "used_grams": round(used_grams, 1),
  369. }
  370. )
  371. except (ValueError, TypeError):
  372. pass
  373. break
  374. else:
  375. # No plate_id - extract all filaments with used_g > 0
  376. for filament_elem in root.findall("./filament"):
  377. filament_id = filament_elem.get("id")
  378. filament_type = filament_elem.get("type", "")
  379. filament_color = filament_elem.get("color", "")
  380. used_g = filament_elem.get("used_g", "0")
  381. try:
  382. used_grams = float(used_g)
  383. if used_grams > 0 and filament_id:
  384. filaments.append(
  385. {
  386. "slot_id": int(filament_id),
  387. "type": filament_type,
  388. "color": filament_color,
  389. "used_grams": round(used_grams, 1),
  390. }
  391. )
  392. except (ValueError, TypeError):
  393. pass
  394. filaments.sort(key=lambda x: x["slot_id"])
  395. except Exception as e:
  396. logger.warning(f"Failed to parse filament requirements: {e}")
  397. return None
  398. return filaments if filaments else None
  399. def _build_loaded_filaments(self, status) -> list[dict]:
  400. """Build list of loaded filaments from printer status.
  401. Args:
  402. status: PrinterState from printer_manager
  403. Returns:
  404. List of loaded filament dicts with type, color, ams_id, tray_id, global_tray_id
  405. """
  406. filaments = []
  407. # Parse AMS units from raw_data
  408. ams_data = status.raw_data.get("ams", [])
  409. for ams_unit in ams_data:
  410. ams_id = ams_unit.get("id", 0)
  411. trays = ams_unit.get("tray", [])
  412. is_ht = len(trays) == 1 # AMS-HT has single tray
  413. for tray in trays:
  414. tray_type = tray.get("tray_type")
  415. if tray_type:
  416. tray_id = tray.get("id", 0)
  417. tray_color = tray.get("tray_color", "")
  418. # Normalize color: remove alpha, add hash
  419. color = self._normalize_color(tray_color)
  420. # Calculate global tray ID
  421. global_tray_id = ams_id * 4 + tray_id
  422. filaments.append(
  423. {
  424. "type": tray_type,
  425. "color": color,
  426. "ams_id": ams_id,
  427. "tray_id": tray_id,
  428. "is_ht": is_ht,
  429. "is_external": False,
  430. "global_tray_id": global_tray_id,
  431. }
  432. )
  433. # Check external spool (vt_tray)
  434. vt_tray = status.raw_data.get("vt_tray")
  435. if vt_tray and vt_tray.get("tray_type"):
  436. color = self._normalize_color(vt_tray.get("tray_color", ""))
  437. filaments.append(
  438. {
  439. "type": vt_tray["tray_type"],
  440. "color": color,
  441. "ams_id": -1,
  442. "tray_id": 0,
  443. "is_ht": False,
  444. "is_external": True,
  445. "global_tray_id": 254,
  446. }
  447. )
  448. return filaments
  449. def _normalize_color(self, color: str | None) -> str:
  450. """Normalize color to #RRGGBB format."""
  451. if not color:
  452. return "#808080"
  453. hex_color = color.replace("#", "")[:6]
  454. return f"#{hex_color}"
  455. def _normalize_color_for_compare(self, color: str | None) -> str:
  456. """Normalize color for comparison (lowercase, no hash)."""
  457. if not color:
  458. return ""
  459. return color.replace("#", "").lower()[:6]
  460. def _colors_are_similar(self, color1: str | None, color2: str | None, threshold: int = 40) -> bool:
  461. """Check if two colors are visually similar within a threshold."""
  462. hex1 = self._normalize_color_for_compare(color1)
  463. hex2 = self._normalize_color_for_compare(color2)
  464. if not hex1 or not hex2 or len(hex1) < 6 or len(hex2) < 6:
  465. return False
  466. try:
  467. r1 = int(hex1[0:2], 16)
  468. g1 = int(hex1[2:4], 16)
  469. b1 = int(hex1[4:6], 16)
  470. r2 = int(hex2[0:2], 16)
  471. g2 = int(hex2[2:4], 16)
  472. b2 = int(hex2[4:6], 16)
  473. return abs(r1 - r2) <= threshold and abs(g1 - g2) <= threshold and abs(b1 - b2) <= threshold
  474. except ValueError:
  475. return False
  476. def _match_filaments_to_slots(self, required: list[dict], loaded: list[dict]) -> list[int] | None:
  477. """Match required filaments to loaded filaments and build AMS mapping.
  478. Priority: exact color match > similar color match > type-only match
  479. Args:
  480. required: List of required filaments with slot_id, type, color
  481. loaded: List of loaded filaments with type, color, global_tray_id
  482. Returns:
  483. AMS mapping array (position = slot_id - 1, value = global_tray_id or -1)
  484. """
  485. if not required:
  486. return None
  487. # Track used trays to avoid duplicate assignment
  488. used_tray_ids: set[int] = set()
  489. comparisons = []
  490. for req in required:
  491. req_type = (req.get("type") or "").upper()
  492. req_color = req.get("color", "")
  493. # Find best match: exact color > similar color > type-only
  494. exact_match = None
  495. similar_match = None
  496. type_only_match = None
  497. for f in loaded:
  498. if f["global_tray_id"] in used_tray_ids:
  499. continue
  500. f_type = (f.get("type") or "").upper()
  501. if f_type != req_type:
  502. continue
  503. # Type matches - check color
  504. f_color = f.get("color", "")
  505. if self._normalize_color_for_compare(f_color) == self._normalize_color_for_compare(req_color):
  506. exact_match = f
  507. break # Best possible match
  508. elif self._colors_are_similar(f_color, req_color):
  509. if not similar_match:
  510. similar_match = f
  511. elif not type_only_match:
  512. type_only_match = f
  513. match = exact_match or similar_match or type_only_match
  514. if match:
  515. used_tray_ids.add(match["global_tray_id"])
  516. comparisons.append({"slot_id": req.get("slot_id", 0), "global_tray_id": match["global_tray_id"]})
  517. else:
  518. comparisons.append({"slot_id": req.get("slot_id", 0), "global_tray_id": -1})
  519. # Build mapping array
  520. if not comparisons:
  521. return None
  522. max_slot_id = max(c["slot_id"] for c in comparisons)
  523. if max_slot_id <= 0:
  524. return None
  525. mapping = [-1] * max_slot_id
  526. for c in comparisons:
  527. slot_id = c["slot_id"]
  528. if slot_id and slot_id > 0:
  529. mapping[slot_id - 1] = c["global_tray_id"]
  530. return mapping
  531. def _is_printer_idle(self, printer_id: int) -> bool:
  532. """Check if a printer is connected and idle."""
  533. if not printer_manager.is_connected(printer_id):
  534. return False
  535. state = printer_manager.get_status(printer_id)
  536. if not state:
  537. return False
  538. # Printer is idle if state is IDLE, FINISH, FAILED, or unknown
  539. # FAILED means previous print failed, printer is ready for new print
  540. return state.state in ("IDLE", "FINISH", "FAILED", "unknown")
  541. async def _get_smart_plug(self, db: AsyncSession, printer_id: int) -> SmartPlug | None:
  542. """Get the smart plug associated with a printer."""
  543. result = await db.execute(select(SmartPlug).where(SmartPlug.printer_id == printer_id))
  544. return result.scalar_one_or_none()
  545. async def _power_on_and_wait(self, plug: SmartPlug, printer_id: int, db: AsyncSession) -> bool:
  546. """Turn on smart plug and wait for printer to connect.
  547. Returns True if printer connected successfully within timeout.
  548. """
  549. # Get the appropriate service for the plug type (Tasmota or Home Assistant)
  550. service = await smart_plug_manager.get_service_for_plug(plug, db)
  551. # Check current plug state
  552. status = await service.get_status(plug)
  553. if not status.get("reachable"):
  554. logger.warning(f"Smart plug '{plug.name}' is not reachable")
  555. return False
  556. # Turn on if not already on
  557. if status.get("state") != "ON":
  558. success = await service.turn_on(plug)
  559. if not success:
  560. logger.warning(f"Failed to turn on smart plug '{plug.name}'")
  561. return False
  562. logger.info(f"Powered on smart plug '{plug.name}' for printer {printer_id}")
  563. # Get printer from database for connection
  564. result = await db.execute(select(Printer).where(Printer.id == printer_id))
  565. printer = result.scalar_one_or_none()
  566. if not printer:
  567. logger.error(f"Printer {printer_id} not found in database")
  568. return False
  569. # Wait for printer to boot (give it some time before trying to connect)
  570. logger.info(f"Waiting 30s for printer {printer_id} to boot...")
  571. await asyncio.sleep(30)
  572. # Try to connect to the printer periodically
  573. elapsed = 30 # Already waited 30s
  574. while elapsed < self._power_on_wait_time:
  575. # Try to connect
  576. logger.info(f"Attempting to connect to printer {printer_id}...")
  577. try:
  578. connected = await printer_manager.connect_printer(printer)
  579. if connected:
  580. logger.info(f"Printer {printer_id} connected after {elapsed}s")
  581. # Give it a moment to stabilize and get status
  582. await asyncio.sleep(5)
  583. return True
  584. except Exception as e:
  585. logger.debug(f"Connection attempt failed: {e}")
  586. await asyncio.sleep(self._power_on_check_interval)
  587. elapsed += self._power_on_check_interval
  588. logger.debug(f"Waiting for printer {printer_id} to connect... ({elapsed}s)")
  589. logger.warning(f"Printer {printer_id} did not connect within {self._power_on_wait_time}s after power on")
  590. return False
  591. async def _check_previous_success(self, db: AsyncSession, item: PrintQueueItem) -> bool:
  592. """Check if the previous print on this printer succeeded."""
  593. # Find the most recent completed queue item for this printer
  594. result = await db.execute(
  595. select(PrintQueueItem)
  596. .where(PrintQueueItem.printer_id == item.printer_id)
  597. .where(PrintQueueItem.id != item.id)
  598. .where(PrintQueueItem.status.in_(["completed", "failed", "skipped", "aborted"]))
  599. .order_by(PrintQueueItem.completed_at.desc())
  600. .limit(1)
  601. )
  602. prev_item = result.scalar_one_or_none()
  603. # If no previous item, assume success (first in queue)
  604. if not prev_item:
  605. return True
  606. return prev_item.status == "completed"
  607. async def _power_off_if_needed(self, db: AsyncSession, item: PrintQueueItem):
  608. """Power off printer if auto_off_after is enabled (waits for cooldown)."""
  609. if not item.auto_off_after:
  610. return
  611. plug = await self._get_smart_plug(db, item.printer_id)
  612. if plug and plug.enabled:
  613. logger.info(f"Auto-off: Waiting for printer {item.printer_id} to cool down before power off...")
  614. # Wait for cooldown (up to 10 minutes)
  615. await printer_manager.wait_for_cooldown(item.printer_id, target_temp=50.0, timeout=600)
  616. logger.info(f"Auto-off: Powering off printer {item.printer_id}")
  617. service = await smart_plug_manager.get_service_for_plug(plug, db)
  618. await service.turn_off(plug)
  619. async def _get_job_name(self, db: AsyncSession, item: PrintQueueItem) -> str:
  620. """Get a human-readable name for a queue item."""
  621. if item.archive_id:
  622. result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
  623. archive = result.scalar_one_or_none()
  624. if archive:
  625. return archive.filename.replace(".gcode.3mf", "").replace(".3mf", "")
  626. if item.library_file_id:
  627. result = await db.execute(select(LibraryFile).where(LibraryFile.id == item.library_file_id))
  628. library_file = result.scalar_one_or_none()
  629. if library_file:
  630. return library_file.filename.replace(".gcode.3mf", "").replace(".3mf", "")
  631. return f"Job #{item.id}"
  632. async def _get_printer(self, db: AsyncSession, printer_id: int) -> Printer | None:
  633. """Get printer by ID."""
  634. result = await db.execute(select(Printer).where(Printer.id == printer_id))
  635. return result.scalar_one_or_none()
  636. async def _start_print(self, db: AsyncSession, item: PrintQueueItem):
  637. """Upload file and start print for a queue item.
  638. Supports two sources:
  639. - archive_id: Print from an existing archive
  640. - library_file_id: Print from a library file (file manager)
  641. """
  642. logger.info(f"Starting queue item {item.id}")
  643. # Get printer first (needed for both paths)
  644. result = await db.execute(select(Printer).where(Printer.id == item.printer_id))
  645. printer = result.scalar_one_or_none()
  646. if not printer:
  647. item.status = "failed"
  648. item.error_message = "Printer not found"
  649. item.completed_at = datetime.utcnow()
  650. await db.commit()
  651. logger.error(f"Queue item {item.id}: Printer {item.printer_id} not found")
  652. await self._power_off_if_needed(db, item)
  653. return
  654. # Check printer is connected
  655. if not printer_manager.is_connected(item.printer_id):
  656. item.status = "failed"
  657. item.error_message = "Printer not connected"
  658. item.completed_at = datetime.utcnow()
  659. await db.commit()
  660. logger.error(f"Queue item {item.id}: Printer {item.printer_id} not connected")
  661. await self._power_off_if_needed(db, item)
  662. return
  663. # Determine source: archive or library file
  664. archive = None
  665. library_file = None
  666. file_path = None
  667. filename = None
  668. if item.archive_id:
  669. # Print from archive
  670. result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
  671. archive = result.scalar_one_or_none()
  672. if not archive:
  673. item.status = "failed"
  674. item.error_message = "Archive not found"
  675. item.completed_at = datetime.utcnow()
  676. await db.commit()
  677. logger.error(f"Queue item {item.id}: Archive {item.archive_id} not found")
  678. await self._power_off_if_needed(db, item)
  679. return
  680. file_path = settings.base_dir / archive.file_path
  681. filename = archive.filename
  682. elif item.library_file_id:
  683. # Print from library file (file manager)
  684. result = await db.execute(select(LibraryFile).where(LibraryFile.id == item.library_file_id))
  685. library_file = result.scalar_one_or_none()
  686. if not library_file:
  687. item.status = "failed"
  688. item.error_message = "Library file not found"
  689. item.completed_at = datetime.utcnow()
  690. await db.commit()
  691. logger.error(f"Queue item {item.id}: Library file {item.library_file_id} not found")
  692. await self._power_off_if_needed(db, item)
  693. return
  694. # Library files store absolute paths
  695. from pathlib import Path
  696. lib_path = Path(library_file.file_path)
  697. file_path = lib_path if lib_path.is_absolute() else settings.base_dir / library_file.file_path
  698. filename = library_file.filename
  699. else:
  700. # Neither archive nor library file specified
  701. item.status = "failed"
  702. item.error_message = "No source file specified"
  703. item.completed_at = datetime.utcnow()
  704. await db.commit()
  705. logger.error(f"Queue item {item.id}: No archive_id or library_file_id specified")
  706. await self._power_off_if_needed(db, item)
  707. return
  708. # Check file exists on disk
  709. if not file_path.exists():
  710. item.status = "failed"
  711. item.error_message = "Source file not found on disk"
  712. item.completed_at = datetime.utcnow()
  713. await db.commit()
  714. logger.error(f"Queue item {item.id}: File not found: {file_path}")
  715. await self._power_off_if_needed(db, item)
  716. return
  717. # Upload file to printer via FTP
  718. # Use a clean filename to avoid issues with double extensions like .gcode.3mf
  719. base_name = filename
  720. if base_name.endswith(".gcode.3mf"):
  721. base_name = base_name[:-10] # Remove .gcode.3mf
  722. elif base_name.endswith(".3mf"):
  723. base_name = base_name[:-4] # Remove .3mf
  724. remote_filename = f"{base_name}.3mf"
  725. # Upload to root directory (not /cache/) - the start_print command references
  726. # files by name only (ftp://{filename}), so they must be in the root
  727. remote_path = f"/{remote_filename}"
  728. # Get FTP retry settings
  729. ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
  730. # Delete existing file if present (avoids 553 error on overwrite)
  731. try:
  732. await delete_file_async(
  733. printer.ip_address,
  734. printer.access_code,
  735. remote_path,
  736. socket_timeout=ftp_timeout,
  737. printer_model=printer.model,
  738. )
  739. except Exception:
  740. pass # File may not exist, that's fine
  741. try:
  742. if ftp_retry_enabled:
  743. uploaded = await with_ftp_retry(
  744. upload_file_async,
  745. printer.ip_address,
  746. printer.access_code,
  747. file_path,
  748. remote_path,
  749. socket_timeout=ftp_timeout,
  750. printer_model=printer.model,
  751. max_retries=ftp_retry_count,
  752. retry_delay=ftp_retry_delay,
  753. operation_name=f"Upload print to {printer.name}",
  754. )
  755. else:
  756. uploaded = await upload_file_async(
  757. printer.ip_address,
  758. printer.access_code,
  759. file_path,
  760. remote_path,
  761. socket_timeout=ftp_timeout,
  762. printer_model=printer.model,
  763. )
  764. except Exception as e:
  765. uploaded = False
  766. logger.error(f"Queue item {item.id}: FTP error: {e}")
  767. if not uploaded:
  768. item.status = "failed"
  769. item.error_message = "Failed to upload file to printer"
  770. item.completed_at = datetime.utcnow()
  771. await db.commit()
  772. logger.error(f"Queue item {item.id}: FTP upload failed")
  773. # Send failure notification
  774. await notification_service.on_queue_job_failed(
  775. job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
  776. printer_id=printer.id,
  777. printer_name=printer.name,
  778. reason="Failed to upload file to printer",
  779. db=db,
  780. )
  781. await self._power_off_if_needed(db, item)
  782. return
  783. # Register as expected print so we don't create a duplicate archive
  784. # Only applicable for archive-based prints
  785. if archive:
  786. from backend.app.main import register_expected_print
  787. register_expected_print(item.printer_id, remote_filename, archive.id)
  788. # Parse AMS mapping if stored
  789. ams_mapping = None
  790. if item.ams_mapping:
  791. try:
  792. ams_mapping = json.loads(item.ams_mapping)
  793. except json.JSONDecodeError:
  794. logger.warning(f"Queue item {item.id}: Invalid AMS mapping JSON, ignoring")
  795. # Start the print with AMS mapping, plate_id and print options
  796. started = printer_manager.start_print(
  797. item.printer_id,
  798. remote_filename,
  799. plate_id=item.plate_id or 1,
  800. ams_mapping=ams_mapping,
  801. bed_levelling=item.bed_levelling,
  802. flow_cali=item.flow_cali,
  803. vibration_cali=item.vibration_cali,
  804. layer_inspect=item.layer_inspect,
  805. timelapse=item.timelapse,
  806. use_ams=item.use_ams,
  807. )
  808. if started:
  809. item.status = "printing"
  810. item.started_at = datetime.utcnow()
  811. await db.commit()
  812. logger.info(f"Queue item {item.id}: Print started - {filename}")
  813. # Get estimated time for notification
  814. estimated_time = None
  815. if archive and archive.print_time_seconds:
  816. estimated_time = archive.print_time_seconds
  817. elif library_file and library_file.print_time_seconds:
  818. estimated_time = library_file.print_time_seconds
  819. # Send job started notification
  820. await notification_service.on_queue_job_started(
  821. job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
  822. printer_id=printer.id,
  823. printer_name=printer.name,
  824. db=db,
  825. estimated_time=estimated_time,
  826. )
  827. # MQTT relay - publish queue job started
  828. try:
  829. from backend.app.services.mqtt_relay import mqtt_relay
  830. await mqtt_relay.on_queue_job_started(
  831. job_id=item.id,
  832. filename=filename,
  833. printer_id=printer.id,
  834. printer_name=printer.name,
  835. printer_serial=printer.serial_number,
  836. )
  837. except Exception:
  838. pass # Don't fail if MQTT fails
  839. else:
  840. item.status = "failed"
  841. item.error_message = "Failed to send print command"
  842. item.completed_at = datetime.utcnow()
  843. await db.commit()
  844. logger.error(f"Queue item {item.id}: Failed to start print")
  845. # Send failure notification
  846. await notification_service.on_queue_job_failed(
  847. job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
  848. printer_id=printer.id,
  849. printer_name=printer.name,
  850. reason="Failed to send print command",
  851. db=db,
  852. )
  853. await self._power_off_if_needed(db, item)
  854. # Global scheduler instance
  855. scheduler = PrintScheduler()