| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- import json
- import ssl
- import asyncio
- import logging
- import time
- from collections import deque
- from datetime import datetime
- from typing import Callable
- from dataclasses import dataclass, field
- import paho.mqtt.client as mqtt
- logger = logging.getLogger(__name__)
- @dataclass
- class MQTTLogEntry:
- """Log entry for MQTT message debugging."""
- timestamp: str
- topic: str
- direction: str # "in" or "out"
- payload: dict
- @dataclass
- class HMSError:
- """Health Management System error from printer."""
- code: str
- module: int
- severity: int # 1=fatal, 2=serious, 3=common, 4=info
- message: str = ""
- @dataclass
- class PrinterState:
- connected: bool = False
- state: str = "unknown"
- current_print: str | None = None
- subtask_name: str | None = None
- progress: float = 0.0
- remaining_time: int = 0
- layer_num: int = 0
- total_layers: int = 0
- temperatures: dict = field(default_factory=dict)
- raw_data: dict = field(default_factory=dict)
- gcode_file: str | None = None
- subtask_id: str | None = None
- hms_errors: list = field(default_factory=list) # List of HMSError
- class BambuMQTTClient:
- """MQTT client for Bambu Lab printer communication."""
- MQTT_PORT = 8883
- def __init__(
- self,
- ip_address: str,
- serial_number: str,
- access_code: str,
- on_state_change: Callable[[PrinterState], None] | None = None,
- on_print_start: Callable[[dict], None] | None = None,
- on_print_complete: Callable[[dict], None] | None = None,
- ):
- self.ip_address = ip_address
- self.serial_number = serial_number
- self.access_code = access_code
- self.on_state_change = on_state_change
- self.on_print_start = on_print_start
- self.on_print_complete = on_print_complete
- self.state = PrinterState()
- self._client: mqtt.Client | None = None
- self._loop: asyncio.AbstractEventLoop | None = None
- self._previous_gcode_state: str | None = None
- self._previous_gcode_file: str | None = None
- self._message_log: deque[MQTTLogEntry] = deque(maxlen=100)
- self._logging_enabled: bool = False
- self._last_message_time: float = 0.0 # Track when we last received a message
- @property
- def topic_subscribe(self) -> str:
- return f"device/{self.serial_number}/report"
- @property
- def topic_publish(self) -> str:
- return f"device/{self.serial_number}/request"
- def _on_connect(self, client, userdata, flags, rc, properties=None):
- if rc == 0:
- self.state.connected = True
- client.subscribe(self.topic_subscribe)
- # Request full status update
- self._request_push_all()
- else:
- self.state.connected = False
- def _on_disconnect(self, client, userdata, disconnect_flags=None, rc=None, properties=None):
- # Ignore spurious disconnect callbacks if we've received a message recently
- # Paho-mqtt sometimes fires disconnect callbacks while the connection is still active
- time_since_last_message = time.time() - self._last_message_time
- if time_since_last_message < 30.0 and self._last_message_time > 0:
- logger.debug(
- f"[{self.serial_number}] Ignoring spurious disconnect (last message {time_since_last_message:.1f}s ago)"
- )
- return
- logger.warning(f"[{self.serial_number}] MQTT disconnected: rc={rc}, flags={disconnect_flags}")
- self.state.connected = False
- if self.on_state_change:
- self.on_state_change(self.state)
- def _on_message(self, client, userdata, msg):
- try:
- payload = json.loads(msg.payload.decode())
- # Track last message time - receiving a message proves we're connected
- self._last_message_time = time.time()
- self.state.connected = True
- # Log message if logging is enabled
- if self._logging_enabled:
- self._message_log.append(MQTTLogEntry(
- timestamp=datetime.now().isoformat(),
- topic=msg.topic,
- direction="in",
- payload=payload,
- ))
- self._process_message(payload)
- except json.JSONDecodeError:
- pass
- def _process_message(self, payload: dict):
- """Process incoming MQTT message from printer."""
- if "print" in payload:
- print_data = payload["print"]
- # Log when we see gcode_state changes
- if "gcode_state" in print_data:
- logger.info(
- f"[{self.serial_number}] Received gcode_state: {print_data.get('gcode_state')}, "
- f"gcode_file: {print_data.get('gcode_file')}, subtask_name: {print_data.get('subtask_name')}"
- )
- self._update_state(print_data)
- def _update_state(self, data: dict):
- """Update printer state from message data."""
- previous_state = self.state.state
- # Update state fields
- if "gcode_state" in data:
- self.state.state = data["gcode_state"]
- if "gcode_file" in data:
- self.state.gcode_file = data["gcode_file"]
- self.state.current_print = data["gcode_file"]
- if "subtask_name" in data:
- self.state.subtask_name = data["subtask_name"]
- # Prefer subtask_name as current_print if available
- if data["subtask_name"]:
- self.state.current_print = data["subtask_name"]
- if "subtask_id" in data:
- self.state.subtask_id = data["subtask_id"]
- if "mc_percent" in data:
- self.state.progress = float(data["mc_percent"])
- if "mc_remaining_time" in data:
- self.state.remaining_time = int(data["mc_remaining_time"])
- if "layer_num" in data:
- self.state.layer_num = int(data["layer_num"])
- if "total_layer_num" in data:
- self.state.total_layers = int(data["total_layer_num"])
- # Temperature data
- temps = {}
- # Log all temperature-related fields for debugging (only when we have temp data)
- temp_fields = {k: v for k, v in data.items() if 'temp' in k.lower() or 'nozzle' in k.lower()}
- if temp_fields and not hasattr(self, '_temp_fields_logged'):
- logger.info(f"[{self.serial_number}] Temperature fields in MQTT data: {temp_fields}")
- self._temp_fields_logged = True
- if "bed_temper" in data:
- temps["bed"] = float(data["bed_temper"])
- if "bed_target_temper" in data:
- temps["bed_target"] = float(data["bed_target_temper"])
- if "nozzle_temper" in data:
- temps["nozzle"] = float(data["nozzle_temper"])
- if "nozzle_target_temper" in data:
- temps["nozzle_target"] = float(data["nozzle_target_temper"])
- # Second nozzle for dual-extruder printers (H2 series)
- # Try multiple possible field names used by different firmware versions
- if "nozzle_temper_2" in data:
- temps["nozzle_2"] = float(data["nozzle_temper_2"])
- elif "right_nozzle_temper" in data:
- temps["nozzle_2"] = float(data["right_nozzle_temper"])
- if "nozzle_target_temper_2" in data:
- temps["nozzle_2_target"] = float(data["nozzle_target_temper_2"])
- elif "right_nozzle_target_temper" in data:
- temps["nozzle_2_target"] = float(data["right_nozzle_target_temper"])
- # Also check for left nozzle as primary (some H2 models)
- if "left_nozzle_temper" in data and "nozzle" not in temps:
- temps["nozzle"] = float(data["left_nozzle_temper"])
- if "left_nozzle_target_temper" in data and "nozzle_target" not in temps:
- temps["nozzle_target"] = float(data["left_nozzle_target_temper"])
- if "chamber_temper" in data:
- temps["chamber"] = float(data["chamber_temper"])
- if temps:
- self.state.temperatures = temps
- # Parse HMS (Health Management System) errors
- if "hms" in data:
- hms_list = data["hms"]
- self.state.hms_errors = []
- if isinstance(hms_list, list):
- for hms in hms_list:
- if isinstance(hms, dict):
- # HMS format: {"attr": code, "code": full_code}
- # The code is a hex string, severity is in bits
- code = hms.get("code", hms.get("attr", "0"))
- if isinstance(code, int):
- code = hex(code)
- # Parse severity from code (typically last 4 bits indicate level)
- try:
- code_int = int(str(code).replace("0x", ""), 16) if code else 0
- severity = (code_int >> 16) & 0xF # Extract severity bits
- module = (code_int >> 24) & 0xFF # Extract module bits
- except (ValueError, TypeError):
- severity = 3
- module = 0
- self.state.hms_errors.append(HMSError(
- code=str(code),
- module=module,
- severity=severity if severity > 0 else 3,
- ))
- self.state.raw_data = data
- # Log state transitions for debugging
- if "gcode_state" in data:
- logger.debug(
- f"[{self.serial_number}] gcode_state: {self._previous_gcode_state} -> {self.state.state}, "
- f"file: {self.state.gcode_file}, subtask: {self.state.subtask_name}"
- )
- # Detect print start (state changes TO RUNNING with a file)
- current_file = self.state.gcode_file or self.state.current_print
- is_new_print = (
- self.state.state == "RUNNING"
- and self._previous_gcode_state != "RUNNING"
- and current_file
- )
- # Also detect if file changed while running (new print started)
- is_file_change = (
- self.state.state == "RUNNING"
- and current_file
- and current_file != self._previous_gcode_file
- and self._previous_gcode_file is not None
- )
- if is_new_print or is_file_change:
- # Clear any old HMS errors when a new print starts
- self.state.hms_errors = []
- if (is_new_print or is_file_change) and self.on_print_start:
- logger.info(
- f"[{self.serial_number}] PRINT START detected - file: {current_file}, "
- f"subtask: {self.state.subtask_name}, is_new: {is_new_print}, is_file_change: {is_file_change}"
- )
- self.on_print_start({
- "filename": current_file,
- "subtask_name": self.state.subtask_name,
- "raw_data": data,
- })
- # Detect print completion (FINISH = success, FAILED = error, IDLE = aborted)
- if (
- self._previous_gcode_state == "RUNNING"
- and self.state.state in ("FINISH", "FAILED", "IDLE")
- and self.on_print_complete
- ):
- if self.state.state == "FINISH":
- status = "completed"
- elif self.state.state == "FAILED":
- status = "failed"
- else:
- status = "aborted"
- logger.info(
- f"[{self.serial_number}] PRINT COMPLETE detected - state: {self.state.state}, "
- f"status: {status}, file: {self._previous_gcode_file or current_file}"
- )
- self.on_print_complete({
- "status": status,
- "filename": self._previous_gcode_file or current_file,
- "raw_data": data,
- })
- self._previous_gcode_state = self.state.state
- if current_file:
- self._previous_gcode_file = current_file
- if self.on_state_change:
- self.on_state_change(self.state)
- def _request_push_all(self):
- """Request full status update from printer."""
- if self._client:
- message = {"pushing": {"command": "pushall"}}
- self._client.publish(self.topic_publish, json.dumps(message))
- def connect(self):
- """Connect to the printer MQTT broker."""
- self._client = mqtt.Client(
- callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
- client_id=f"bambutrack_{self.serial_number}",
- protocol=mqtt.MQTTv311,
- )
- self._client.username_pw_set("bblp", self.access_code)
- self._client.on_connect = self._on_connect
- self._client.on_disconnect = self._on_disconnect
- self._client.on_message = self._on_message
- # TLS setup - Bambu uses self-signed certs
- ssl_context = ssl.create_default_context()
- ssl_context.check_hostname = False
- ssl_context.verify_mode = ssl.CERT_NONE
- self._client.tls_set_context(ssl_context)
- # Use shorter keepalive (15s) for faster disconnect detection
- # Paho considers connection lost after 1.5x keepalive with no response
- self._client.connect_async(self.ip_address, self.MQTT_PORT, keepalive=15)
- self._client.loop_start()
- def start_print(self, filename: str, plate_id: int = 1):
- """Start a print job on the printer.
- The file should already be uploaded to /cache/ on the printer via FTP.
- """
- if self._client and self.state.connected:
- # Bambu print command format
- # Based on: https://github.com/darkorb/bambu-ftp-and-print
- command = {
- "print": {
- "sequence_id": 0,
- "command": "project_file",
- "param": f"Metadata/plate_{plate_id}.gcode",
- "subtask_name": filename,
- "url": f"ftp://{filename}",
- "timelapse": False,
- "bed_leveling": True,
- "flow_cali": True,
- "vibration_cali": True,
- "layer_inspect": False,
- "use_ams": True,
- }
- }
- logger.info(f"[{self.serial_number}] Sending print command: {json.dumps(command)}")
- self._client.publish(self.topic_publish, json.dumps(command))
- return True
- return False
- def stop_print(self) -> bool:
- """Stop the current print job."""
- if self._client and self.state.connected:
- command = {
- "print": {
- "command": "stop",
- "sequence_id": "0"
- }
- }
- self._client.publish(self.topic_publish, json.dumps(command))
- logger.info(f"[{self.serial_number}] Sent stop print command")
- return True
- return False
- def disconnect(self):
- """Disconnect from the printer."""
- if self._client:
- self._client.loop_stop()
- self._client.disconnect()
- self._client = None
- self.state.connected = False
- def send_command(self, command: dict):
- """Send a command to the printer."""
- if self._client and self.state.connected:
- # Log outgoing message if logging is enabled
- if self._logging_enabled:
- self._message_log.append(MQTTLogEntry(
- timestamp=datetime.now().isoformat(),
- topic=self.topic_publish,
- direction="out",
- payload=command,
- ))
- self._client.publish(self.topic_publish, json.dumps(command))
- def enable_logging(self, enabled: bool = True):
- """Enable or disable MQTT message logging."""
- self._logging_enabled = enabled
- # Don't clear logs when stopping - user can manually clear with clear_logs()
- def get_logs(self) -> list[MQTTLogEntry]:
- """Get all logged MQTT messages."""
- return list(self._message_log)
- def clear_logs(self):
- """Clear the message log."""
- self._message_log.clear()
- @property
- def logging_enabled(self) -> bool:
- """Check if logging is enabled."""
- return self._logging_enabled
|