print_queue.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. """API routes for print queue management."""
  2. import json
  3. import logging
  4. from datetime import datetime
  5. from fastapi import APIRouter, Depends, HTTPException, Query
  6. from sqlalchemy import func, select
  7. from sqlalchemy.ext.asyncio import AsyncSession
  8. from sqlalchemy.orm import selectinload
  9. from backend.app.core.database import get_db
  10. from backend.app.models.archive import PrintArchive
  11. from backend.app.models.print_queue import PrintQueueItem
  12. from backend.app.models.printer import Printer
  13. from backend.app.schemas.print_queue import (
  14. PrintQueueItemCreate,
  15. PrintQueueItemResponse,
  16. PrintQueueItemUpdate,
  17. PrintQueueReorder,
  18. )
  19. logger = logging.getLogger(__name__)
  20. router = APIRouter(prefix="/queue", tags=["queue"])
  21. def _enrich_response(item: PrintQueueItem) -> PrintQueueItemResponse:
  22. """Add nested archive/printer info to response."""
  23. # Parse ams_mapping from JSON string BEFORE model_validate
  24. ams_mapping_parsed = None
  25. if item.ams_mapping:
  26. try:
  27. ams_mapping_parsed = json.loads(item.ams_mapping)
  28. except json.JSONDecodeError:
  29. ams_mapping_parsed = None
  30. # Create response with parsed ams_mapping
  31. item_dict = {
  32. "id": item.id,
  33. "printer_id": item.printer_id,
  34. "archive_id": item.archive_id,
  35. "position": item.position,
  36. "scheduled_time": item.scheduled_time,
  37. "require_previous_success": item.require_previous_success,
  38. "auto_off_after": item.auto_off_after,
  39. "manual_start": item.manual_start,
  40. "ams_mapping": ams_mapping_parsed,
  41. "status": item.status,
  42. "started_at": item.started_at,
  43. "completed_at": item.completed_at,
  44. "error_message": item.error_message,
  45. "created_at": item.created_at,
  46. }
  47. response = PrintQueueItemResponse(**item_dict)
  48. if item.archive:
  49. response.archive_name = item.archive.print_name or item.archive.filename
  50. response.archive_thumbnail = item.archive.thumbnail_path
  51. response.print_time_seconds = item.archive.print_time_seconds
  52. if item.printer:
  53. response.printer_name = item.printer.name
  54. return response
  55. @router.get("/", response_model=list[PrintQueueItemResponse])
  56. async def list_queue(
  57. printer_id: int | None = Query(None, description="Filter by printer"),
  58. status: str | None = Query(None, description="Filter by status"),
  59. db: AsyncSession = Depends(get_db),
  60. ):
  61. """List all queue items, optionally filtered by printer or status."""
  62. query = (
  63. select(PrintQueueItem)
  64. .options(selectinload(PrintQueueItem.archive), selectinload(PrintQueueItem.printer))
  65. .order_by(PrintQueueItem.printer_id, PrintQueueItem.position)
  66. )
  67. if printer_id is not None:
  68. query = query.where(PrintQueueItem.printer_id == printer_id)
  69. if status:
  70. query = query.where(PrintQueueItem.status == status)
  71. result = await db.execute(query)
  72. items = result.scalars().all()
  73. return [_enrich_response(item) for item in items]
  74. @router.post("/", response_model=PrintQueueItemResponse)
  75. async def add_to_queue(
  76. data: PrintQueueItemCreate,
  77. db: AsyncSession = Depends(get_db),
  78. ):
  79. """Add an item to the print queue."""
  80. # Validate printer exists
  81. result = await db.execute(select(Printer).where(Printer.id == data.printer_id))
  82. if not result.scalar_one_or_none():
  83. raise HTTPException(400, "Printer not found")
  84. # Validate archive exists
  85. result = await db.execute(select(PrintArchive).where(PrintArchive.id == data.archive_id))
  86. if not result.scalar_one_or_none():
  87. raise HTTPException(400, "Archive not found")
  88. # Get next position for this printer
  89. result = await db.execute(
  90. select(func.max(PrintQueueItem.position))
  91. .where(PrintQueueItem.printer_id == data.printer_id)
  92. .where(PrintQueueItem.status == "pending")
  93. )
  94. max_pos = result.scalar() or 0
  95. item = PrintQueueItem(
  96. printer_id=data.printer_id,
  97. archive_id=data.archive_id,
  98. scheduled_time=data.scheduled_time,
  99. require_previous_success=data.require_previous_success,
  100. auto_off_after=data.auto_off_after,
  101. manual_start=data.manual_start,
  102. ams_mapping=json.dumps(data.ams_mapping) if data.ams_mapping else None,
  103. position=max_pos + 1,
  104. status="pending",
  105. )
  106. db.add(item)
  107. await db.commit()
  108. await db.refresh(item)
  109. # Load relationships for response
  110. await db.refresh(item, ["archive", "printer"])
  111. logger.info(f"Added archive {data.archive_id} to queue for printer {data.printer_id}")
  112. # MQTT relay - publish queue job added
  113. try:
  114. from backend.app.services.mqtt_relay import mqtt_relay
  115. await mqtt_relay.on_queue_job_added(
  116. job_id=item.id,
  117. filename=item.archive.filename if item.archive else "",
  118. printer_id=item.printer_id,
  119. printer_name=item.printer.name if item.printer else None,
  120. )
  121. except Exception:
  122. pass # Don't fail queue add if MQTT fails
  123. return _enrich_response(item)
  124. @router.get("/{item_id}", response_model=PrintQueueItemResponse)
  125. async def get_queue_item(item_id: int, db: AsyncSession = Depends(get_db)):
  126. """Get a specific queue item."""
  127. result = await db.execute(
  128. select(PrintQueueItem)
  129. .options(selectinload(PrintQueueItem.archive), selectinload(PrintQueueItem.printer))
  130. .where(PrintQueueItem.id == item_id)
  131. )
  132. item = result.scalar_one_or_none()
  133. if not item:
  134. raise HTTPException(404, "Queue item not found")
  135. return _enrich_response(item)
  136. @router.patch("/{item_id}", response_model=PrintQueueItemResponse)
  137. async def update_queue_item(
  138. item_id: int,
  139. data: PrintQueueItemUpdate,
  140. db: AsyncSession = Depends(get_db),
  141. ):
  142. """Update a queue item."""
  143. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == item_id))
  144. item = result.scalar_one_or_none()
  145. if not item:
  146. raise HTTPException(404, "Queue item not found")
  147. if item.status != "pending":
  148. raise HTTPException(400, "Can only update pending items")
  149. update_data = data.model_dump(exclude_unset=True)
  150. # Validate new printer_id if being changed
  151. if "printer_id" in update_data:
  152. result = await db.execute(select(Printer).where(Printer.id == update_data["printer_id"]))
  153. if not result.scalar_one_or_none():
  154. raise HTTPException(400, "Printer not found")
  155. # Serialize ams_mapping to JSON for TEXT column storage
  156. if "ams_mapping" in update_data:
  157. update_data["ams_mapping"] = json.dumps(update_data["ams_mapping"]) if update_data["ams_mapping"] else None
  158. for field, value in update_data.items():
  159. setattr(item, field, value)
  160. await db.commit()
  161. await db.refresh(item, ["archive", "printer"])
  162. logger.info(f"Updated queue item {item_id}")
  163. return _enrich_response(item)
  164. @router.delete("/{item_id}")
  165. async def delete_queue_item(item_id: int, db: AsyncSession = Depends(get_db)):
  166. """Remove an item from the queue."""
  167. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == item_id))
  168. item = result.scalar_one_or_none()
  169. if not item:
  170. raise HTTPException(404, "Queue item not found")
  171. if item.status == "printing":
  172. raise HTTPException(400, "Cannot delete item that is currently printing")
  173. await db.delete(item)
  174. await db.commit()
  175. logger.info(f"Deleted queue item {item_id}")
  176. return {"message": "Queue item deleted"}
  177. @router.post("/reorder")
  178. async def reorder_queue(
  179. data: PrintQueueReorder,
  180. db: AsyncSession = Depends(get_db),
  181. ):
  182. """Bulk update positions for queue items."""
  183. for reorder_item in data.items:
  184. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == reorder_item.id))
  185. item = result.scalar_one_or_none()
  186. if item and item.status == "pending":
  187. item.position = reorder_item.position
  188. await db.commit()
  189. logger.info(f"Reordered {len(data.items)} queue items")
  190. return {"message": f"Reordered {len(data.items)} items"}
  191. @router.post("/{item_id}/cancel")
  192. async def cancel_queue_item(item_id: int, db: AsyncSession = Depends(get_db)):
  193. """Cancel a pending queue item."""
  194. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == item_id))
  195. item = result.scalar_one_or_none()
  196. if not item:
  197. raise HTTPException(404, "Queue item not found")
  198. if item.status not in ("pending",):
  199. raise HTTPException(400, f"Cannot cancel item with status '{item.status}'")
  200. item.status = "cancelled"
  201. item.completed_at = datetime.now()
  202. await db.commit()
  203. logger.info(f"Cancelled queue item {item_id}")
  204. return {"message": "Queue item cancelled"}
  205. @router.post("/{item_id}/stop")
  206. async def stop_queue_item(
  207. item_id: int,
  208. db: AsyncSession = Depends(get_db),
  209. ):
  210. """Stop an actively printing queue item."""
  211. import asyncio
  212. from backend.app.models.smart_plug import SmartPlug
  213. from backend.app.services.printer_manager import printer_manager
  214. from backend.app.services.tasmota import tasmota_service
  215. result = await db.execute(select(PrintQueueItem).where(PrintQueueItem.id == item_id))
  216. item = result.scalar_one_or_none()
  217. if not item:
  218. raise HTTPException(404, "Queue item not found")
  219. if item.status != "printing":
  220. raise HTTPException(400, f"Can only stop items that are printing, current status: '{item.status}'")
  221. # Capture values we need for background task
  222. printer_id = item.printer_id
  223. auto_off_after = item.auto_off_after
  224. # Try to send stop command to printer
  225. stop_sent = False
  226. try:
  227. stop_sent = printer_manager.stop_print(printer_id)
  228. if not stop_sent:
  229. logger.warning(f"stop_print returned False for printer {printer_id} - printer may not be connected")
  230. except Exception as e:
  231. logger.error(f"Error sending stop command for queue item {item_id}: {e}")
  232. # Update queue item status regardless - if printer is off, print is already stopped
  233. item.status = "cancelled"
  234. item.completed_at = datetime.now()
  235. item.error_message = "Stopped by user" if stop_sent else "Stopped by user (printer was offline)"
  236. await db.commit()
  237. # Get smart plug info if auto-off is enabled
  238. plug_ip = None
  239. if auto_off_after:
  240. result = await db.execute(select(SmartPlug).where(SmartPlug.printer_id == printer_id))
  241. plug = result.scalar_one_or_none()
  242. if plug and plug.enabled:
  243. plug_ip = plug.ip_address
  244. logger.info(f"Stopped printing queue item {item_id} (stop command sent: {stop_sent})")
  245. # Schedule background task for cooldown + power off
  246. if plug_ip:
  247. async def cooldown_and_poweroff():
  248. logger.info(f"Auto-off: Waiting for printer {printer_id} to cool down before power off...")
  249. await printer_manager.wait_for_cooldown(printer_id, target_temp=50.0, timeout=600)
  250. # Re-fetch plug since we're in a new async context
  251. from backend.app.core.database import async_session
  252. async with async_session() as new_db:
  253. result = await new_db.execute(select(SmartPlug).where(SmartPlug.printer_id == printer_id))
  254. plug = result.scalar_one_or_none()
  255. if plug and plug.enabled:
  256. logger.info(f"Auto-off: Powering off printer {printer_id}")
  257. await tasmota_service.turn_off(plug)
  258. asyncio.create_task(cooldown_and_poweroff())
  259. return {"message": "Print stopped" if stop_sent else "Queue item cancelled (printer was offline)"}
  260. @router.post("/{item_id}/start")
  261. async def start_queue_item(
  262. item_id: int,
  263. db: AsyncSession = Depends(get_db),
  264. ):
  265. """Manually start a staged (manual_start) queue item.
  266. This clears the manual_start flag so the scheduler will pick it up,
  267. or starts immediately if the printer is ready.
  268. """
  269. result = await db.execute(
  270. select(PrintQueueItem)
  271. .options(selectinload(PrintQueueItem.archive), selectinload(PrintQueueItem.printer))
  272. .where(PrintQueueItem.id == item_id)
  273. )
  274. item = result.scalar_one_or_none()
  275. if not item:
  276. raise HTTPException(404, "Queue item not found")
  277. if item.status != "pending":
  278. raise HTTPException(400, f"Can only start pending items, current status: '{item.status}'")
  279. # Clear manual_start flag so scheduler picks it up
  280. item.manual_start = False
  281. await db.commit()
  282. await db.refresh(item, ["archive", "printer"])
  283. logger.info(f"Manually started queue item {item_id} (cleared manual_start flag)")
  284. return _enrich_response(item)