# print_scheduler.py
"""Print scheduler service - processes the print queue."""
import asyncio
import json
import logging
from datetime import datetime, timezone
from pathlib import Path

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from backend.app.core.config import settings
from backend.app.core.database import async_session
from backend.app.models.archive import PrintArchive
from backend.app.models.library import LibraryFile
from backend.app.models.print_queue import PrintQueueItem
from backend.app.models.printer import Printer
from backend.app.models.smart_plug import SmartPlug
from backend.app.services.bambu_ftp import delete_file_async, get_ftp_retry_settings, upload_file_async, with_ftp_retry
from backend.app.services.printer_manager import printer_manager
from backend.app.services.tasmota import tasmota_service

logger = logging.getLogger(__name__)
  18. class PrintScheduler:
  19. """Background scheduler that processes the print queue."""
  20. def __init__(self):
  21. self._running = False
  22. self._check_interval = 30 # seconds
  23. self._power_on_wait_time = 180 # seconds to wait for printer after power on (3 min)
  24. self._power_on_check_interval = 10 # seconds between connection checks
  25. async def run(self):
  26. """Main loop - check queue every interval."""
  27. self._running = True
  28. logger.info("Print scheduler started")
  29. while self._running:
  30. try:
  31. await self.check_queue()
  32. except Exception as e:
  33. logger.error(f"Scheduler error: {e}")
  34. await asyncio.sleep(self._check_interval)
  35. def stop(self):
  36. """Stop the scheduler."""
  37. self._running = False
  38. logger.info("Print scheduler stopped")
  39. async def check_queue(self):
  40. """Check for prints ready to start."""
  41. async with async_session() as db:
  42. # Get all pending items, ordered by printer and position
  43. result = await db.execute(
  44. select(PrintQueueItem)
  45. .where(PrintQueueItem.status == "pending")
  46. .order_by(PrintQueueItem.printer_id, PrintQueueItem.position)
  47. )
  48. items = list(result.scalars().all())
  49. if not items:
  50. return
  51. # Track busy printers to avoid assigning multiple items to same printer
  52. busy_printers: set[int] = set()
  53. for item in items:
  54. # Check scheduled time first (scheduled_time is stored in UTC from ISO string)
  55. if item.scheduled_time and item.scheduled_time > datetime.utcnow():
  56. continue
  57. # Skip items that require manual start
  58. if item.manual_start:
  59. continue
  60. if item.printer_id:
  61. # Specific printer assignment (existing behavior)
  62. if item.printer_id in busy_printers:
  63. continue
  64. # Check if printer is idle
  65. printer_idle = self._is_printer_idle(item.printer_id)
  66. printer_connected = printer_manager.is_connected(item.printer_id)
  67. # If printer not connected, try to power on via smart plug
  68. if not printer_connected:
  69. plug = await self._get_smart_plug(db, item.printer_id)
  70. if plug and plug.auto_on and plug.enabled:
  71. logger.info(f"Printer {item.printer_id} offline, attempting to power on via smart plug")
  72. powered_on = await self._power_on_and_wait(plug, item.printer_id, db)
  73. if powered_on:
  74. printer_connected = True
  75. printer_idle = self._is_printer_idle(item.printer_id)
  76. else:
  77. logger.warning(f"Could not power on printer {item.printer_id} via smart plug")
  78. busy_printers.add(item.printer_id)
  79. continue
  80. else:
  81. # No plug or auto_on disabled
  82. busy_printers.add(item.printer_id)
  83. continue
  84. # Check if printer is idle (busy with another print)
  85. if not printer_idle:
  86. busy_printers.add(item.printer_id)
  87. continue
  88. # Check condition (previous print success)
  89. if item.require_previous_success:
  90. if not await self._check_previous_success(db, item):
  91. item.status = "skipped"
  92. item.error_message = "Previous print failed or was aborted"
  93. item.completed_at = datetime.now()
  94. await db.commit()
  95. logger.info(f"Skipped queue item {item.id} - previous print failed")
  96. continue
  97. # Start the print
  98. await self._start_print(db, item)
  99. busy_printers.add(item.printer_id)
  100. elif item.target_model:
  101. # Model-based assignment - find any idle printer of matching model
  102. printer_id = await self._find_idle_printer_for_model(db, item.target_model, busy_printers)
  103. if printer_id:
  104. # Check condition (previous print success) before assigning
  105. if item.require_previous_success:
  106. if not await self._check_previous_success(db, item):
  107. item.status = "skipped"
  108. item.error_message = "Previous print failed or was aborted"
  109. item.completed_at = datetime.now()
  110. await db.commit()
  111. logger.info(f"Skipped queue item {item.id} - previous print failed")
  112. continue
  113. # Assign printer and start
  114. item.printer_id = printer_id
  115. logger.info(f"Model-based assignment: queue item {item.id} assigned to printer {printer_id}")
  116. await self._start_print(db, item)
  117. busy_printers.add(printer_id)
  118. async def _find_idle_printer_for_model(self, db: AsyncSession, model: str, exclude_ids: set[int]) -> int | None:
  119. """Find an idle, connected printer matching the model.
  120. Args:
  121. db: Database session
  122. model: Printer model to match (e.g., "X1C", "P1S")
  123. exclude_ids: Printer IDs to exclude (already busy)
  124. Returns:
  125. Printer ID if found, None otherwise
  126. """
  127. result = await db.execute(
  128. select(Printer).where(Printer.model == model).where(Printer.is_active == True) # noqa: E712
  129. )
  130. for printer in result.scalars().all():
  131. if printer.id not in exclude_ids:
  132. if self._is_printer_idle(printer.id) and printer_manager.is_connected(printer.id):
  133. return printer.id
  134. return None
  135. def _is_printer_idle(self, printer_id: int) -> bool:
  136. """Check if a printer is connected and idle."""
  137. if not printer_manager.is_connected(printer_id):
  138. return False
  139. state = printer_manager.get_status(printer_id)
  140. if not state:
  141. return False
  142. # Printer is idle if state is IDLE, FINISH, FAILED, or unknown
  143. # FAILED means previous print failed, printer is ready for new print
  144. return state.state in ("IDLE", "FINISH", "FAILED", "unknown")
  145. async def _get_smart_plug(self, db: AsyncSession, printer_id: int) -> SmartPlug | None:
  146. """Get the smart plug associated with a printer."""
  147. result = await db.execute(select(SmartPlug).where(SmartPlug.printer_id == printer_id))
  148. return result.scalar_one_or_none()
  149. async def _power_on_and_wait(self, plug: SmartPlug, printer_id: int, db: AsyncSession) -> bool:
  150. """Turn on smart plug and wait for printer to connect.
  151. Returns True if printer connected successfully within timeout.
  152. """
  153. # Check current plug state
  154. status = await tasmota_service.get_status(plug)
  155. if not status.get("reachable"):
  156. logger.warning(f"Smart plug '{plug.name}' is not reachable")
  157. return False
  158. # Turn on if not already on
  159. if status.get("state") != "ON":
  160. success = await tasmota_service.turn_on(plug)
  161. if not success:
  162. logger.warning(f"Failed to turn on smart plug '{plug.name}'")
  163. return False
  164. logger.info(f"Powered on smart plug '{plug.name}' for printer {printer_id}")
  165. # Get printer from database for connection
  166. result = await db.execute(select(Printer).where(Printer.id == printer_id))
  167. printer = result.scalar_one_or_none()
  168. if not printer:
  169. logger.error(f"Printer {printer_id} not found in database")
  170. return False
  171. # Wait for printer to boot (give it some time before trying to connect)
  172. logger.info(f"Waiting 30s for printer {printer_id} to boot...")
  173. await asyncio.sleep(30)
  174. # Try to connect to the printer periodically
  175. elapsed = 30 # Already waited 30s
  176. while elapsed < self._power_on_wait_time:
  177. # Try to connect
  178. logger.info(f"Attempting to connect to printer {printer_id}...")
  179. try:
  180. connected = await printer_manager.connect_printer(printer)
  181. if connected:
  182. logger.info(f"Printer {printer_id} connected after {elapsed}s")
  183. # Give it a moment to stabilize and get status
  184. await asyncio.sleep(5)
  185. return True
  186. except Exception as e:
  187. logger.debug(f"Connection attempt failed: {e}")
  188. await asyncio.sleep(self._power_on_check_interval)
  189. elapsed += self._power_on_check_interval
  190. logger.debug(f"Waiting for printer {printer_id} to connect... ({elapsed}s)")
  191. logger.warning(f"Printer {printer_id} did not connect within {self._power_on_wait_time}s after power on")
  192. return False
  193. async def _check_previous_success(self, db: AsyncSession, item: PrintQueueItem) -> bool:
  194. """Check if the previous print on this printer succeeded."""
  195. # Find the most recent completed queue item for this printer
  196. result = await db.execute(
  197. select(PrintQueueItem)
  198. .where(PrintQueueItem.printer_id == item.printer_id)
  199. .where(PrintQueueItem.id != item.id)
  200. .where(PrintQueueItem.status.in_(["completed", "failed", "skipped", "aborted"]))
  201. .order_by(PrintQueueItem.completed_at.desc())
  202. .limit(1)
  203. )
  204. prev_item = result.scalar_one_or_none()
  205. # If no previous item, assume success (first in queue)
  206. if not prev_item:
  207. return True
  208. return prev_item.status == "completed"
  209. async def _power_off_if_needed(self, db: AsyncSession, item: PrintQueueItem):
  210. """Power off printer if auto_off_after is enabled (waits for cooldown)."""
  211. if not item.auto_off_after:
  212. return
  213. plug = await self._get_smart_plug(db, item.printer_id)
  214. if plug and plug.enabled:
  215. logger.info(f"Auto-off: Waiting for printer {item.printer_id} to cool down before power off...")
  216. # Wait for cooldown (up to 10 minutes)
  217. await printer_manager.wait_for_cooldown(item.printer_id, target_temp=50.0, timeout=600)
  218. logger.info(f"Auto-off: Powering off printer {item.printer_id}")
  219. await tasmota_service.turn_off(plug)
  220. async def _start_print(self, db: AsyncSession, item: PrintQueueItem):
  221. """Upload file and start print for a queue item.
  222. Supports two sources:
  223. - archive_id: Print from an existing archive
  224. - library_file_id: Print from a library file (file manager)
  225. """
  226. logger.info(f"Starting queue item {item.id}")
  227. # Get printer first (needed for both paths)
  228. result = await db.execute(select(Printer).where(Printer.id == item.printer_id))
  229. printer = result.scalar_one_or_none()
  230. if not printer:
  231. item.status = "failed"
  232. item.error_message = "Printer not found"
  233. item.completed_at = datetime.utcnow()
  234. await db.commit()
  235. logger.error(f"Queue item {item.id}: Printer {item.printer_id} not found")
  236. await self._power_off_if_needed(db, item)
  237. return
  238. # Check printer is connected
  239. if not printer_manager.is_connected(item.printer_id):
  240. item.status = "failed"
  241. item.error_message = "Printer not connected"
  242. item.completed_at = datetime.utcnow()
  243. await db.commit()
  244. logger.error(f"Queue item {item.id}: Printer {item.printer_id} not connected")
  245. await self._power_off_if_needed(db, item)
  246. return
  247. # Determine source: archive or library file
  248. archive = None
  249. library_file = None
  250. file_path = None
  251. filename = None
  252. if item.archive_id:
  253. # Print from archive
  254. result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
  255. archive = result.scalar_one_or_none()
  256. if not archive:
  257. item.status = "failed"
  258. item.error_message = "Archive not found"
  259. item.completed_at = datetime.utcnow()
  260. await db.commit()
  261. logger.error(f"Queue item {item.id}: Archive {item.archive_id} not found")
  262. await self._power_off_if_needed(db, item)
  263. return
  264. file_path = settings.base_dir / archive.file_path
  265. filename = archive.filename
  266. elif item.library_file_id:
  267. # Print from library file (file manager)
  268. result = await db.execute(select(LibraryFile).where(LibraryFile.id == item.library_file_id))
  269. library_file = result.scalar_one_or_none()
  270. if not library_file:
  271. item.status = "failed"
  272. item.error_message = "Library file not found"
  273. item.completed_at = datetime.utcnow()
  274. await db.commit()
  275. logger.error(f"Queue item {item.id}: Library file {item.library_file_id} not found")
  276. await self._power_off_if_needed(db, item)
  277. return
  278. # Library files store absolute paths
  279. from pathlib import Path
  280. lib_path = Path(library_file.file_path)
  281. file_path = lib_path if lib_path.is_absolute() else settings.base_dir / library_file.file_path
  282. filename = library_file.filename
  283. else:
  284. # Neither archive nor library file specified
  285. item.status = "failed"
  286. item.error_message = "No source file specified"
  287. item.completed_at = datetime.utcnow()
  288. await db.commit()
  289. logger.error(f"Queue item {item.id}: No archive_id or library_file_id specified")
  290. await self._power_off_if_needed(db, item)
  291. return
  292. # Check file exists on disk
  293. if not file_path.exists():
  294. item.status = "failed"
  295. item.error_message = "Source file not found on disk"
  296. item.completed_at = datetime.utcnow()
  297. await db.commit()
  298. logger.error(f"Queue item {item.id}: File not found: {file_path}")
  299. await self._power_off_if_needed(db, item)
  300. return
  301. # Upload file to printer via FTP
  302. # Use a clean filename to avoid issues with double extensions like .gcode.3mf
  303. base_name = filename
  304. if base_name.endswith(".gcode.3mf"):
  305. base_name = base_name[:-10] # Remove .gcode.3mf
  306. elif base_name.endswith(".3mf"):
  307. base_name = base_name[:-4] # Remove .3mf
  308. remote_filename = f"{base_name}.3mf"
  309. # Upload to root directory (not /cache/) - the start_print command references
  310. # files by name only (ftp://{filename}), so they must be in the root
  311. remote_path = f"/{remote_filename}"
  312. # Get FTP retry settings
  313. ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
  314. # Delete existing file if present (avoids 553 error on overwrite)
  315. try:
  316. await delete_file_async(
  317. printer.ip_address,
  318. printer.access_code,
  319. remote_path,
  320. socket_timeout=ftp_timeout,
  321. printer_model=printer.model,
  322. )
  323. except Exception:
  324. pass # File may not exist, that's fine
  325. try:
  326. if ftp_retry_enabled:
  327. uploaded = await with_ftp_retry(
  328. upload_file_async,
  329. printer.ip_address,
  330. printer.access_code,
  331. file_path,
  332. remote_path,
  333. socket_timeout=ftp_timeout,
  334. printer_model=printer.model,
  335. max_retries=ftp_retry_count,
  336. retry_delay=ftp_retry_delay,
  337. operation_name=f"Upload print to {printer.name}",
  338. )
  339. else:
  340. uploaded = await upload_file_async(
  341. printer.ip_address,
  342. printer.access_code,
  343. file_path,
  344. remote_path,
  345. socket_timeout=ftp_timeout,
  346. printer_model=printer.model,
  347. )
  348. except Exception as e:
  349. uploaded = False
  350. logger.error(f"Queue item {item.id}: FTP error: {e}")
  351. if not uploaded:
  352. item.status = "failed"
  353. item.error_message = "Failed to upload file to printer"
  354. item.completed_at = datetime.utcnow()
  355. await db.commit()
  356. logger.error(f"Queue item {item.id}: FTP upload failed")
  357. await self._power_off_if_needed(db, item)
  358. return
  359. # Register as expected print so we don't create a duplicate archive
  360. # Only applicable for archive-based prints
  361. if archive:
  362. from backend.app.main import register_expected_print
  363. register_expected_print(item.printer_id, remote_filename, archive.id)
  364. # Parse AMS mapping if stored
  365. ams_mapping = None
  366. if item.ams_mapping:
  367. try:
  368. import json
  369. ams_mapping = json.loads(item.ams_mapping)
  370. except json.JSONDecodeError:
  371. logger.warning(f"Queue item {item.id}: Invalid AMS mapping JSON, ignoring")
  372. # Start the print with AMS mapping, plate_id and print options
  373. started = printer_manager.start_print(
  374. item.printer_id,
  375. remote_filename,
  376. plate_id=item.plate_id or 1,
  377. ams_mapping=ams_mapping,
  378. bed_levelling=item.bed_levelling,
  379. flow_cali=item.flow_cali,
  380. vibration_cali=item.vibration_cali,
  381. layer_inspect=item.layer_inspect,
  382. timelapse=item.timelapse,
  383. use_ams=item.use_ams,
  384. )
  385. if started:
  386. item.status = "printing"
  387. item.started_at = datetime.utcnow()
  388. await db.commit()
  389. logger.info(f"Queue item {item.id}: Print started - {filename}")
  390. # MQTT relay - publish queue job started
  391. try:
  392. from backend.app.services.mqtt_relay import mqtt_relay
  393. await mqtt_relay.on_queue_job_started(
  394. job_id=item.id,
  395. filename=filename,
  396. printer_id=printer.id,
  397. printer_name=printer.name,
  398. printer_serial=printer.serial_number,
  399. )
  400. except Exception:
  401. pass # Don't fail if MQTT fails
  402. else:
  403. item.status = "failed"
  404. item.error_message = "Failed to send print command"
  405. item.completed_at = datetime.utcnow()
  406. await db.commit()
  407. logger.error(f"Queue item {item.id}: Failed to start print")
  408. await self._power_off_if_needed(db, item)
  409. # Global scheduler instance
  410. scheduler = PrintScheduler()