kprofiles.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. """API routes for K-profile (pressure advance) management."""
  2. import logging
  3. from fastapi import APIRouter, Depends, HTTPException
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from sqlalchemy import select
  6. from backend.app.core.database import get_db
  7. from backend.app.models.printer import Printer
  8. from backend.app.schemas.kprofile import (
  9. KProfile,
  10. KProfileCreate,
  11. KProfileDelete,
  12. KProfilesResponse,
  13. )
  14. from backend.app.services.printer_manager import printer_manager
  logger = logging.getLogger(__name__)

  # Every route in this module is nested under a single printer's URL.
  router = APIRouter(
      prefix="/printers/{printer_id}/kprofiles",
      tags=["kprofiles"],
  )
  @router.get("/", response_model=KProfilesResponse)
  async def get_kprofiles(
      printer_id: int,
      nozzle_diameter: str = "0.4",
      db: AsyncSession = Depends(get_db),
  ):
      """Fetch the K-profiles (pressure advance) reported by a printer.

      Args:
          printer_id: ID of the printer to query.
          nozzle_diameter: Nozzle diameter to request profiles for
              (default: "0.4"). The same value is echoed in the response.

      Returns:
          A ``KProfilesResponse`` holding the converted profiles and the
          requested nozzle diameter.

      Raises:
          HTTPException: 404 if no printer row matches ``printer_id``;
              400 if the printer has no client or the client is not
              connected.
      """
      # The printer has to exist in the database before we talk to it.
      stmt = select(Printer).where(Printer.id == printer_id)
      lookup = await db.execute(stmt)
      if not lookup.scalar_one_or_none():
          raise HTTPException(status_code=404, detail="Printer not found")

      # A usable MQTT client is one that exists and reports itself connected.
      mqtt_client = printer_manager.get_client(printer_id)
      if not (mqtt_client and mqtt_client.state.connected):
          raise HTTPException(status_code=400, detail="Printer not connected")

      raw_profiles = await mqtt_client.get_kprofiles(nozzle_diameter=nozzle_diameter)

      def to_schema(raw):
          """Copy one MQTT-side profile object into the Pydantic schema."""
          return KProfile(
              slot_id=raw.slot_id,
              extruder_id=raw.extruder_id,
              nozzle_id=raw.nozzle_id,
              nozzle_diameter=raw.nozzle_diameter,
              filament_id=raw.filament_id,
              name=raw.name,
              k_value=raw.k_value,
              n_coef=raw.n_coef,
              ams_id=raw.ams_id,
              tray_id=raw.tray_id,
              setting_id=raw.setting_id,
          )

      converted = [to_schema(raw) for raw in raw_profiles]
      return KProfilesResponse(profiles=converted, nozzle_diameter=nozzle_diameter)
  @router.post("/", response_model=dict)
  async def set_kprofile(
      printer_id: int,
      profile: KProfileCreate,
      db: AsyncSession = Depends(get_db),
  ):
      """Create or update a K-profile on the printer.

      Args:
          printer_id: ID of the target printer.
          profile: K-profile data to send to the printer.

      Returns:
          A dict with ``success`` set to True and a confirmation ``message``.

      Raises:
          HTTPException: 404 if no printer row matches ``printer_id``;
              400 if the printer has no client or the client is not
              connected; 500 if the client reports that the command could
              not be sent.
      """
      # The printer has to exist in the database before we talk to it.
      stmt = select(Printer).where(Printer.id == printer_id)
      lookup = await db.execute(stmt)
      if not lookup.scalar_one_or_none():
          raise HTTPException(status_code=404, detail="Printer not found")

      # A usable MQTT client is one that exists and reports itself connected.
      mqtt_client = printer_manager.get_client(printer_id)
      if not (mqtt_client and mqtt_client.state.connected):
          raise HTTPException(status_code=400, detail="Printer not connected")

      sent = mqtt_client.set_kprofile(
          filament_id=profile.filament_id,
          name=profile.name,
          k_value=profile.k_value,
          nozzle_diameter=profile.nozzle_diameter,
          nozzle_id=profile.nozzle_id,
          extruder_id=profile.extruder_id,
          setting_id=profile.setting_id,
          slot_id=profile.slot_id,
      )
      if sent:
          return {"success": True, "message": "K-profile set successfully"}

      # A falsy result from the client means the command was not sent.
      raise HTTPException(status_code=500, detail="Failed to send K-profile command")
  @router.delete("/", response_model=dict)
  async def delete_kprofile(
      printer_id: int,
      profile: KProfileDelete,
      db: AsyncSession = Depends(get_db),
  ):
      """Delete a K-profile from the printer.

      Args:
          printer_id: ID of the target printer.
          profile: Identification data for the K-profile to delete. Its
              ``slot_id`` is passed to the client as ``cali_idx``.

      Returns:
          A dict with ``success`` set to True and a confirmation ``message``.

      Raises:
          HTTPException: 404 if no printer row matches ``printer_id``;
              400 if the printer has no client or the client is not
              connected; 500 if the client reports that the delete command
              could not be sent.
      """
      # The printer has to exist in the database before we talk to it.
      stmt = select(Printer).where(Printer.id == printer_id)
      lookup = await db.execute(stmt)
      if not lookup.scalar_one_or_none():
          raise HTTPException(status_code=404, detail="Printer not found")

      # A usable MQTT client is one that exists and reports itself connected.
      mqtt_client = printer_manager.get_client(printer_id)
      if not (mqtt_client and mqtt_client.state.connected):
          raise HTTPException(status_code=400, detail="Printer not connected")

      sent = mqtt_client.delete_kprofile(
          cali_idx=profile.slot_id,
          filament_id=profile.filament_id,
          nozzle_id=profile.nozzle_id,
          nozzle_diameter=profile.nozzle_diameter,
          extruder_id=profile.extruder_id,
      )
      if sent:
          return {"success": True, "message": "K-profile deleted successfully"}

      # A falsy result from the client means the command was not sent.
      raise HTTPException(
          status_code=500, detail="Failed to send K-profile delete command"
      )