bambu_mqtt.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. import json
  2. import ssl
  3. import asyncio
  4. import logging
  5. from collections import deque
  6. from datetime import datetime
  7. from typing import Callable
  8. from dataclasses import dataclass, field
  9. import paho.mqtt.client as mqtt
  10. logger = logging.getLogger(__name__)
  11. @dataclass
  12. class MQTTLogEntry:
  13. """Log entry for MQTT message debugging."""
  14. timestamp: str
  15. topic: str
  16. direction: str # "in" or "out"
  17. payload: dict
  18. @dataclass
  19. class HMSError:
  20. """Health Management System error from printer."""
  21. code: str
  22. module: int
  23. severity: int # 1=fatal, 2=serious, 3=common, 4=info
  24. message: str = ""
  25. @dataclass
  26. class PrinterState:
  27. connected: bool = False
  28. state: str = "unknown"
  29. current_print: str | None = None
  30. subtask_name: str | None = None
  31. progress: float = 0.0
  32. remaining_time: int = 0
  33. layer_num: int = 0
  34. total_layers: int = 0
  35. temperatures: dict = field(default_factory=dict)
  36. raw_data: dict = field(default_factory=dict)
  37. gcode_file: str | None = None
  38. subtask_id: str | None = None
  39. hms_errors: list = field(default_factory=list) # List of HMSError
  40. class BambuMQTTClient:
  41. """MQTT client for Bambu Lab printer communication."""
  42. MQTT_PORT = 8883
  43. def __init__(
  44. self,
  45. ip_address: str,
  46. serial_number: str,
  47. access_code: str,
  48. on_state_change: Callable[[PrinterState], None] | None = None,
  49. on_print_start: Callable[[dict], None] | None = None,
  50. on_print_complete: Callable[[dict], None] | None = None,
  51. ):
  52. self.ip_address = ip_address
  53. self.serial_number = serial_number
  54. self.access_code = access_code
  55. self.on_state_change = on_state_change
  56. self.on_print_start = on_print_start
  57. self.on_print_complete = on_print_complete
  58. self.state = PrinterState()
  59. self._client: mqtt.Client | None = None
  60. self._loop: asyncio.AbstractEventLoop | None = None
  61. self._previous_gcode_state: str | None = None
  62. self._previous_gcode_file: str | None = None
  63. self._message_log: deque[MQTTLogEntry] = deque(maxlen=100)
  64. self._logging_enabled: bool = False
  65. @property
  66. def topic_subscribe(self) -> str:
  67. return f"device/{self.serial_number}/report"
  68. @property
  69. def topic_publish(self) -> str:
  70. return f"device/{self.serial_number}/request"
  71. def _on_connect(self, client, userdata, flags, rc, properties=None):
  72. if rc == 0:
  73. self.state.connected = True
  74. client.subscribe(self.topic_subscribe)
  75. # Request full status update
  76. self._request_push_all()
  77. else:
  78. self.state.connected = False
  79. def _on_disconnect(self, client, userdata, disconnect_flags=None, rc=None, properties=None):
  80. self.state.connected = False
  81. if self.on_state_change:
  82. self.on_state_change(self.state)
  83. def _on_message(self, client, userdata, msg):
  84. try:
  85. payload = json.loads(msg.payload.decode())
  86. # Log message if logging is enabled
  87. if self._logging_enabled:
  88. self._message_log.append(MQTTLogEntry(
  89. timestamp=datetime.now().isoformat(),
  90. topic=msg.topic,
  91. direction="in",
  92. payload=payload,
  93. ))
  94. self._process_message(payload)
  95. except json.JSONDecodeError:
  96. pass
  97. def _process_message(self, payload: dict):
  98. """Process incoming MQTT message from printer."""
  99. if "print" in payload:
  100. print_data = payload["print"]
  101. # Log when we see gcode_state changes
  102. if "gcode_state" in print_data:
  103. logger.info(
  104. f"[{self.serial_number}] Received gcode_state: {print_data.get('gcode_state')}, "
  105. f"gcode_file: {print_data.get('gcode_file')}, subtask_name: {print_data.get('subtask_name')}"
  106. )
  107. self._update_state(print_data)
  108. def _update_state(self, data: dict):
  109. """Update printer state from message data."""
  110. previous_state = self.state.state
  111. # Update state fields
  112. if "gcode_state" in data:
  113. self.state.state = data["gcode_state"]
  114. if "gcode_file" in data:
  115. self.state.gcode_file = data["gcode_file"]
  116. self.state.current_print = data["gcode_file"]
  117. if "subtask_name" in data:
  118. self.state.subtask_name = data["subtask_name"]
  119. # Prefer subtask_name as current_print if available
  120. if data["subtask_name"]:
  121. self.state.current_print = data["subtask_name"]
  122. if "subtask_id" in data:
  123. self.state.subtask_id = data["subtask_id"]
  124. if "mc_percent" in data:
  125. self.state.progress = float(data["mc_percent"])
  126. if "mc_remaining_time" in data:
  127. self.state.remaining_time = int(data["mc_remaining_time"])
  128. if "layer_num" in data:
  129. self.state.layer_num = int(data["layer_num"])
  130. if "total_layer_num" in data:
  131. self.state.total_layers = int(data["total_layer_num"])
  132. # Temperature data
  133. temps = {}
  134. # Log all temperature-related fields for debugging (only when we have temp data)
  135. temp_fields = {k: v for k, v in data.items() if 'temp' in k.lower() or 'nozzle' in k.lower()}
  136. if temp_fields and not hasattr(self, '_temp_fields_logged'):
  137. logger.info(f"[{self.serial_number}] Temperature fields in MQTT data: {temp_fields}")
  138. self._temp_fields_logged = True
  139. if "bed_temper" in data:
  140. temps["bed"] = float(data["bed_temper"])
  141. if "bed_target_temper" in data:
  142. temps["bed_target"] = float(data["bed_target_temper"])
  143. if "nozzle_temper" in data:
  144. temps["nozzle"] = float(data["nozzle_temper"])
  145. if "nozzle_target_temper" in data:
  146. temps["nozzle_target"] = float(data["nozzle_target_temper"])
  147. # Second nozzle for dual-extruder printers (H2 series)
  148. # Try multiple possible field names used by different firmware versions
  149. if "nozzle_temper_2" in data:
  150. temps["nozzle_2"] = float(data["nozzle_temper_2"])
  151. elif "right_nozzle_temper" in data:
  152. temps["nozzle_2"] = float(data["right_nozzle_temper"])
  153. if "nozzle_target_temper_2" in data:
  154. temps["nozzle_2_target"] = float(data["nozzle_target_temper_2"])
  155. elif "right_nozzle_target_temper" in data:
  156. temps["nozzle_2_target"] = float(data["right_nozzle_target_temper"])
  157. # Also check for left nozzle as primary (some H2 models)
  158. if "left_nozzle_temper" in data and "nozzle" not in temps:
  159. temps["nozzle"] = float(data["left_nozzle_temper"])
  160. if "left_nozzle_target_temper" in data and "nozzle_target" not in temps:
  161. temps["nozzle_target"] = float(data["left_nozzle_target_temper"])
  162. if "chamber_temper" in data:
  163. temps["chamber"] = float(data["chamber_temper"])
  164. if temps:
  165. self.state.temperatures = temps
  166. # Parse HMS (Health Management System) errors
  167. if "hms" in data:
  168. hms_list = data["hms"]
  169. self.state.hms_errors = []
  170. if isinstance(hms_list, list):
  171. for hms in hms_list:
  172. if isinstance(hms, dict):
  173. # HMS format: {"attr": code, "code": full_code}
  174. # The code is a hex string, severity is in bits
  175. code = hms.get("code", hms.get("attr", "0"))
  176. if isinstance(code, int):
  177. code = hex(code)
  178. # Parse severity from code (typically last 4 bits indicate level)
  179. try:
  180. code_int = int(str(code).replace("0x", ""), 16) if code else 0
  181. severity = (code_int >> 16) & 0xF # Extract severity bits
  182. module = (code_int >> 24) & 0xFF # Extract module bits
  183. except (ValueError, TypeError):
  184. severity = 3
  185. module = 0
  186. self.state.hms_errors.append(HMSError(
  187. code=str(code),
  188. module=module,
  189. severity=severity if severity > 0 else 3,
  190. ))
  191. self.state.raw_data = data
  192. # Log state transitions for debugging
  193. if "gcode_state" in data:
  194. logger.debug(
  195. f"[{self.serial_number}] gcode_state: {self._previous_gcode_state} -> {self.state.state}, "
  196. f"file: {self.state.gcode_file}, subtask: {self.state.subtask_name}"
  197. )
  198. # Detect print start (state changes TO RUNNING with a file)
  199. current_file = self.state.gcode_file or self.state.current_print
  200. is_new_print = (
  201. self.state.state == "RUNNING"
  202. and self._previous_gcode_state != "RUNNING"
  203. and current_file
  204. )
  205. # Also detect if file changed while running (new print started)
  206. is_file_change = (
  207. self.state.state == "RUNNING"
  208. and current_file
  209. and current_file != self._previous_gcode_file
  210. and self._previous_gcode_file is not None
  211. )
  212. if is_new_print or is_file_change:
  213. # Clear any old HMS errors when a new print starts
  214. self.state.hms_errors = []
  215. if (is_new_print or is_file_change) and self.on_print_start:
  216. logger.info(
  217. f"[{self.serial_number}] PRINT START detected - file: {current_file}, "
  218. f"subtask: {self.state.subtask_name}, is_new: {is_new_print}, is_file_change: {is_file_change}"
  219. )
  220. self.on_print_start({
  221. "filename": current_file,
  222. "subtask_name": self.state.subtask_name,
  223. "raw_data": data,
  224. })
  225. # Detect print completion (FINISH = success, FAILED = error)
  226. if (
  227. self._previous_gcode_state == "RUNNING"
  228. and self.state.state in ("FINISH", "FAILED")
  229. and self.on_print_complete
  230. ):
  231. logger.info(
  232. f"[{self.serial_number}] PRINT COMPLETE detected - state: {self.state.state}, "
  233. f"file: {self._previous_gcode_file or current_file}"
  234. )
  235. self.on_print_complete({
  236. "status": "completed" if self.state.state == "FINISH" else "failed",
  237. "filename": self._previous_gcode_file or current_file,
  238. "raw_data": data,
  239. })
  240. self._previous_gcode_state = self.state.state
  241. if current_file:
  242. self._previous_gcode_file = current_file
  243. if self.on_state_change:
  244. self.on_state_change(self.state)
  245. def _request_push_all(self):
  246. """Request full status update from printer."""
  247. if self._client:
  248. message = {"pushing": {"command": "pushall"}}
  249. self._client.publish(self.topic_publish, json.dumps(message))
  250. def connect(self):
  251. """Connect to the printer MQTT broker."""
  252. self._client = mqtt.Client(
  253. callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
  254. client_id=f"bambutrack_{self.serial_number}",
  255. protocol=mqtt.MQTTv311,
  256. )
  257. self._client.username_pw_set("bblp", self.access_code)
  258. self._client.on_connect = self._on_connect
  259. self._client.on_disconnect = self._on_disconnect
  260. self._client.on_message = self._on_message
  261. # TLS setup - Bambu uses self-signed certs
  262. ssl_context = ssl.create_default_context()
  263. ssl_context.check_hostname = False
  264. ssl_context.verify_mode = ssl.CERT_NONE
  265. self._client.tls_set_context(ssl_context)
  266. self._client.connect_async(self.ip_address, self.MQTT_PORT)
  267. self._client.loop_start()
  268. def start_print(self, filename: str, plate_id: int = 1):
  269. """Start a print job on the printer."""
  270. if self._client and self.state.connected:
  271. # Bambu print command format
  272. command = {
  273. "print": {
  274. "command": "project_file",
  275. "param": f"Metadata/plate_{plate_id}.gcode",
  276. "subtask_name": filename,
  277. "url": f"ftp://{filename}",
  278. "bed_type": "auto",
  279. "timelapse": False,
  280. "bed_leveling": True,
  281. "flow_cali": True,
  282. "vibration_cali": True,
  283. "layer_inspect": False,
  284. "use_ams": True,
  285. }
  286. }
  287. self._client.publish(self.topic_publish, json.dumps(command))
  288. return True
  289. return False
  290. def disconnect(self):
  291. """Disconnect from the printer."""
  292. if self._client:
  293. self._client.loop_stop()
  294. self._client.disconnect()
  295. self._client = None
  296. self.state.connected = False
  297. def send_command(self, command: dict):
  298. """Send a command to the printer."""
  299. if self._client and self.state.connected:
  300. # Log outgoing message if logging is enabled
  301. if self._logging_enabled:
  302. self._message_log.append(MQTTLogEntry(
  303. timestamp=datetime.now().isoformat(),
  304. topic=self.topic_publish,
  305. direction="out",
  306. payload=command,
  307. ))
  308. self._client.publish(self.topic_publish, json.dumps(command))
  309. def enable_logging(self, enabled: bool = True):
  310. """Enable or disable MQTT message logging."""
  311. self._logging_enabled = enabled
  312. if not enabled:
  313. self._message_log.clear()
  314. def get_logs(self) -> list[MQTTLogEntry]:
  315. """Get all logged MQTT messages."""
  316. return list(self._message_log)
  317. def clear_logs(self):
  318. """Clear the message log."""
  319. self._message_log.clear()
  320. @property
  321. def logging_enabled(self) -> bool:
  322. """Check if logging is enabled."""
  323. return self._logging_enabled