|
@@ -54,6 +54,28 @@ class NozzleInfo:
|
|
|
nozzle_diameter: str = "" # e.g., "0.4"
|
|
nozzle_diameter: str = "" # e.g., "0.4"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class PrintOptions:
|
|
|
|
|
+ """AI detection and print options from xcam data."""
|
|
|
|
|
+ # Core AI detectors
|
|
|
|
|
+ spaghetti_detector: bool = False
|
|
|
|
|
+ print_halt: bool = False
|
|
|
|
|
+ halt_print_sensitivity: str = "medium" # Spaghetti sensitivity
|
|
|
|
|
+ first_layer_inspector: bool = False
|
|
|
|
|
+ printing_monitor: bool = False # AI print quality monitoring
|
|
|
|
|
+ buildplate_marker_detector: bool = False
|
|
|
|
|
+ allow_skip_parts: bool = False
|
|
|
|
|
+ # Additional AI detectors - decoded from cfg bitmask
|
|
|
|
|
+ nozzle_clumping_detector: bool = True
|
|
|
|
|
+ nozzle_clumping_sensitivity: str = "medium"
|
|
|
|
|
+ pileup_detector: bool = True
|
|
|
|
|
+ pileup_sensitivity: str = "medium"
|
|
|
|
|
+ airprint_detector: bool = True
|
|
|
|
|
+ airprint_sensitivity: str = "medium"
|
|
|
|
|
+ auto_recovery_step_loss: bool = True # Uses print.print_option command
|
|
|
|
|
+ filament_tangle_detect: bool = False
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
@dataclass
|
|
@dataclass
|
|
|
class PrinterState:
|
|
class PrinterState:
|
|
|
connected: bool = False
|
|
connected: bool = False
|
|
@@ -71,10 +93,13 @@ class PrinterState:
|
|
|
hms_errors: list = field(default_factory=list) # List of HMSError
|
|
hms_errors: list = field(default_factory=list) # List of HMSError
|
|
|
kprofiles: list = field(default_factory=list) # List of KProfile
|
|
kprofiles: list = field(default_factory=list) # List of KProfile
|
|
|
sdcard: bool = False # SD card inserted
|
|
sdcard: bool = False # SD card inserted
|
|
|
|
|
+ store_to_sdcard: bool = False # Store sent files on SD card (home_flag bit 11)
|
|
|
timelapse: bool = False # Timelapse recording active
|
|
timelapse: bool = False # Timelapse recording active
|
|
|
ipcam: bool = False # Live view / camera streaming enabled
|
|
ipcam: bool = False # Live view / camera streaming enabled
|
|
|
# Nozzle hardware info (for dual nozzle printers, index 0 = left, 1 = right)
|
|
# Nozzle hardware info (for dual nozzle printers, index 0 = left, 1 = right)
|
|
|
nozzles: list = field(default_factory=lambda: [NozzleInfo(), NozzleInfo()])
|
|
nozzles: list = field(default_factory=lambda: [NozzleInfo(), NozzleInfo()])
|
|
|
|
|
+ # AI detection and print options
|
|
|
|
|
+ print_options: PrintOptions = field(default_factory=PrintOptions)
|
|
|
|
|
|
|
|
|
|
|
|
|
class BambuMQTTClient:
|
|
class BambuMQTTClient:
|
|
@@ -117,6 +142,11 @@ class BambuMQTTClient:
|
|
|
self._pending_kprofile_response: asyncio.Event | None = None
|
|
self._pending_kprofile_response: asyncio.Event | None = None
|
|
|
self._kprofile_response_data: list | None = None
|
|
self._kprofile_response_data: list | None = None
|
|
|
|
|
|
|
|
|
|
+ # Xcam hold timers - OrcaSlicer pattern: ignore incoming data for 3 seconds after command
|
|
|
|
|
+ # Key: module_name, Value: timestamp when command was sent
|
|
|
|
|
+ self._xcam_hold_start: dict[str, float] = {}
|
|
|
|
|
+ self._xcam_hold_time: float = 3.0 # Ignore incoming data for 3 seconds after command
|
|
|
|
|
+
|
|
|
@property
|
|
@property
|
|
|
def topic_subscribe(self) -> str:
|
|
def topic_subscribe(self) -> str:
|
|
|
return f"device/{self.serial_number}/report"
|
|
return f"device/{self.serial_number}/report"
|
|
@@ -184,15 +214,14 @@ class BambuMQTTClient:
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.error(f"[{self.serial_number}] Error handling AMS data: {e}")
|
|
logger.error(f"[{self.serial_number}] Error handling AMS data: {e}")
|
|
|
|
|
|
|
|
- # Handle xcam data (camera settings) at top level
|
|
|
|
|
|
|
+ # Handle xcam data (camera settings and AI detection) at top level
|
|
|
if "xcam" in payload:
|
|
if "xcam" in payload:
|
|
|
xcam_data = payload["xcam"]
|
|
xcam_data = payload["xcam"]
|
|
|
- logger.debug(f"[{self.serial_number}] Received xcam data: {xcam_data}")
|
|
|
|
|
- if isinstance(xcam_data, dict):
|
|
|
|
|
- if "ipcam_record" in xcam_data:
|
|
|
|
|
- self.state.ipcam = xcam_data.get("ipcam_record") == "enable"
|
|
|
|
|
- if "timelapse" in xcam_data:
|
|
|
|
|
- self.state.timelapse = xcam_data.get("timelapse") == "enable"
|
|
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] Received xcam data at top level: {xcam_data}")
|
|
|
|
|
+ self._parse_xcam_data(xcam_data)
|
|
|
|
|
+ # Fire state change callback for top-level xcam (not nested in "print")
|
|
|
|
|
+ if "print" not in payload and self.on_state_change:
|
|
|
|
|
+ self.on_state_change(self.state)
|
|
|
|
|
|
|
|
# Handle system responses (accessories info, etc.)
|
|
# Handle system responses (accessories info, etc.)
|
|
|
if "system" in payload:
|
|
if "system" in payload:
|
|
@@ -202,6 +231,12 @@ class BambuMQTTClient:
|
|
|
|
|
|
|
|
if "print" in payload:
|
|
if "print" in payload:
|
|
|
print_data = payload["print"]
|
|
print_data = payload["print"]
|
|
|
|
|
+
|
|
|
|
|
+ # Check if xcam is nested inside print data
|
|
|
|
|
+ if "xcam" in print_data:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] Found xcam inside print data: {print_data['xcam']}")
|
|
|
|
|
+ self._parse_xcam_data(print_data["xcam"])
|
|
|
|
|
+
|
|
|
# Log when we see gcode_state changes
|
|
# Log when we see gcode_state changes
|
|
|
if "gcode_state" in print_data:
|
|
if "gcode_state" in print_data:
|
|
|
logger.info(
|
|
logger.info(
|
|
@@ -244,6 +279,195 @@ class BambuMQTTClient:
|
|
|
# actual nozzle is 'HH01' hardened steel high-flow)
|
|
# actual nozzle is 'HH01' hardened steel high-flow)
|
|
|
logger.info(f"[{self.serial_number}] Accessories response (not used for nozzle data): {data}")
|
|
logger.info(f"[{self.serial_number}] Accessories response (not used for nozzle data): {data}")
|
|
|
|
|
|
|
|
|
|
+ def _parse_xcam_data(self, xcam_data):
|
|
|
|
|
+ """Parse xcam data for camera settings and AI detection options."""
|
|
|
|
|
+ if not isinstance(xcam_data, dict):
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ current_time = time.time()
|
|
|
|
|
+
|
|
|
|
|
+ # Helper to check if we should accept incoming value for a module
|
|
|
|
|
+ # OrcaSlicer pattern: simple hold timer, ignore ALL data for 3 seconds after command
|
|
|
|
|
+ def should_accept_value(module_name: str, incoming_value: bool) -> bool:
|
|
|
|
|
+ """Check if we should accept an incoming xcam value.
|
|
|
|
|
+
|
|
|
|
|
+ OrcaSlicer pattern: After sending a command, ignore incoming data
|
|
|
|
|
+ for 3 seconds. After that, accept whatever the printer sends.
|
|
|
|
|
+ """
|
|
|
|
|
+ if module_name not in self._xcam_hold_start:
|
|
|
|
|
+ return True # No hold timer, accept incoming
|
|
|
|
|
+
|
|
|
|
|
+ hold_start = self._xcam_hold_start[module_name]
|
|
|
|
|
+ elapsed = current_time - hold_start
|
|
|
|
|
+
|
|
|
|
|
+ if elapsed > self._xcam_hold_time:
|
|
|
|
|
+ # Hold timer expired - accept incoming and clear hold
|
|
|
|
|
+ del self._xcam_hold_start[module_name]
|
|
|
|
|
+ logger.debug(
|
|
|
|
|
+ f"[{self.serial_number}] Hold expired for {module_name}, accepting {incoming_value}"
|
|
|
|
|
+ )
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ # Within hold period - ignore incoming data
|
|
|
|
|
+ logger.debug(
|
|
|
|
|
+ f"[{self.serial_number}] Ignoring {module_name}={incoming_value} "
|
|
|
|
|
+ f"(hold active, {elapsed:.1f}s < {self._xcam_hold_time}s)"
|
|
|
|
|
+ )
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ # Log all xcam fields for debugging
|
|
|
|
|
+ logger.debug(f"[{self.serial_number}] Parsing xcam data - all fields: {list(xcam_data.keys())}")
|
|
|
|
|
+
|
|
|
|
|
+ # The cfg bitmask contains the ACTUAL detector states - the individual boolean
|
|
|
|
|
+ # fields (spaghetti_detector, etc.) are often stale/cached.
|
|
|
|
|
+ # CFG bitmask structure (each detector uses 3 bits: [sens_low, sens_high, enabled]):
|
|
|
|
|
+ # - Bits 5-7: spaghetti_detector (sens in 5-6, enabled in 7)
|
|
|
|
|
+ # - Bits 8-10: pileup_detector (sens in 8-9, enabled in 10)
|
|
|
|
|
+ # - Bits 11-13: clump_detector/nozzle_clumping (sens in 11-12, enabled in 13)
|
|
|
|
|
+ # - Bits 14-16: airprint_detector (sens in 14-15, enabled in 16)
|
|
|
|
|
+ # Sensitivity values: 0=low, 1=medium, 2=high
|
|
|
|
|
+ if "cfg" in xcam_data:
|
|
|
|
|
+ cfg = xcam_data["cfg"]
|
|
|
|
|
+ logger.debug(f"[{self.serial_number}] xcam cfg bitmask: {cfg} (binary: {bin(cfg)})")
|
|
|
|
|
+
|
|
|
|
|
+ def decode_detector(start_bit):
|
|
|
|
|
+ """Decode a detector from cfg: returns (enabled, sensitivity_str)"""
|
|
|
|
|
+ sens_bits = (cfg >> start_bit) & 0x3
|
|
|
|
|
+ enabled = bool((cfg >> (start_bit + 2)) & 1)
|
|
|
|
|
+ sensitivity = {0: "low", 1: "medium", 2: "high"}.get(sens_bits, "medium")
|
|
|
|
|
+ return enabled, sensitivity
|
|
|
|
|
+
|
|
|
|
|
+ # Spaghetti detector (bits 5-7)
|
|
|
|
|
+ cfg_spaghetti, cfg_sensitivity = decode_detector(5)
|
|
|
|
|
+ if should_accept_value("spaghetti_detector", cfg_spaghetti):
|
|
|
|
|
+ old_value = self.state.print_options.spaghetti_detector
|
|
|
|
|
+ if cfg_spaghetti != old_value:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] spaghetti_detector changed (from cfg): {old_value} -> {cfg_spaghetti}")
|
|
|
|
|
+ self.state.print_options.spaghetti_detector = cfg_spaghetti
|
|
|
|
|
+
|
|
|
|
|
+ # Check hold timer for sensitivity before accepting
|
|
|
|
|
+ if "halt_print_sensitivity" not in self._xcam_hold_start:
|
|
|
|
|
+ if cfg_sensitivity != self.state.print_options.halt_print_sensitivity:
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ f"[{self.serial_number}] Sensitivity changed (from cfg): "
|
|
|
|
|
+ f"{self.state.print_options.halt_print_sensitivity} -> {cfg_sensitivity}"
|
|
|
|
|
+ )
|
|
|
|
|
+ self.state.print_options.halt_print_sensitivity = cfg_sensitivity
|
|
|
|
|
+ else:
|
|
|
|
|
+ hold_start = self._xcam_hold_start["halt_print_sensitivity"]
|
|
|
|
|
+ elapsed = current_time - hold_start
|
|
|
|
|
+ if elapsed <= self._xcam_hold_time:
|
|
|
|
|
+ logger.debug(
|
|
|
|
|
+ f"[{self.serial_number}] Ignoring cfg sensitivity={cfg_sensitivity} "
|
|
|
|
|
+ f"(hold active, {elapsed:.1f}s < {self._xcam_hold_time}s)"
|
|
|
|
|
+ )
|
|
|
|
|
+ else:
|
|
|
|
|
+ # Hold expired - accept from cfg
|
|
|
|
|
+ if cfg_sensitivity != self.state.print_options.halt_print_sensitivity:
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ f"[{self.serial_number}] Sensitivity synced (from cfg after hold): "
|
|
|
|
|
+ f"{self.state.print_options.halt_print_sensitivity} -> {cfg_sensitivity}"
|
|
|
|
|
+ )
|
|
|
|
|
+ self.state.print_options.halt_print_sensitivity = cfg_sensitivity
|
|
|
|
|
+ del self._xcam_hold_start["halt_print_sensitivity"]
|
|
|
|
|
+
|
|
|
|
|
+ # Pileup detector (bits 8-10)
|
|
|
|
|
+ cfg_pileup, cfg_pileup_sens = decode_detector(8)
|
|
|
|
|
+ if should_accept_value("pileup_detector", cfg_pileup):
|
|
|
|
|
+ if cfg_pileup != self.state.print_options.pileup_detector:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] pileup_detector changed (from cfg): {self.state.print_options.pileup_detector} -> {cfg_pileup}")
|
|
|
|
|
+ self.state.print_options.pileup_detector = cfg_pileup
|
|
|
|
|
+ # Pileup sensitivity with hold timer
|
|
|
|
|
+ if "pileup_sensitivity" not in self._xcam_hold_start:
|
|
|
|
|
+ if cfg_pileup_sens != self.state.print_options.pileup_sensitivity:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] pileup_sensitivity changed (from cfg): {self.state.print_options.pileup_sensitivity} -> {cfg_pileup_sens}")
|
|
|
|
|
+ self.state.print_options.pileup_sensitivity = cfg_pileup_sens
|
|
|
|
|
+ else:
|
|
|
|
|
+ hold_start = self._xcam_hold_start["pileup_sensitivity"]
|
|
|
|
|
+ elapsed = current_time - hold_start
|
|
|
|
|
+ if elapsed > self._xcam_hold_time:
|
|
|
|
|
+ if cfg_pileup_sens != self.state.print_options.pileup_sensitivity:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] pileup_sensitivity synced (from cfg after hold): {self.state.print_options.pileup_sensitivity} -> {cfg_pileup_sens}")
|
|
|
|
|
+ self.state.print_options.pileup_sensitivity = cfg_pileup_sens
|
|
|
|
|
+ del self._xcam_hold_start["pileup_sensitivity"]
|
|
|
|
|
+
|
|
|
|
|
+ # Clump/nozzle clumping detector (bits 11-13)
|
|
|
|
|
+ cfg_clump, cfg_clump_sens = decode_detector(11)
|
|
|
|
|
+ if should_accept_value("clump_detector", cfg_clump):
|
|
|
|
|
+ if cfg_clump != self.state.print_options.nozzle_clumping_detector:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] nozzle_clumping_detector changed (from cfg): {self.state.print_options.nozzle_clumping_detector} -> {cfg_clump}")
|
|
|
|
|
+ self.state.print_options.nozzle_clumping_detector = cfg_clump
|
|
|
|
|
+ # Clump sensitivity with hold timer
|
|
|
|
|
+ if "nozzle_clumping_sensitivity" not in self._xcam_hold_start:
|
|
|
|
|
+ if cfg_clump_sens != self.state.print_options.nozzle_clumping_sensitivity:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] nozzle_clumping_sensitivity changed (from cfg): {self.state.print_options.nozzle_clumping_sensitivity} -> {cfg_clump_sens}")
|
|
|
|
|
+ self.state.print_options.nozzle_clumping_sensitivity = cfg_clump_sens
|
|
|
|
|
+ else:
|
|
|
|
|
+ hold_start = self._xcam_hold_start["nozzle_clumping_sensitivity"]
|
|
|
|
|
+ elapsed = current_time - hold_start
|
|
|
|
|
+ if elapsed > self._xcam_hold_time:
|
|
|
|
|
+ if cfg_clump_sens != self.state.print_options.nozzle_clumping_sensitivity:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] nozzle_clumping_sensitivity synced (from cfg after hold): {self.state.print_options.nozzle_clumping_sensitivity} -> {cfg_clump_sens}")
|
|
|
|
|
+ self.state.print_options.nozzle_clumping_sensitivity = cfg_clump_sens
|
|
|
|
|
+ del self._xcam_hold_start["nozzle_clumping_sensitivity"]
|
|
|
|
|
+
|
|
|
|
|
+ # Airprint detector (bits 14-16)
|
|
|
|
|
+ cfg_airprint, cfg_airprint_sens = decode_detector(14)
|
|
|
|
|
+ if should_accept_value("airprint_detector", cfg_airprint):
|
|
|
|
|
+ if cfg_airprint != self.state.print_options.airprint_detector:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] airprint_detector changed (from cfg): {self.state.print_options.airprint_detector} -> {cfg_airprint}")
|
|
|
|
|
+ self.state.print_options.airprint_detector = cfg_airprint
|
|
|
|
|
+ # Airprint sensitivity with hold timer
|
|
|
|
|
+ if "airprint_sensitivity" not in self._xcam_hold_start:
|
|
|
|
|
+ if cfg_airprint_sens != self.state.print_options.airprint_sensitivity:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] airprint_sensitivity changed (from cfg): {self.state.print_options.airprint_sensitivity} -> {cfg_airprint_sens}")
|
|
|
|
|
+ self.state.print_options.airprint_sensitivity = cfg_airprint_sens
|
|
|
|
|
+ else:
|
|
|
|
|
+ hold_start = self._xcam_hold_start["airprint_sensitivity"]
|
|
|
|
|
+ elapsed = current_time - hold_start
|
|
|
|
|
+ if elapsed > self._xcam_hold_time:
|
|
|
|
|
+ if cfg_airprint_sens != self.state.print_options.airprint_sensitivity:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] airprint_sensitivity synced (from cfg after hold): {self.state.print_options.airprint_sensitivity} -> {cfg_airprint_sens}")
|
|
|
|
|
+ self.state.print_options.airprint_sensitivity = cfg_airprint_sens
|
|
|
|
|
+ del self._xcam_hold_start["airprint_sensitivity"]
|
|
|
|
|
+
|
|
|
|
|
+ # Camera settings
|
|
|
|
|
+ if "ipcam_record" in xcam_data:
|
|
|
|
|
+ self.state.ipcam = xcam_data.get("ipcam_record") == "enable"
|
|
|
|
|
+ if "timelapse" in xcam_data:
|
|
|
|
|
+ self.state.timelapse = xcam_data.get("timelapse") == "enable"
|
|
|
|
|
+
|
|
|
|
|
+ # Skip spaghetti_detector boolean field - we read from cfg bitmask above
|
|
|
|
|
+ if "print_halt" in xcam_data:
|
|
|
|
|
+ self.state.print_options.print_halt = bool(xcam_data.get("print_halt"))
|
|
|
|
|
+ # Skip halt_print_sensitivity field - it's always stale ("medium")
|
|
|
|
|
+ # We read the actual sensitivity from cfg bits 5-6 above
|
|
|
|
|
+ if "first_layer_inspector" in xcam_data:
|
|
|
|
|
+ new_value = bool(xcam_data.get("first_layer_inspector"))
|
|
|
|
|
+ if should_accept_value("first_layer_inspector", new_value):
|
|
|
|
|
+ self.state.print_options.first_layer_inspector = new_value
|
|
|
|
|
+ if "printing_monitor" in xcam_data:
|
|
|
|
|
+ new_value = bool(xcam_data.get("printing_monitor"))
|
|
|
|
|
+ if should_accept_value("printing_monitor", new_value):
|
|
|
|
|
+ self.state.print_options.printing_monitor = new_value
|
|
|
|
|
+ if "buildplate_marker_detector" in xcam_data:
|
|
|
|
|
+ new_value = bool(xcam_data.get("buildplate_marker_detector"))
|
|
|
|
|
+ if should_accept_value("buildplate_marker_detector", new_value):
|
|
|
|
|
+ self.state.print_options.buildplate_marker_detector = new_value
|
|
|
|
|
+ if "allow_skip_parts" in xcam_data:
|
|
|
|
|
+ new_value = bool(xcam_data.get("allow_skip_parts"))
|
|
|
|
|
+ if should_accept_value("allow_skip_parts", new_value):
|
|
|
|
|
+ self.state.print_options.allow_skip_parts = new_value
|
|
|
|
|
+
|
|
|
|
|
+ # Additional AI detectors - these are decoded from cfg bitmask above, not from
|
|
|
|
|
+ # individual boolean fields (which are not sent by the printer)
|
|
|
|
|
+ # pileup_detector, nozzle_clumping_detector, airprint_detector - from cfg
|
|
|
|
|
+ # auto_recovery_step_loss and filament_tangle_detect - tracked locally only
|
|
|
|
|
+ if "auto_recovery_step_loss" in xcam_data:
|
|
|
|
|
+ self.state.print_options.auto_recovery_step_loss = bool(xcam_data.get("auto_recovery_step_loss"))
|
|
|
|
|
+ if "filament_tangle_detect" in xcam_data:
|
|
|
|
|
+ self.state.print_options.filament_tangle_detect = bool(xcam_data.get("filament_tangle_detect"))
|
|
|
|
|
+
|
|
|
def _handle_ams_data(self, ams_data):
|
|
def _handle_ams_data(self, ams_data):
|
|
|
"""Handle AMS data changes for Spoolman integration.
|
|
"""Handle AMS data changes for Spoolman integration.
|
|
|
|
|
|
|
@@ -380,6 +604,18 @@ class BambuMQTTClient:
|
|
|
if "sdcard" in data:
|
|
if "sdcard" in data:
|
|
|
self.state.sdcard = data["sdcard"] is True
|
|
self.state.sdcard = data["sdcard"] is True
|
|
|
|
|
|
|
|
|
|
+ # Parse home_flag for "Store Sent Files on External Storage" setting (bit 11)
|
|
|
|
|
+ if "home_flag" in data:
|
|
|
|
|
+ home_flag = data["home_flag"]
|
|
|
|
|
+ # Bit 11 controls "Store Sent Files on External Storage"
|
|
|
|
|
+ # Convert to unsigned 32-bit if negative
|
|
|
|
|
+ if home_flag < 0:
|
|
|
|
|
+ home_flag = home_flag & 0xFFFFFFFF
|
|
|
|
|
+ store_to_sdcard = bool((home_flag >> 11) & 1)
|
|
|
|
|
+ if store_to_sdcard != self.state.store_to_sdcard:
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] store_to_sdcard changed: {self.state.store_to_sdcard} -> {store_to_sdcard}")
|
|
|
|
|
+ self.state.store_to_sdcard = store_to_sdcard
|
|
|
|
|
+
|
|
|
# Parse timelapse status (recording active during print)
|
|
# Parse timelapse status (recording active during print)
|
|
|
if "timelapse" in data:
|
|
if "timelapse" in data:
|
|
|
logger.debug(f"[{self.serial_number}] timelapse field: {data['timelapse']}")
|
|
logger.debug(f"[{self.serial_number}] timelapse field: {data['timelapse']}")
|
|
@@ -651,6 +887,148 @@ class BambuMQTTClient:
|
|
|
return True
|
|
return True
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
|
|
+ def set_xcam_option(
|
|
|
|
|
+ self,
|
|
|
|
|
+ module_name: str,
|
|
|
|
|
+ enabled: bool,
|
|
|
|
|
+ print_halt: bool = True,
|
|
|
|
|
+ sensitivity: str = "medium"
|
|
|
|
|
+ ) -> bool:
|
|
|
|
|
+ """Set an xcam (AI detection) option on the printer.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ module_name: The xcam module to control (e.g., "spaghetti_detector",
|
|
|
|
|
+ "first_layer_inspector", "printing_monitor", "buildplate_marker_detector")
|
|
|
|
|
+ enabled: Whether to enable or disable the feature
|
|
|
|
|
+ print_halt: Whether to halt print on detection (only applies to some detectors)
|
|
|
|
|
+ sensitivity: Sensitivity level ("low", "medium", "high", or "never_halt")
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ True if command was sent, False if not connected
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self._client or not self.state.connected:
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ # auto_recovery_step_loss uses a different command format (print.print_option)
|
|
|
|
|
+ if module_name == "auto_recovery_step_loss":
|
|
|
|
|
+ return self._set_print_option("auto_recovery", enabled)
|
|
|
|
|
+
|
|
|
|
|
+ self._sequence_id += 1
|
|
|
|
|
+
|
|
|
|
|
+ # Build the xcam control command (exact OrcaSlicer format)
|
|
|
|
|
+ # Key findings from OrcaSlicer source:
|
|
|
|
|
+ # - Uses "xcam" wrapper (not "print")
|
|
|
|
|
+ # - print_halt is ALWAYS true (legacy protocol requirement)
|
|
|
|
|
+ # - Both "control" and "enable" are set to the same value
|
|
|
|
|
+ # - halt_print_sensitivity controls actual halt behavior
|
|
|
|
|
+ command = {
|
|
|
|
|
+ "xcam": {
|
|
|
|
|
+ "command": "xcam_control_set",
|
|
|
|
|
+ "sequence_id": str(self._sequence_id),
|
|
|
|
|
+ "module_name": module_name,
|
|
|
|
|
+ "control": enabled,
|
|
|
|
|
+ "enable": enabled, # old protocol compatibility
|
|
|
|
|
+ "print_halt": True, # ALWAYS true per OrcaSlicer
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # Only add sensitivity if not "never_halt"
|
|
|
|
|
+ # OrcaSlicer uses halt_print_sensitivity for ALL detectors
|
|
|
|
|
+ # The module_name field determines which detector's sensitivity is being set
|
|
|
|
|
+ if sensitivity and sensitivity != "never_halt":
|
|
|
|
|
+ command["xcam"]["halt_print_sensitivity"] = sensitivity
|
|
|
|
|
+
|
|
|
|
|
+ command_json = json.dumps(command)
|
|
|
|
|
+ self._client.publish(self.topic_publish, command_json, qos=1)
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] Set xcam option: {module_name}={enabled}, sensitivity={sensitivity}")
|
|
|
|
|
+ logger.debug(f"[{self.serial_number}] MQTT command sent: {command_json}")
|
|
|
|
|
+
|
|
|
|
|
+ # OrcaSlicer pattern: Set hold timer to ignore incoming data for 3 seconds
|
|
|
|
|
+ # This prevents stale MQTT data from immediately overwriting our change
|
|
|
|
|
+ self._xcam_hold_start[module_name] = time.time()
|
|
|
|
|
+
|
|
|
|
|
+ # Update local state immediately for responsive UI
|
|
|
|
|
+ # NOTE: Spaghetti and Pileup sensitivities are linked in firmware
|
|
|
|
|
+ # When spaghetti_detector sensitivity is changed, pileup also changes
|
|
|
|
|
+ if module_name == "spaghetti_detector":
|
|
|
|
|
+ self.state.print_options.spaghetti_detector = enabled
|
|
|
|
|
+ self.state.print_options.print_halt = print_halt
|
|
|
|
|
+ if sensitivity and sensitivity != "never_halt":
|
|
|
|
|
+ # spaghetti_detector controls BOTH spaghetti and pileup sensitivities
|
|
|
|
|
+ self.state.print_options.halt_print_sensitivity = sensitivity
|
|
|
|
|
+ self.state.print_options.pileup_sensitivity = sensitivity
|
|
|
|
|
+ self._xcam_hold_start["halt_print_sensitivity"] = time.time()
|
|
|
|
|
+ self._xcam_hold_start["pileup_sensitivity"] = time.time()
|
|
|
|
|
+ elif module_name == "first_layer_inspector":
|
|
|
|
|
+ self.state.print_options.first_layer_inspector = enabled
|
|
|
|
|
+ elif module_name == "printing_monitor":
|
|
|
|
|
+ self.state.print_options.printing_monitor = enabled
|
|
|
|
|
+ elif module_name == "buildplate_marker_detector":
|
|
|
|
|
+ self.state.print_options.buildplate_marker_detector = enabled
|
|
|
|
|
+ elif module_name == "allow_skip_parts":
|
|
|
|
|
+ self.state.print_options.allow_skip_parts = enabled
|
|
|
|
|
+ elif module_name == "pileup_detector":
|
|
|
|
|
+ self.state.print_options.pileup_detector = enabled
|
|
|
|
|
+ # Pileup sensitivity is linked to spaghetti - both are set via spaghetti_detector
|
|
|
|
|
+ elif module_name == "clump_detector":
|
|
|
|
|
+ self.state.print_options.nozzle_clumping_detector = enabled
|
|
|
|
|
+ if sensitivity and sensitivity != "never_halt":
|
|
|
|
|
+ self.state.print_options.nozzle_clumping_sensitivity = sensitivity
|
|
|
|
|
+ self._xcam_hold_start["nozzle_clumping_sensitivity"] = time.time()
|
|
|
|
|
+ elif module_name == "airprint_detector":
|
|
|
|
|
+ self.state.print_options.airprint_detector = enabled
|
|
|
|
|
+ if sensitivity and sensitivity != "never_halt":
|
|
|
|
|
+ self.state.print_options.airprint_sensitivity = sensitivity
|
|
|
|
|
+ self._xcam_hold_start["airprint_sensitivity"] = time.time()
|
|
|
|
|
+ elif module_name == "auto_recovery_step_loss":
|
|
|
|
|
+ self.state.print_options.auto_recovery_step_loss = enabled
|
|
|
|
|
+
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ def _set_print_option(self, option_name: str, enabled: bool) -> bool:
|
|
|
|
|
+ """Set a print option using the print.print_option command.
|
|
|
|
|
+
|
|
|
|
|
+ This is different from xcam_control_set and is used for options like:
|
|
|
|
|
+ - auto_recovery
|
|
|
|
|
+ - air_print_detect
|
|
|
|
|
+ - filament_tangle_detect
|
|
|
|
|
+ - nozzle_blob_detect
|
|
|
|
|
+ - sound_enable
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ option_name: The option to control (e.g., "auto_recovery")
|
|
|
|
|
+ enabled: Whether to enable or disable the option
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ True if command was sent, False if not connected
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self._client or not self.state.connected:
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ self._sequence_id += 1
|
|
|
|
|
+
|
|
|
|
|
+ command = {
|
|
|
|
|
+ "print": {
|
|
|
|
|
+ "command": "print_option",
|
|
|
|
|
+ "sequence_id": str(self._sequence_id),
|
|
|
|
|
+ option_name: enabled,
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ command_json = json.dumps(command)
|
|
|
|
|
+ self._client.publish(self.topic_publish, command_json, qos=1)
|
|
|
|
|
+ logger.info(f"[{self.serial_number}] Set print option: {option_name}={enabled}")
|
|
|
|
|
+
|
|
|
|
|
+ # Set hold timer
|
|
|
|
|
+ hold_key = f"print_option_{option_name}"
|
|
|
|
|
+ self._xcam_hold_start[hold_key] = time.time()
|
|
|
|
|
+
|
|
|
|
|
+ # Update local state immediately
|
|
|
|
|
+ if option_name == "auto_recovery":
|
|
|
|
|
+ self.state.print_options.auto_recovery_step_loss = enabled
|
|
|
|
|
+
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
def disconnect(self):
|
|
def disconnect(self):
|
|
|
"""Disconnect from the printer."""
|
|
"""Disconnect from the printer."""
|
|
|
if self._client:
|
|
if self._client:
|