# bambu_mqtt.py (12 KB)
import asyncio
import json
import logging
import ssl
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable

import paho.mqtt.client as mqtt
  9. @dataclass
  10. class MQTTLogEntry:
  11. """Log entry for MQTT message debugging."""
  12. timestamp: str
  13. topic: str
  14. direction: str # "in" or "out"
  15. payload: dict
  16. @dataclass
  17. class HMSError:
  18. """Health Management System error from printer."""
  19. code: str
  20. module: int
  21. severity: int # 1=fatal, 2=serious, 3=common, 4=info
  22. message: str = ""
  23. @dataclass
  24. class PrinterState:
  25. connected: bool = False
  26. state: str = "unknown"
  27. current_print: str | None = None
  28. subtask_name: str | None = None
  29. progress: float = 0.0
  30. remaining_time: int = 0
  31. layer_num: int = 0
  32. total_layers: int = 0
  33. temperatures: dict = field(default_factory=dict)
  34. raw_data: dict = field(default_factory=dict)
  35. gcode_file: str | None = None
  36. subtask_id: str | None = None
  37. hms_errors: list = field(default_factory=list) # List of HMSError
  38. class BambuMQTTClient:
  39. """MQTT client for Bambu Lab printer communication."""
  40. MQTT_PORT = 8883
  41. def __init__(
  42. self,
  43. ip_address: str,
  44. serial_number: str,
  45. access_code: str,
  46. on_state_change: Callable[[PrinterState], None] | None = None,
  47. on_print_start: Callable[[dict], None] | None = None,
  48. on_print_complete: Callable[[dict], None] | None = None,
  49. ):
  50. self.ip_address = ip_address
  51. self.serial_number = serial_number
  52. self.access_code = access_code
  53. self.on_state_change = on_state_change
  54. self.on_print_start = on_print_start
  55. self.on_print_complete = on_print_complete
  56. self.state = PrinterState()
  57. self._client: mqtt.Client | None = None
  58. self._loop: asyncio.AbstractEventLoop | None = None
  59. self._previous_gcode_state: str | None = None
  60. self._previous_gcode_file: str | None = None
  61. self._message_log: deque[MQTTLogEntry] = deque(maxlen=100)
  62. self._logging_enabled: bool = False
  63. @property
  64. def topic_subscribe(self) -> str:
  65. return f"device/{self.serial_number}/report"
  66. @property
  67. def topic_publish(self) -> str:
  68. return f"device/{self.serial_number}/request"
  69. def _on_connect(self, client, userdata, flags, rc, properties=None):
  70. if rc == 0:
  71. self.state.connected = True
  72. client.subscribe(self.topic_subscribe)
  73. # Request full status update
  74. self._request_push_all()
  75. else:
  76. self.state.connected = False
  77. def _on_disconnect(self, client, userdata, disconnect_flags=None, rc=None, properties=None):
  78. self.state.connected = False
  79. if self.on_state_change:
  80. self.on_state_change(self.state)
  81. def _on_message(self, client, userdata, msg):
  82. try:
  83. payload = json.loads(msg.payload.decode())
  84. # Log message if logging is enabled
  85. if self._logging_enabled:
  86. self._message_log.append(MQTTLogEntry(
  87. timestamp=datetime.now().isoformat(),
  88. topic=msg.topic,
  89. direction="in",
  90. payload=payload,
  91. ))
  92. self._process_message(payload)
  93. except json.JSONDecodeError:
  94. pass
  95. def _process_message(self, payload: dict):
  96. """Process incoming MQTT message from printer."""
  97. if "print" in payload:
  98. print_data = payload["print"]
  99. self._update_state(print_data)
  100. def _update_state(self, data: dict):
  101. """Update printer state from message data."""
  102. previous_state = self.state.state
  103. # Update state fields
  104. if "gcode_state" in data:
  105. self.state.state = data["gcode_state"]
  106. if "gcode_file" in data:
  107. self.state.gcode_file = data["gcode_file"]
  108. self.state.current_print = data["gcode_file"]
  109. if "subtask_name" in data:
  110. self.state.subtask_name = data["subtask_name"]
  111. # Prefer subtask_name as current_print if available
  112. if data["subtask_name"]:
  113. self.state.current_print = data["subtask_name"]
  114. if "subtask_id" in data:
  115. self.state.subtask_id = data["subtask_id"]
  116. if "mc_percent" in data:
  117. self.state.progress = float(data["mc_percent"])
  118. if "mc_remaining_time" in data:
  119. self.state.remaining_time = int(data["mc_remaining_time"])
  120. if "layer_num" in data:
  121. self.state.layer_num = int(data["layer_num"])
  122. if "total_layer_num" in data:
  123. self.state.total_layers = int(data["total_layer_num"])
  124. # Temperature data
  125. temps = {}
  126. if "bed_temper" in data:
  127. temps["bed"] = float(data["bed_temper"])
  128. if "bed_target_temper" in data:
  129. temps["bed_target"] = float(data["bed_target_temper"])
  130. if "nozzle_temper" in data:
  131. temps["nozzle"] = float(data["nozzle_temper"])
  132. if "nozzle_target_temper" in data:
  133. temps["nozzle_target"] = float(data["nozzle_target_temper"])
  134. # Second nozzle for dual-extruder printers (H2 series)
  135. if "nozzle_temper_2" in data:
  136. temps["nozzle_2"] = float(data["nozzle_temper_2"])
  137. if "nozzle_target_temper_2" in data:
  138. temps["nozzle_2_target"] = float(data["nozzle_target_temper_2"])
  139. if "chamber_temper" in data:
  140. temps["chamber"] = float(data["chamber_temper"])
  141. if temps:
  142. self.state.temperatures = temps
  143. # Parse HMS (Health Management System) errors
  144. if "hms" in data:
  145. hms_list = data["hms"]
  146. self.state.hms_errors = []
  147. if isinstance(hms_list, list):
  148. for hms in hms_list:
  149. if isinstance(hms, dict):
  150. # HMS format: {"attr": code, "code": full_code}
  151. # The code is a hex string, severity is in bits
  152. code = hms.get("code", hms.get("attr", "0"))
  153. if isinstance(code, int):
  154. code = hex(code)
  155. # Parse severity from code (typically last 4 bits indicate level)
  156. try:
  157. code_int = int(str(code).replace("0x", ""), 16) if code else 0
  158. severity = (code_int >> 16) & 0xF # Extract severity bits
  159. module = (code_int >> 24) & 0xFF # Extract module bits
  160. except (ValueError, TypeError):
  161. severity = 3
  162. module = 0
  163. self.state.hms_errors.append(HMSError(
  164. code=str(code),
  165. module=module,
  166. severity=severity if severity > 0 else 3,
  167. ))
  168. self.state.raw_data = data
  169. # Detect print start (state changes TO RUNNING with a file)
  170. current_file = self.state.gcode_file or self.state.current_print
  171. is_new_print = (
  172. self.state.state == "RUNNING"
  173. and self._previous_gcode_state != "RUNNING"
  174. and current_file
  175. )
  176. # Also detect if file changed while running (new print started)
  177. is_file_change = (
  178. self.state.state == "RUNNING"
  179. and current_file
  180. and current_file != self._previous_gcode_file
  181. and self._previous_gcode_file is not None
  182. )
  183. if (is_new_print or is_file_change) and self.on_print_start:
  184. self.on_print_start({
  185. "filename": current_file,
  186. "subtask_name": self.state.subtask_name,
  187. "raw_data": data,
  188. })
  189. # Detect print completion
  190. if (
  191. self._previous_gcode_state == "RUNNING"
  192. and self.state.state in ("FINISH", "FAILED")
  193. and self.on_print_complete
  194. ):
  195. self.on_print_complete({
  196. "status": "completed" if self.state.state == "FINISH" else "failed",
  197. "filename": self._previous_gcode_file or current_file,
  198. "raw_data": data,
  199. })
  200. self._previous_gcode_state = self.state.state
  201. if current_file:
  202. self._previous_gcode_file = current_file
  203. if self.on_state_change:
  204. self.on_state_change(self.state)
  205. def _request_push_all(self):
  206. """Request full status update from printer."""
  207. if self._client:
  208. message = {"pushing": {"command": "pushall"}}
  209. self._client.publish(self.topic_publish, json.dumps(message))
  210. def connect(self):
  211. """Connect to the printer MQTT broker."""
  212. self._client = mqtt.Client(
  213. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  214. client_id=f"bambutrack_{self.serial_number}",
  215. protocol=mqtt.MQTTv311,
  216. )
  217. self._client.username_pw_set("bblp", self.access_code)
  218. self._client.on_connect = self._on_connect
  219. self._client.on_disconnect = self._on_disconnect
  220. self._client.on_message = self._on_message
  221. # TLS setup - Bambu uses self-signed certs
  222. ssl_context = ssl.create_default_context()
  223. ssl_context.check_hostname = False
  224. ssl_context.verify_mode = ssl.CERT_NONE
  225. self._client.tls_set_context(ssl_context)
  226. self._client.connect_async(self.ip_address, self.MQTT_PORT)
  227. self._client.loop_start()
  228. def start_print(self, filename: str, plate_id: int = 1):
  229. """Start a print job on the printer."""
  230. if self._client and self.state.connected:
  231. # Bambu print command format
  232. command = {
  233. "print": {
  234. "command": "project_file",
  235. "param": f"Metadata/plate_{plate_id}.gcode",
  236. "subtask_name": filename,
  237. "url": f"ftp://{filename}",
  238. "bed_type": "auto",
  239. "timelapse": False,
  240. "bed_leveling": True,
  241. "flow_cali": True,
  242. "vibration_cali": True,
  243. "layer_inspect": False,
  244. "use_ams": True,
  245. }
  246. }
  247. self._client.publish(self.topic_publish, json.dumps(command))
  248. return True
  249. return False
  250. def disconnect(self):
  251. """Disconnect from the printer."""
  252. if self._client:
  253. self._client.loop_stop()
  254. self._client.disconnect()
  255. self._client = None
  256. self.state.connected = False
  257. def send_command(self, command: dict):
  258. """Send a command to the printer."""
  259. if self._client and self.state.connected:
  260. # Log outgoing message if logging is enabled
  261. if self._logging_enabled:
  262. self._message_log.append(MQTTLogEntry(
  263. timestamp=datetime.now().isoformat(),
  264. topic=self.topic_publish,
  265. direction="out",
  266. payload=command,
  267. ))
  268. self._client.publish(self.topic_publish, json.dumps(command))
  269. def enable_logging(self, enabled: bool = True):
  270. """Enable or disable MQTT message logging."""
  271. self._logging_enabled = enabled
  272. if not enabled:
  273. self._message_log.clear()
  274. def get_logs(self) -> list[MQTTLogEntry]:
  275. """Get all logged MQTT messages."""
  276. return list(self._message_log)
  277. def clear_logs(self):
  278. """Clear the message log."""
  279. self._message_log.clear()
  280. @property
  281. def logging_enabled(self) -> bool:
  282. """Check if logging is enabled."""
  283. return self._logging_enabled