# bambu_mqtt.py
  1. import json
  2. import ssl
  3. import asyncio
  4. import logging
  5. import time
  6. from collections import deque
  7. from datetime import datetime
  8. from typing import Callable
  9. from dataclasses import dataclass, field
  10. import paho.mqtt.client as mqtt
  11. logger = logging.getLogger(__name__)
  12. @dataclass
  13. class MQTTLogEntry:
  14. """Log entry for MQTT message debugging."""
  15. timestamp: str
  16. topic: str
  17. direction: str # "in" or "out"
  18. payload: dict
  19. @dataclass
  20. class HMSError:
  21. """Health Management System error from printer."""
  22. code: str
  23. module: int
  24. severity: int # 1=fatal, 2=serious, 3=common, 4=info
  25. message: str = ""
  26. @dataclass
  27. class PrinterState:
  28. connected: bool = False
  29. state: str = "unknown"
  30. current_print: str | None = None
  31. subtask_name: str | None = None
  32. progress: float = 0.0
  33. remaining_time: int = 0
  34. layer_num: int = 0
  35. total_layers: int = 0
  36. temperatures: dict = field(default_factory=dict)
  37. raw_data: dict = field(default_factory=dict)
  38. gcode_file: str | None = None
  39. subtask_id: str | None = None
  40. hms_errors: list = field(default_factory=list) # List of HMSError
  41. class BambuMQTTClient:
  42. """MQTT client for Bambu Lab printer communication."""
  43. MQTT_PORT = 8883
  44. def __init__(
  45. self,
  46. ip_address: str,
  47. serial_number: str,
  48. access_code: str,
  49. on_state_change: Callable[[PrinterState], None] | None = None,
  50. on_print_start: Callable[[dict], None] | None = None,
  51. on_print_complete: Callable[[dict], None] | None = None,
  52. ):
  53. self.ip_address = ip_address
  54. self.serial_number = serial_number
  55. self.access_code = access_code
  56. self.on_state_change = on_state_change
  57. self.on_print_start = on_print_start
  58. self.on_print_complete = on_print_complete
  59. self.state = PrinterState()
  60. self._client: mqtt.Client | None = None
  61. self._loop: asyncio.AbstractEventLoop | None = None
  62. self._previous_gcode_state: str | None = None
  63. self._previous_gcode_file: str | None = None
  64. self._was_running: bool = False # Track if we've seen RUNNING state for current print
  65. self._completion_triggered: bool = False # Prevent duplicate completion triggers
  66. self._message_log: deque[MQTTLogEntry] = deque(maxlen=100)
  67. self._logging_enabled: bool = False
  68. self._last_message_time: float = 0.0 # Track when we last received a message
  69. @property
  70. def topic_subscribe(self) -> str:
  71. return f"device/{self.serial_number}/report"
  72. @property
  73. def topic_publish(self) -> str:
  74. return f"device/{self.serial_number}/request"
  75. def _on_connect(self, client, userdata, flags, rc, properties=None):
  76. if rc == 0:
  77. self.state.connected = True
  78. client.subscribe(self.topic_subscribe)
  79. # Request full status update
  80. self._request_push_all()
  81. else:
  82. self.state.connected = False
  83. def _on_disconnect(self, client, userdata, disconnect_flags=None, rc=None, properties=None):
  84. # Ignore spurious disconnect callbacks if we've received a message recently
  85. # Paho-mqtt sometimes fires disconnect callbacks while the connection is still active
  86. time_since_last_message = time.time() - self._last_message_time
  87. if time_since_last_message < 30.0 and self._last_message_time > 0:
  88. logger.debug(
  89. f"[{self.serial_number}] Ignoring spurious disconnect (last message {time_since_last_message:.1f}s ago)"
  90. )
  91. return
  92. logger.warning(f"[{self.serial_number}] MQTT disconnected: rc={rc}, flags={disconnect_flags}")
  93. self.state.connected = False
  94. if self.on_state_change:
  95. self.on_state_change(self.state)
  96. def _on_message(self, client, userdata, msg):
  97. try:
  98. payload = json.loads(msg.payload.decode())
  99. # Track last message time - receiving a message proves we're connected
  100. self._last_message_time = time.time()
  101. self.state.connected = True
  102. # Log message if logging is enabled
  103. if self._logging_enabled:
  104. self._message_log.append(MQTTLogEntry(
  105. timestamp=datetime.now().isoformat(),
  106. topic=msg.topic,
  107. direction="in",
  108. payload=payload,
  109. ))
  110. self._process_message(payload)
  111. except json.JSONDecodeError:
  112. pass
  113. def _process_message(self, payload: dict):
  114. """Process incoming MQTT message from printer."""
  115. if "print" in payload:
  116. print_data = payload["print"]
  117. # Log when we see gcode_state changes
  118. if "gcode_state" in print_data:
  119. logger.info(
  120. f"[{self.serial_number}] Received gcode_state: {print_data.get('gcode_state')}, "
  121. f"gcode_file: {print_data.get('gcode_file')}, subtask_name: {print_data.get('subtask_name')}"
  122. )
  123. self._update_state(print_data)
  124. def _update_state(self, data: dict):
  125. """Update printer state from message data."""
  126. previous_state = self.state.state
  127. # Update state fields
  128. if "gcode_state" in data:
  129. self.state.state = data["gcode_state"]
  130. if "gcode_file" in data:
  131. self.state.gcode_file = data["gcode_file"]
  132. self.state.current_print = data["gcode_file"]
  133. if "subtask_name" in data:
  134. self.state.subtask_name = data["subtask_name"]
  135. # Prefer subtask_name as current_print if available
  136. if data["subtask_name"]:
  137. self.state.current_print = data["subtask_name"]
  138. if "subtask_id" in data:
  139. self.state.subtask_id = data["subtask_id"]
  140. if "mc_percent" in data:
  141. self.state.progress = float(data["mc_percent"])
  142. if "mc_remaining_time" in data:
  143. self.state.remaining_time = int(data["mc_remaining_time"])
  144. if "layer_num" in data:
  145. self.state.layer_num = int(data["layer_num"])
  146. if "total_layer_num" in data:
  147. self.state.total_layers = int(data["total_layer_num"])
  148. # Temperature data
  149. temps = {}
  150. # Log all temperature-related fields for debugging (only when we have temp data)
  151. temp_fields = {k: v for k, v in data.items() if 'temp' in k.lower() or 'nozzle' in k.lower()}
  152. if temp_fields and not hasattr(self, '_temp_fields_logged'):
  153. logger.info(f"[{self.serial_number}] Temperature fields in MQTT data: {temp_fields}")
  154. self._temp_fields_logged = True
  155. if "bed_temper" in data:
  156. temps["bed"] = float(data["bed_temper"])
  157. if "bed_target_temper" in data:
  158. temps["bed_target"] = float(data["bed_target_temper"])
  159. if "nozzle_temper" in data:
  160. temps["nozzle"] = float(data["nozzle_temper"])
  161. if "nozzle_target_temper" in data:
  162. temps["nozzle_target"] = float(data["nozzle_target_temper"])
  163. # Second nozzle for dual-extruder printers (H2 series)
  164. # Try multiple possible field names used by different firmware versions
  165. if "nozzle_temper_2" in data:
  166. temps["nozzle_2"] = float(data["nozzle_temper_2"])
  167. elif "right_nozzle_temper" in data:
  168. temps["nozzle_2"] = float(data["right_nozzle_temper"])
  169. if "nozzle_target_temper_2" in data:
  170. temps["nozzle_2_target"] = float(data["nozzle_target_temper_2"])
  171. elif "right_nozzle_target_temper" in data:
  172. temps["nozzle_2_target"] = float(data["right_nozzle_target_temper"])
  173. # Also check for left nozzle as primary (some H2 models)
  174. if "left_nozzle_temper" in data and "nozzle" not in temps:
  175. temps["nozzle"] = float(data["left_nozzle_temper"])
  176. if "left_nozzle_target_temper" in data and "nozzle_target" not in temps:
  177. temps["nozzle_target"] = float(data["left_nozzle_target_temper"])
  178. if "chamber_temper" in data:
  179. temps["chamber"] = float(data["chamber_temper"])
  180. if temps:
  181. self.state.temperatures = temps
  182. # Parse HMS (Health Management System) errors
  183. if "hms" in data:
  184. hms_list = data["hms"]
  185. self.state.hms_errors = []
  186. if isinstance(hms_list, list):
  187. for hms in hms_list:
  188. if isinstance(hms, dict):
  189. # HMS format: {"attr": code, "code": full_code}
  190. # The code is a hex string, severity is in bits
  191. code = hms.get("code", hms.get("attr", "0"))
  192. if isinstance(code, int):
  193. code = hex(code)
  194. # Parse severity from code (typically last 4 bits indicate level)
  195. try:
  196. code_int = int(str(code).replace("0x", ""), 16) if code else 0
  197. severity = (code_int >> 16) & 0xF # Extract severity bits
  198. module = (code_int >> 24) & 0xFF # Extract module bits
  199. except (ValueError, TypeError):
  200. severity = 3
  201. module = 0
  202. self.state.hms_errors.append(HMSError(
  203. code=str(code),
  204. module=module,
  205. severity=severity if severity > 0 else 3,
  206. ))
  207. self.state.raw_data = data
  208. # Log state transitions for debugging
  209. if "gcode_state" in data:
  210. logger.debug(
  211. f"[{self.serial_number}] gcode_state: {self._previous_gcode_state} -> {self.state.state}, "
  212. f"file: {self.state.gcode_file}, subtask: {self.state.subtask_name}"
  213. )
  214. # Detect print start (state changes TO RUNNING with a file)
  215. current_file = self.state.gcode_file or self.state.current_print
  216. is_new_print = (
  217. self.state.state == "RUNNING"
  218. and self._previous_gcode_state != "RUNNING"
  219. and current_file
  220. )
  221. # Also detect if file changed while running (new print started)
  222. is_file_change = (
  223. self.state.state == "RUNNING"
  224. and current_file
  225. and current_file != self._previous_gcode_file
  226. and self._previous_gcode_file is not None
  227. )
  228. # Track RUNNING state for more robust completion detection
  229. if self.state.state == "RUNNING" and current_file:
  230. if not self._was_running:
  231. logger.info(f"[{self.serial_number}] Now tracking RUNNING state for {current_file}")
  232. self._was_running = True
  233. self._completion_triggered = False
  234. if is_new_print or is_file_change:
  235. # Clear any old HMS errors when a new print starts
  236. self.state.hms_errors = []
  237. # Reset completion tracking for new print
  238. self._was_running = True
  239. self._completion_triggered = False
  240. if (is_new_print or is_file_change) and self.on_print_start:
  241. logger.info(
  242. f"[{self.serial_number}] PRINT START detected - file: {current_file}, "
  243. f"subtask: {self.state.subtask_name}, is_new: {is_new_print}, is_file_change: {is_file_change}"
  244. )
  245. self.on_print_start({
  246. "filename": current_file,
  247. "subtask_name": self.state.subtask_name,
  248. "raw_data": data,
  249. })
  250. # Detect print completion (FINISH = success, FAILED = error, IDLE = aborted)
  251. # Use _was_running flag in addition to _previous_gcode_state for more robust detection
  252. # This handles cases where server restarts during a print
  253. should_trigger_completion = (
  254. self.state.state in ("FINISH", "FAILED")
  255. and not self._completion_triggered
  256. and self.on_print_complete
  257. and (
  258. self._previous_gcode_state == "RUNNING" # Normal transition
  259. or (self._was_running and self._previous_gcode_state != self.state.state) # After server restart
  260. )
  261. )
  262. # For IDLE, only trigger if we just came from RUNNING (explicit abort/cancel)
  263. if (
  264. self.state.state == "IDLE"
  265. and self._previous_gcode_state == "RUNNING"
  266. and not self._completion_triggered
  267. and self.on_print_complete
  268. ):
  269. should_trigger_completion = True
  270. if should_trigger_completion:
  271. if self.state.state == "FINISH":
  272. status = "completed"
  273. elif self.state.state == "FAILED":
  274. status = "failed"
  275. else:
  276. status = "aborted"
  277. logger.info(
  278. f"[{self.serial_number}] PRINT COMPLETE detected - state: {self.state.state}, "
  279. f"status: {status}, file: {self._previous_gcode_file or current_file}, "
  280. f"subtask: {self.state.subtask_name}, was_running: {self._was_running}"
  281. )
  282. self._completion_triggered = True
  283. self._was_running = False
  284. self.on_print_complete({
  285. "status": status,
  286. "filename": self._previous_gcode_file or current_file,
  287. "subtask_name": self.state.subtask_name,
  288. "raw_data": data,
  289. })
  290. self._previous_gcode_state = self.state.state
  291. if current_file:
  292. self._previous_gcode_file = current_file
  293. if self.on_state_change:
  294. self.on_state_change(self.state)
  295. def _request_push_all(self):
  296. """Request full status update from printer."""
  297. if self._client:
  298. message = {"pushing": {"command": "pushall"}}
  299. self._client.publish(self.topic_publish, json.dumps(message))
  300. def connect(self):
  301. """Connect to the printer MQTT broker."""
  302. self._client = mqtt.Client(
  303. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  304. client_id=f"bambutrack_{self.serial_number}",
  305. protocol=mqtt.MQTTv311,
  306. )
  307. self._client.username_pw_set("bblp", self.access_code)
  308. self._client.on_connect = self._on_connect
  309. self._client.on_disconnect = self._on_disconnect
  310. self._client.on_message = self._on_message
  311. # TLS setup - Bambu uses self-signed certs
  312. ssl_context = ssl.create_default_context()
  313. ssl_context.check_hostname = False
  314. ssl_context.verify_mode = ssl.CERT_NONE
  315. self._client.tls_set_context(ssl_context)
  316. # Use shorter keepalive (15s) for faster disconnect detection
  317. # Paho considers connection lost after 1.5x keepalive with no response
  318. self._client.connect_async(self.ip_address, self.MQTT_PORT, keepalive=15)
  319. self._client.loop_start()
  320. def start_print(self, filename: str, plate_id: int = 1):
  321. """Start a print job on the printer.
  322. The file should already be uploaded to /cache/ on the printer via FTP.
  323. """
  324. if self._client and self.state.connected:
  325. # Bambu print command format
  326. # Based on: https://github.com/darkorb/bambu-ftp-and-print
  327. command = {
  328. "print": {
  329. "sequence_id": 0,
  330. "command": "project_file",
  331. "param": f"Metadata/plate_{plate_id}.gcode",
  332. "subtask_name": filename,
  333. "url": f"ftp://{filename}",
  334. "timelapse": False,
  335. "bed_leveling": True,
  336. "flow_cali": True,
  337. "vibration_cali": True,
  338. "layer_inspect": False,
  339. "use_ams": True,
  340. }
  341. }
  342. logger.info(f"[{self.serial_number}] Sending print command: {json.dumps(command)}")
  343. self._client.publish(self.topic_publish, json.dumps(command))
  344. return True
  345. return False
  346. def stop_print(self) -> bool:
  347. """Stop the current print job."""
  348. if self._client and self.state.connected:
  349. command = {
  350. "print": {
  351. "command": "stop",
  352. "sequence_id": "0"
  353. }
  354. }
  355. self._client.publish(self.topic_publish, json.dumps(command))
  356. logger.info(f"[{self.serial_number}] Sent stop print command")
  357. return True
  358. return False
  359. def disconnect(self):
  360. """Disconnect from the printer."""
  361. if self._client:
  362. self._client.loop_stop()
  363. self._client.disconnect()
  364. self._client = None
  365. self.state.connected = False
  366. def send_command(self, command: dict):
  367. """Send a command to the printer."""
  368. if self._client and self.state.connected:
  369. # Log outgoing message if logging is enabled
  370. if self._logging_enabled:
  371. self._message_log.append(MQTTLogEntry(
  372. timestamp=datetime.now().isoformat(),
  373. topic=self.topic_publish,
  374. direction="out",
  375. payload=command,
  376. ))
  377. self._client.publish(self.topic_publish, json.dumps(command))
  378. def enable_logging(self, enabled: bool = True):
  379. """Enable or disable MQTT message logging."""
  380. self._logging_enabled = enabled
  381. # Don't clear logs when stopping - user can manually clear with clear_logs()
  382. def get_logs(self) -> list[MQTTLogEntry]:
  383. """Get all logged MQTT messages."""
  384. return list(self._message_log)
  385. def clear_logs(self):
  386. """Clear the message log."""
  387. self._message_log.clear()
  388. @property
  389. def logging_enabled(self) -> bool:
  390. """Check if logging is enabled."""
  391. return self._logging_enabled