|
|
@@ -4117,6 +4117,94 @@ class BambuMQTTClient:
|
|
|
|
|
|
return True
|
|
|
|
|
|
+ def load_external_filament(self) -> tuple[bool, str]:
|
|
|
+ """Load filament from the external spool holder.
|
|
|
+
|
|
|
+ Works on all printers regardless of AMS presence.
|
|
|
+ Refuses if filament is already loaded (tray_now != 255) to prevent
|
|
|
+ the firmware from auto-unloading first.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ (success, message) tuple
|
|
|
+ """
|
|
|
+ if not self._client or not self.state.connected:
|
|
|
+ logger.warning("[%s] Cannot load external filament: not connected", self.serial_number)
|
|
|
+ return False, "Printer not connected"
|
|
|
+
|
|
|
+ if self.state.tray_now != 255:
|
|
|
+ logger.info(
|
|
|
+ "[%s] Cannot load external filament: filament already loaded (tray_now=%s)",
|
|
|
+ self.serial_number, self.state.tray_now,
|
|
|
+ )
|
|
|
+ return False, "Filament is already loaded. Unload first before loading."
|
|
|
+
|
|
|
+ self._sequence_id += 1
|
|
|
+ command = {
|
|
|
+ "print": {
|
|
|
+ "command": "ams_change_filament",
|
|
|
+ "sequence_id": str(self._sequence_id),
|
|
|
+ "ams_id": 255,
|
|
|
+ "slot_id": 254,
|
|
|
+ "target": 254,
|
|
|
+ "curr_temp": -1,
|
|
|
+ "tar_temp": -1,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ command_json = json.dumps(command)
|
|
|
+ logger.info("[%s] Publishing load external filament command: %s", self.serial_number, command_json)
|
|
|
+ self._client.publish(self.topic_publish, command_json, qos=1)
|
|
|
+
|
|
|
+ self._last_load_tray_id = 254
|
|
|
+ self.state.pending_tray_target = 254
|
|
|
+ logger.info("[%s] Set pending_tray_target=254 (external spool load)", self.serial_number)
|
|
|
+
|
|
|
+ return True, "Load external filament command sent"
|
|
|
+
|
|
|
+ def unload_external_filament(self) -> tuple[bool, str]:
|
|
|
+ """Unload filament from the external spool holder.
|
|
|
+
|
|
|
+ Works on all printers regardless of AMS presence.
|
|
|
+ Always uses ams_id=255 (external), never derives a source AMS unit.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ (success, message) tuple
|
|
|
+ """
|
|
|
+ if not self._client or not self.state.connected:
|
|
|
+ logger.warning("[%s] Cannot unload external filament: not connected", self.serial_number)
|
|
|
+ return False, "Printer not connected"
|
|
|
+
|
|
|
+ if self.state.tray_now == 255:
|
|
|
+ logger.info("[%s] Cannot unload external filament: no filament loaded", self.serial_number)
|
|
|
+ return False, "No filament is currently loaded."
|
|
|
+
|
|
|
+ nozzle_temp = int(self.state.temperatures.get("nozzle", 210))
|
|
|
+ if nozzle_temp < 180:
|
|
|
+ nozzle_temp = 210
|
|
|
+
|
|
|
+ self._sequence_id += 1
|
|
|
+ command = {
|
|
|
+ "print": {
|
|
|
+ "command": "ams_change_filament",
|
|
|
+ "sequence_id": str(self._sequence_id),
|
|
|
+ "ams_id": 255,
|
|
|
+ "slot_id": 255,
|
|
|
+ "target": 255,
|
|
|
+ "curr_temp": nozzle_temp,
|
|
|
+ "tar_temp": nozzle_temp,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ command_json = json.dumps(command)
|
|
|
+ logger.info("[%s] Publishing unload external filament command: %s", self.serial_number, command_json)
|
|
|
+ self._client.publish(self.topic_publish, command_json, qos=1)
|
|
|
+
|
|
|
+ self._last_load_tray_id = None
|
|
|
+ self.state.pending_tray_target = None
|
|
|
+ logger.info("[%s] Cleared pending_tray_target (external spool unload)", self.serial_number)
|
|
|
+
|
|
|
+ return True, "Unload external filament command sent"
|
|
|
+
|
|
|
def ams_control(self, action: str) -> bool:
|
|
|
"""Control AMS operations.
|
|
|
|