bambu_mqtt.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. import json
  2. import ssl
  3. import asyncio
  4. import logging
  5. from collections import deque
  6. from datetime import datetime
  7. from typing import Callable
  8. from dataclasses import dataclass, field
  9. import paho.mqtt.client as mqtt
  10. logger = logging.getLogger(__name__)
  11. @dataclass
  12. class MQTTLogEntry:
  13. """Log entry for MQTT message debugging."""
  14. timestamp: str
  15. topic: str
  16. direction: str # "in" or "out"
  17. payload: dict
  18. @dataclass
  19. class HMSError:
  20. """Health Management System error from printer."""
  21. code: str
  22. module: int
  23. severity: int # 1=fatal, 2=serious, 3=common, 4=info
  24. message: str = ""
  25. @dataclass
  26. class PrinterState:
  27. connected: bool = False
  28. state: str = "unknown"
  29. current_print: str | None = None
  30. subtask_name: str | None = None
  31. progress: float = 0.0
  32. remaining_time: int = 0
  33. layer_num: int = 0
  34. total_layers: int = 0
  35. temperatures: dict = field(default_factory=dict)
  36. raw_data: dict = field(default_factory=dict)
  37. gcode_file: str | None = None
  38. subtask_id: str | None = None
  39. hms_errors: list = field(default_factory=list) # List of HMSError
  40. class BambuMQTTClient:
  41. """MQTT client for Bambu Lab printer communication."""
  42. MQTT_PORT = 8883
  43. def __init__(
  44. self,
  45. ip_address: str,
  46. serial_number: str,
  47. access_code: str,
  48. on_state_change: Callable[[PrinterState], None] | None = None,
  49. on_print_start: Callable[[dict], None] | None = None,
  50. on_print_complete: Callable[[dict], None] | None = None,
  51. ):
  52. self.ip_address = ip_address
  53. self.serial_number = serial_number
  54. self.access_code = access_code
  55. self.on_state_change = on_state_change
  56. self.on_print_start = on_print_start
  57. self.on_print_complete = on_print_complete
  58. self.state = PrinterState()
  59. self._client: mqtt.Client | None = None
  60. self._loop: asyncio.AbstractEventLoop | None = None
  61. self._previous_gcode_state: str | None = None
  62. self._previous_gcode_file: str | None = None
  63. self._message_log: deque[MQTTLogEntry] = deque(maxlen=100)
  64. self._logging_enabled: bool = False
  65. @property
  66. def topic_subscribe(self) -> str:
  67. return f"device/{self.serial_number}/report"
  68. @property
  69. def topic_publish(self) -> str:
  70. return f"device/{self.serial_number}/request"
  71. def _on_connect(self, client, userdata, flags, rc, properties=None):
  72. if rc == 0:
  73. self.state.connected = True
  74. client.subscribe(self.topic_subscribe)
  75. # Request full status update
  76. self._request_push_all()
  77. else:
  78. self.state.connected = False
  79. def _on_disconnect(self, client, userdata, disconnect_flags=None, rc=None, properties=None):
  80. self.state.connected = False
  81. if self.on_state_change:
  82. self.on_state_change(self.state)
  83. def _on_message(self, client, userdata, msg):
  84. try:
  85. payload = json.loads(msg.payload.decode())
  86. # Log message if logging is enabled
  87. if self._logging_enabled:
  88. self._message_log.append(MQTTLogEntry(
  89. timestamp=datetime.now().isoformat(),
  90. topic=msg.topic,
  91. direction="in",
  92. payload=payload,
  93. ))
  94. self._process_message(payload)
  95. except json.JSONDecodeError:
  96. pass
  97. def _process_message(self, payload: dict):
  98. """Process incoming MQTT message from printer."""
  99. if "print" in payload:
  100. print_data = payload["print"]
  101. # Log when we see gcode_state changes
  102. if "gcode_state" in print_data:
  103. logger.info(
  104. f"[{self.serial_number}] Received gcode_state: {print_data.get('gcode_state')}, "
  105. f"gcode_file: {print_data.get('gcode_file')}, subtask_name: {print_data.get('subtask_name')}"
  106. )
  107. self._update_state(print_data)
  108. def _update_state(self, data: dict):
  109. """Update printer state from message data."""
  110. previous_state = self.state.state
  111. # Update state fields
  112. if "gcode_state" in data:
  113. self.state.state = data["gcode_state"]
  114. if "gcode_file" in data:
  115. self.state.gcode_file = data["gcode_file"]
  116. self.state.current_print = data["gcode_file"]
  117. if "subtask_name" in data:
  118. self.state.subtask_name = data["subtask_name"]
  119. # Prefer subtask_name as current_print if available
  120. if data["subtask_name"]:
  121. self.state.current_print = data["subtask_name"]
  122. if "subtask_id" in data:
  123. self.state.subtask_id = data["subtask_id"]
  124. if "mc_percent" in data:
  125. self.state.progress = float(data["mc_percent"])
  126. if "mc_remaining_time" in data:
  127. self.state.remaining_time = int(data["mc_remaining_time"])
  128. if "layer_num" in data:
  129. self.state.layer_num = int(data["layer_num"])
  130. if "total_layer_num" in data:
  131. self.state.total_layers = int(data["total_layer_num"])
  132. # Temperature data
  133. temps = {}
  134. # Log all temperature-related fields for debugging (only when we have temp data)
  135. temp_fields = {k: v for k, v in data.items() if 'temp' in k.lower() or 'nozzle' in k.lower()}
  136. if temp_fields and not hasattr(self, '_temp_fields_logged'):
  137. logger.info(f"[{self.serial_number}] Temperature fields in MQTT data: {temp_fields}")
  138. self._temp_fields_logged = True
  139. if "bed_temper" in data:
  140. temps["bed"] = float(data["bed_temper"])
  141. if "bed_target_temper" in data:
  142. temps["bed_target"] = float(data["bed_target_temper"])
  143. if "nozzle_temper" in data:
  144. temps["nozzle"] = float(data["nozzle_temper"])
  145. if "nozzle_target_temper" in data:
  146. temps["nozzle_target"] = float(data["nozzle_target_temper"])
  147. # Second nozzle for dual-extruder printers (H2 series)
  148. # Try multiple possible field names used by different firmware versions
  149. if "nozzle_temper_2" in data:
  150. temps["nozzle_2"] = float(data["nozzle_temper_2"])
  151. elif "right_nozzle_temper" in data:
  152. temps["nozzle_2"] = float(data["right_nozzle_temper"])
  153. if "nozzle_target_temper_2" in data:
  154. temps["nozzle_2_target"] = float(data["nozzle_target_temper_2"])
  155. elif "right_nozzle_target_temper" in data:
  156. temps["nozzle_2_target"] = float(data["right_nozzle_target_temper"])
  157. # Also check for left nozzle as primary (some H2 models)
  158. if "left_nozzle_temper" in data and "nozzle" not in temps:
  159. temps["nozzle"] = float(data["left_nozzle_temper"])
  160. if "left_nozzle_target_temper" in data and "nozzle_target" not in temps:
  161. temps["nozzle_target"] = float(data["left_nozzle_target_temper"])
  162. if "chamber_temper" in data:
  163. temps["chamber"] = float(data["chamber_temper"])
  164. if temps:
  165. self.state.temperatures = temps
  166. # Parse HMS (Health Management System) errors
  167. if "hms" in data:
  168. hms_list = data["hms"]
  169. self.state.hms_errors = []
  170. if isinstance(hms_list, list):
  171. for hms in hms_list:
  172. if isinstance(hms, dict):
  173. # HMS format: {"attr": code, "code": full_code}
  174. # The code is a hex string, severity is in bits
  175. code = hms.get("code", hms.get("attr", "0"))
  176. if isinstance(code, int):
  177. code = hex(code)
  178. # Parse severity from code (typically last 4 bits indicate level)
  179. try:
  180. code_int = int(str(code).replace("0x", ""), 16) if code else 0
  181. severity = (code_int >> 16) & 0xF # Extract severity bits
  182. module = (code_int >> 24) & 0xFF # Extract module bits
  183. except (ValueError, TypeError):
  184. severity = 3
  185. module = 0
  186. self.state.hms_errors.append(HMSError(
  187. code=str(code),
  188. module=module,
  189. severity=severity if severity > 0 else 3,
  190. ))
  191. self.state.raw_data = data
  192. # Log state transitions for debugging
  193. if "gcode_state" in data:
  194. logger.debug(
  195. f"[{self.serial_number}] gcode_state: {self._previous_gcode_state} -> {self.state.state}, "
  196. f"file: {self.state.gcode_file}, subtask: {self.state.subtask_name}"
  197. )
  198. # Detect print start (state changes TO RUNNING with a file)
  199. current_file = self.state.gcode_file or self.state.current_print
  200. is_new_print = (
  201. self.state.state == "RUNNING"
  202. and self._previous_gcode_state != "RUNNING"
  203. and current_file
  204. )
  205. # Also detect if file changed while running (new print started)
  206. is_file_change = (
  207. self.state.state == "RUNNING"
  208. and current_file
  209. and current_file != self._previous_gcode_file
  210. and self._previous_gcode_file is not None
  211. )
  212. if is_new_print or is_file_change:
  213. # Clear any old HMS errors when a new print starts
  214. self.state.hms_errors = []
  215. if (is_new_print or is_file_change) and self.on_print_start:
  216. logger.info(
  217. f"[{self.serial_number}] PRINT START detected - file: {current_file}, "
  218. f"subtask: {self.state.subtask_name}, is_new: {is_new_print}, is_file_change: {is_file_change}"
  219. )
  220. self.on_print_start({
  221. "filename": current_file,
  222. "subtask_name": self.state.subtask_name,
  223. "raw_data": data,
  224. })
  225. # Detect print completion (FINISH = success, FAILED = error, IDLE = aborted)
  226. if (
  227. self._previous_gcode_state == "RUNNING"
  228. and self.state.state in ("FINISH", "FAILED", "IDLE")
  229. and self.on_print_complete
  230. ):
  231. if self.state.state == "FINISH":
  232. status = "completed"
  233. elif self.state.state == "FAILED":
  234. status = "failed"
  235. else:
  236. status = "aborted"
  237. logger.info(
  238. f"[{self.serial_number}] PRINT COMPLETE detected - state: {self.state.state}, "
  239. f"status: {status}, file: {self._previous_gcode_file or current_file}"
  240. )
  241. self.on_print_complete({
  242. "status": status,
  243. "filename": self._previous_gcode_file or current_file,
  244. "raw_data": data,
  245. })
  246. self._previous_gcode_state = self.state.state
  247. if current_file:
  248. self._previous_gcode_file = current_file
  249. if self.on_state_change:
  250. self.on_state_change(self.state)
  251. def _request_push_all(self):
  252. """Request full status update from printer."""
  253. if self._client:
  254. message = {"pushing": {"command": "pushall"}}
  255. self._client.publish(self.topic_publish, json.dumps(message))
  256. def connect(self):
  257. """Connect to the printer MQTT broker."""
  258. self._client = mqtt.Client(
  259. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  260. client_id=f"bambutrack_{self.serial_number}",
  261. protocol=mqtt.MQTTv311,
  262. )
  263. self._client.username_pw_set("bblp", self.access_code)
  264. self._client.on_connect = self._on_connect
  265. self._client.on_disconnect = self._on_disconnect
  266. self._client.on_message = self._on_message
  267. # TLS setup - Bambu uses self-signed certs
  268. ssl_context = ssl.create_default_context()
  269. ssl_context.check_hostname = False
  270. ssl_context.verify_mode = ssl.CERT_NONE
  271. self._client.tls_set_context(ssl_context)
  272. # Use shorter keepalive (15s) for faster disconnect detection
  273. # Paho considers connection lost after 1.5x keepalive with no response
  274. self._client.connect_async(self.ip_address, self.MQTT_PORT, keepalive=15)
  275. self._client.loop_start()
  276. def start_print(self, filename: str, plate_id: int = 1):
  277. """Start a print job on the printer.
  278. The file should already be uploaded to /cache/ on the printer via FTP.
  279. """
  280. if self._client and self.state.connected:
  281. # Bambu print command format
  282. # Based on: https://github.com/darkorb/bambu-ftp-and-print
  283. command = {
  284. "print": {
  285. "sequence_id": 0,
  286. "command": "project_file",
  287. "param": f"Metadata/plate_{plate_id}.gcode",
  288. "subtask_name": filename,
  289. "url": f"ftp://{filename}",
  290. "timelapse": False,
  291. "bed_leveling": True,
  292. "flow_cali": True,
  293. "vibration_cali": True,
  294. "layer_inspect": False,
  295. "use_ams": True,
  296. }
  297. }
  298. logger.info(f"[{self.serial_number}] Sending print command: {json.dumps(command)}")
  299. self._client.publish(self.topic_publish, json.dumps(command))
  300. return True
  301. return False
  302. def stop_print(self) -> bool:
  303. """Stop the current print job."""
  304. if self._client and self.state.connected:
  305. command = {
  306. "print": {
  307. "command": "stop",
  308. "sequence_id": "0"
  309. }
  310. }
  311. self._client.publish(self.topic_publish, json.dumps(command))
  312. logger.info(f"[{self.serial_number}] Sent stop print command")
  313. return True
  314. return False
  315. def disconnect(self):
  316. """Disconnect from the printer."""
  317. if self._client:
  318. self._client.loop_stop()
  319. self._client.disconnect()
  320. self._client = None
  321. self.state.connected = False
  322. def send_command(self, command: dict):
  323. """Send a command to the printer."""
  324. if self._client and self.state.connected:
  325. # Log outgoing message if logging is enabled
  326. if self._logging_enabled:
  327. self._message_log.append(MQTTLogEntry(
  328. timestamp=datetime.now().isoformat(),
  329. topic=self.topic_publish,
  330. direction="out",
  331. payload=command,
  332. ))
  333. self._client.publish(self.topic_publish, json.dumps(command))
  334. def enable_logging(self, enabled: bool = True):
  335. """Enable or disable MQTT message logging."""
  336. self._logging_enabled = enabled
  337. # Don't clear logs when stopping - user can manually clear with clear_logs()
  338. def get_logs(self) -> list[MQTTLogEntry]:
  339. """Get all logged MQTT messages."""
  340. return list(self._message_log)
  341. def clear_logs(self):
  342. """Clear the message log."""
  343. self._message_log.clear()
  344. @property
  345. def logging_enabled(self) -> bool:
  346. """Check if logging is enabled."""
  347. return self._logging_enabled