| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001 |
- import asyncio
- import logging
- import re
- import traceback
- from collections.abc import Callable
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.models.printer import Printer
- from backend.app.services.bambu_mqtt import BambuMQTTClient, MQTTLogEntry, PrinterState, get_stage_name
- logger = logging.getLogger(__name__)
- # Models that have a real chamber temperature sensor
- # Based on Home Assistant Bambu Lab integration
- # P1P/P1S and A1/A1Mini do NOT have chamber temp sensors
- # Includes both display names and internal codes from MQTT/SSDP
- CHAMBER_TEMP_SUPPORTED_MODELS = frozenset(
- [
- # Display names
- "X1",
- "X1C",
- "X1E", # X1 series
- "X2D", # X2 series
- "P2S", # P2 series
- "H2C",
- "H2D",
- "H2DPRO",
- "H2S", # H2 series
- # Internal codes (from MQTT/SSDP)
- "BL-P001", # X1/X1C
- "C13", # X1E
- "N6", # X2D
- "O1D", # H2D
- "O1C", # H2C
- "O1C2", # H2C (dual nozzle variant)
- "O1S", # H2S
- "O1E", # H2D Pro
- "O2D", # H2D Pro (alternate code)
- "N7", # P2S
- ]
- )
- # Models that may incorrectly report stg_cur=0 when idle (firmware bug)
- # Based on Home Assistant Bambu Lab integration observations
- # See: https://github.com/greghesp/ha-bambulab/blob/main/custom_components/bambu_lab/pybambu/models.py
- A1_MODELS = frozenset(
- [
- # Display names
- "A1",
- "A1 MINI",
- "A1-MINI",
- "A1MINI",
- # Internal codes (from MQTT/SSDP)
- "N1", # A1 Mini
- "N2S", # A1
- ]
- )
- # Models affected by the stg_cur=0 idle bug (firmware reports stg_cur=0 when idle,
- # which maps to "Printing" in STAGE_NAMES and overrides the correct IDLE state)
- STG_CUR_IDLE_BUG_MODELS = A1_MODELS | frozenset(
- [
- # Display names
- "P1P",
- "P1S",
- # Internal codes (from MQTT/SSDP)
- "C11", # P1P
- "C12", # P1S
- ]
- )
- def supports_chamber_temp(model: str | None) -> bool:
- """Check if a printer model has a real chamber temperature sensor.
- P1P, P1S, A1, and A1Mini do NOT have chamber temp sensors.
- The 'chamber_temper' value they report is meaningless.
- """
- if not model:
- return False
- # Normalize model name (uppercase, strip whitespace)
- model_upper = model.strip().upper()
- return model_upper in CHAMBER_TEMP_SUPPORTED_MODELS
- def has_stg_cur_idle_bug(model: str | None) -> bool:
- """Check if a printer model may incorrectly report stg_cur=0 when idle.
- Some firmware versions report stg_cur=0 (which maps to "Printing")
- even when the printer is idle. Originally observed on A1/A1 Mini via the
- Home Assistant Bambu Lab integration, also confirmed on P1S.
- """
- if not model:
- return False
- model_upper = model.strip().upper()
- return model_upper in STG_CUR_IDLE_BUG_MODELS
- def is_bed_slinger(model: str | None) -> bool:
- """Whether the printer's Z axis controls the *toolhead*, not the bed.
- Bambu's A1 family (A1, A1 Mini; internal codes N1 / N2S) are open-frame
- bed-slingers: the bed moves on Y, the toolhead moves on X+Z. On every
- other current model (X1, P1, H2, H2C, H2D, H2S, P2S, ...) the bed moves
- on Z and the toolhead is fixed in Z.
- G-code direction is opposite on these two families. `G1 Z-10` reduces
- the nozzle-bed gap on both, but on bed-on-Z machines it does so by
- moving the BED up, while on bed-slingers it does so by moving the
- TOOLHEAD down — which is what crashed the nozzle in #1334.
- """
- if not model:
- return False
- return model.strip().upper() in A1_MODELS
- # Minimum firmware versions for AMS drying support (confirmed via capture testing)
- # Keys are exact model names (upper-cased). Do NOT use substring matching — it would
- # incorrectly gate X1E (matched by "X1") and H2D Pro (matched by "H2D").
- _DRYING_MIN_FIRMWARE: dict[str, str] = {
- "H2D": "01.02.30.00",
- "H2S": "01.02.00.00",
- "X1": "01.09.00.00",
- "X1C": "01.09.00.00",
- "P1P": "01.08.00.00",
- "P1S": "01.08.00.00",
- "P2S": "01.02.00.00",
- "N7": "01.02.00.00", # P2S internal model code
- }
- # Models that definitely don't support AMS drying (no AMS 2 Pro / AMS-HT compatibility)
- _DRYING_UNSUPPORTED_MODELS = frozenset({"A1", "A1MINI", "A1-MINI", "A1 MINI", "H2C", "O1C", "O1C2", "O1S", "N1", "N2S"})
- def supports_drying(model: str | None, firmware: str | None) -> bool:
- """Check if a printer model supports AMS drying commands.
- Known models with confirmed min firmware get version-gated.
- Known unsupported models are blocked.
- All other models (H2D Pro, X1E, future models) are allowed —
- the command fails gracefully with result: "fail" if unsupported.
- """
- if not model:
- return False
- model_upper = model.strip().upper()
- if model_upper in _DRYING_UNSUPPORTED_MODELS:
- return False
- if model_upper in _DRYING_MIN_FIRMWARE:
- return bool(firmware and firmware >= _DRYING_MIN_FIRMWARE[model_upper])
- # For all other models: allow
- return True
- class PrinterInfo:
- """Basic printer info for callbacks."""
- def __init__(self, name: str, serial_number: str):
- self.name = name
- self.serial_number = serial_number
- class PrinterManager:
- """Manager for multiple printer connections."""
- def __init__(self):
- self._clients: dict[int, BambuMQTTClient] = {}
- self._models: dict[int, str | None] = {} # Cache printer models for feature detection
- self._printer_info: dict[int, PrinterInfo] = {} # Cache printer name/serial for callbacks
- self._on_print_start: Callable[[int, dict], None] | None = None
- self._on_print_complete: Callable[[int, dict], None] | None = None
- self._on_status_change: Callable[[int, PrinterState], None] | None = None
- self._on_ams_change: Callable[[int, list], None] | None = None
- self._on_layer_change: Callable[[int, int], None] | None = None
- self._on_bed_temp_update: Callable[[int, float], None] | None = None
- self._loop: asyncio.AbstractEventLoop | None = None
- # Track who started the current print (Issue #206)
- self._current_print_user: dict[int, dict] = {} # {printer_id: {"user_id": int, "username": str}}
- # Track printers awaiting plate-clear acknowledgment after a finished/failed print.
- # Persisted to DB (printers.awaiting_plate_clear) so the gate survives restarts/power
- # cycles — see issue #961. Loaded into this set at startup via load_awaiting_plate_clear_from_db().
- self._awaiting_plate_clear: set[int] = set()
- def get_printer(self, printer_id: int) -> PrinterInfo | None:
- """Get printer info by ID."""
- return self._printer_info.get(printer_id)
- def set_current_print_user(self, printer_id: int, user_id: int, username: str):
- """Track who started the current print (Issue #206)."""
- self._current_print_user[printer_id] = {"user_id": user_id, "username": username}
- def get_current_print_user(self, printer_id: int) -> dict | None:
- """Get the user who started the current print (Issue #206)."""
- return self._current_print_user.get(printer_id)
- def clear_current_print_user(self, printer_id: int):
- """Clear the current print user when print completes (Issue #206)."""
- self._current_print_user.pop(printer_id, None)
- def is_awaiting_plate_clear(self, printer_id: int) -> bool:
- """Return True when the printer finished/failed a print and is waiting for the
- user to acknowledge the plate is cleared before the queue may dispatch the next job.
- """
- return printer_id in self._awaiting_plate_clear
- def set_awaiting_plate_clear(self, printer_id: int, awaiting: bool):
- """Set/clear the awaiting-plate-clear gate and persist it to DB.
- Persisted so the gate survives Bambuddy/printer restarts (#961): after Auto Off
- cycles the printer, the printer boots into IDLE with no memory of the previous
- finish, and without persistence the queue would bypass the confirmation prompt.
- Also broadcasts an updated ``printer_status`` over the WebSocket (#1128).
- ``awaiting_plate_clear`` is a Bambuddy-side flag — toggling it does not
- produce an MQTT push from the printer, so without an explicit broadcast
- any UI subscriber that's NOT the originating tab would stay stale until
- the next coincidental status refresh. The plate-clear button on the
- printer card disappeared "immediately" only because of an optimistic
- React Query cache update on the click path; clearing the flag through
- any other route (an admin script, a second tab, an automation that
- hits ``POST /printers/{id}/clear-plate`` directly) silently broke the
- UI without it. Centralised here so every current AND future caller is
- covered without each one having to remember to broadcast.
- """
- if awaiting:
- self._awaiting_plate_clear.add(printer_id)
- else:
- self._awaiting_plate_clear.discard(printer_id)
- # Only create the coroutine when there is a loop to run it on — otherwise Python
- # emits "coroutine was never awaited" warnings (e.g. in sync unit tests).
- if self._loop and self._loop.is_running():
- self._schedule_async(self._persist_awaiting_plate_clear(printer_id, awaiting))
- self._schedule_async(self._broadcast_status_change(printer_id))
- async def _broadcast_status_change(self, printer_id: int) -> None:
- """Emit a ``printer_status`` WebSocket update for this printer (#1128).
- Used for state changes that don't come from MQTT — currently just the
- ``awaiting_plate_clear`` flag, but any future Bambuddy-side flag added
- to ``printer_state_to_dict`` should plumb through here too. The
- existing MQTT-driven broadcast in ``main.on_printer_status_change``
- deduplicates on a status_key that intentionally excludes Bambuddy
- flags (so e.g. queue-state changes don't get echoed as printer
- events), which is precisely why those flags need their own emit.
- Lazy-imports ``ws_manager`` to keep ``printer_manager`` clean of
- application-layer infra at module-import time — the broadcast is the
- only thing here that needs it.
- """
- state = self.get_status(printer_id)
- if not state:
- # Printer disconnected or unknown — nothing to broadcast. The
- # next reconnect will produce a fresh status push anyway, so the
- # UI eventually catches up without us forcing a stale snapshot
- # on subscribers now.
- return
- try:
- from backend.app.core.websocket import ws_manager
- await ws_manager.send_printer_status(
- printer_id,
- printer_state_to_dict(state, printer_id, self.get_model(printer_id)),
- )
- except Exception as e:
- logger.warning(
- "Failed to broadcast printer_status after Bambuddy-side state change for printer %d: %s",
- printer_id,
- e,
- )
- async def _persist_awaiting_plate_clear(self, printer_id: int, awaiting: bool):
- from backend.app.core.database import async_session
- try:
- async with async_session() as db:
- printer = await db.get(Printer, printer_id)
- if printer is not None:
- printer.awaiting_plate_clear = awaiting
- await db.commit()
- except Exception as e:
- logger.warning("Failed to persist awaiting_plate_clear for printer %d: %s", printer_id, e)
- async def load_awaiting_plate_clear_from_db(self):
- """Rehydrate the awaiting-plate-clear set from the printers table on startup."""
- from backend.app.core.database import async_session
- try:
- async with async_session() as db:
- result = await db.execute(select(Printer.id).where(Printer.awaiting_plate_clear.is_(True)))
- ids = {row[0] for row in result.all()}
- self._awaiting_plate_clear = ids
- if ids:
- logger.info("Loaded %d printer(s) awaiting plate-clear acknowledgment: %s", len(ids), sorted(ids))
- except Exception as e:
- logger.warning("Failed to load awaiting_plate_clear from DB: %s", e)
- def set_event_loop(self, loop: asyncio.AbstractEventLoop):
- """Set the event loop for async callbacks."""
- self._loop = loop
- def set_print_start_callback(self, callback: Callable[[int, dict], None]):
- """Set callback for print start events."""
- self._on_print_start = callback
- def set_print_complete_callback(self, callback: Callable[[int, dict], None]):
- """Set callback for print completion events."""
- self._on_print_complete = callback
- def set_status_change_callback(self, callback: Callable[[int, PrinterState], None]):
- """Set callback for status change events."""
- self._on_status_change = callback
- def set_ams_change_callback(self, callback: Callable[[int, list], None]):
- """Set callback for AMS data change events."""
- self._on_ams_change = callback
- def set_layer_change_callback(self, callback: Callable[[int, int], None]):
- """Set callback for layer change events. Receives (printer_id, layer_num)."""
- self._on_layer_change = callback
- def set_bed_temp_update_callback(self, callback: Callable[[int, float], None]):
- """Set callback for bed temperature updates. Receives (printer_id, bed_temp)."""
- self._on_bed_temp_update = callback
- def _schedule_async(self, coro):
- """Schedule an async coroutine from a sync context.
- Captures exceptions from the coroutine and logs them to prevent
- silent failures in callbacks.
- """
- if self._loop and self._loop.is_running():
- future = asyncio.run_coroutine_threadsafe(coro, self._loop)
- def handle_exception(f):
- try:
- # This will re-raise any exception from the coroutine
- f.result()
- except Exception as e:
- import logging
- logging.getLogger(__name__).error(f"Exception in scheduled callback: {e}", exc_info=True)
- future.add_done_callback(handle_exception)
- async def connect_printer(self, printer: Printer) -> bool:
- """Connect to a printer."""
- if printer.id in self._clients:
- self.disconnect_printer(printer.id)
- printer_id = printer.id
- def on_state_change(state: PrinterState):
- if self._on_status_change:
- self._schedule_async(self._on_status_change(printer_id, state))
- def on_print_start(data: dict):
- if self._on_print_start:
- self._schedule_async(self._on_print_start(printer_id, data))
- def on_print_complete(data: dict):
- if self._on_print_complete:
- self._schedule_async(self._on_print_complete(printer_id, data))
- def on_ams_change(ams_data: list):
- if self._on_ams_change:
- self._schedule_async(self._on_ams_change(printer_id, ams_data))
- def on_layer_change(layer_num: int):
- if self._on_layer_change:
- self._schedule_async(self._on_layer_change(printer_id, layer_num))
- def on_bed_temp_update(bed_temp: float):
- if self._on_bed_temp_update:
- self._schedule_async(self._on_bed_temp_update(printer_id, bed_temp))
- client = BambuMQTTClient(
- ip_address=printer.ip_address,
- serial_number=printer.serial_number,
- access_code=printer.access_code,
- model=printer.model,
- on_state_change=on_state_change,
- on_print_start=on_print_start,
- on_print_complete=on_print_complete,
- on_ams_change=on_ams_change,
- on_layer_change=on_layer_change,
- on_bed_temp_update=on_bed_temp_update,
- )
- client.connect()
- self._clients[printer_id] = client
- self._models[printer_id] = printer.model # Cache model for feature detection
- self._printer_info[printer_id] = PrinterInfo(printer.name, printer.serial_number)
- # Wait a moment for connection
- await asyncio.sleep(1)
- return client.state.connected
- def disconnect_printer(self, printer_id: int, timeout: float = 0):
- """Disconnect from a printer."""
- if printer_id in self._clients:
- self._clients[printer_id].disconnect(timeout=timeout)
- del self._clients[printer_id]
- self._models.pop(printer_id, None) # Clean up model cache
- self._printer_info.pop(printer_id, None) # Clean up printer info cache
- def disconnect_all(self, timeout: float = 0):
- """Disconnect from all printers."""
- for printer_id in list(self._clients.keys()):
- self.disconnect_printer(printer_id, timeout=timeout)
- def get_status(self, printer_id: int) -> PrinterState | None:
- """Get the current status of a printer (checks for stale connections)."""
- if printer_id in self._clients:
- client = self._clients[printer_id]
- # Check staleness and update connected state if needed
- client.check_staleness()
- return client.state
- return None
- def get_model(self, printer_id: int) -> str | None:
- """Get the cached model for a printer."""
- return self._models.get(printer_id)
- def get_all_statuses(self) -> dict[int, PrinterState]:
- """Get status of all connected printers (checks for stale connections)."""
- result = {}
- for printer_id, client in self._clients.items():
- # Check staleness and update connected state if needed
- client.check_staleness()
- result[printer_id] = client.state
- return result
- def is_connected(self, printer_id: int) -> bool:
- """Check if a printer is connected (checks for stale connections)."""
- if printer_id in self._clients:
- client = self._clients[printer_id]
- # Check staleness and update connected state if needed
- return client.check_staleness()
- return False
- def get_client(self, printer_id: int) -> BambuMQTTClient | None:
- """Get the MQTT client for a printer."""
- return self._clients.get(printer_id)
- def mark_printer_offline(self, printer_id: int):
- """Mark a printer as offline and trigger status callback.
- This is used when we know the printer power was cut (e.g., smart plug turned off)
- to immediately update the UI without waiting for MQTT timeout.
- """
- import logging
- logger = logging.getLogger(__name__)
- if printer_id in self._clients:
- client = self._clients[printer_id]
- if client.state.connected:
- logger.info("Marking printer %s as offline (smart plug power off)", printer_id)
- client.state.connected = False
- client.state.state = "unknown"
- # Trigger the status change callback to broadcast via WebSocket
- if self._on_status_change:
- self._schedule_async(self._on_status_change(printer_id, client.state))
- def start_print(
- self,
- printer_id: int,
- filename: str,
- plate_id: int = 1,
- ams_mapping: list[int] | None = None,
- bed_levelling: bool = True,
- flow_cali: bool = False,
- vibration_cali: bool = True,
- layer_inspect: bool = False,
- timelapse: bool = False,
- use_ams: bool = True,
- ) -> bool:
- """Start a print on a connected printer."""
- caller = traceback.extract_stack(limit=3)[0]
- logger.info(
- "PRINT COMMAND: printer=%s, file=%s, caller=%s:%s:%s",
- printer_id,
- filename,
- caller.filename.split("/")[-1],
- caller.lineno,
- caller.name,
- )
- if printer_id in self._clients:
- return self._clients[printer_id].start_print(
- filename,
- plate_id,
- ams_mapping=ams_mapping,
- timelapse=timelapse,
- bed_levelling=bed_levelling,
- flow_cali=flow_cali,
- vibration_cali=vibration_cali,
- layer_inspect=layer_inspect,
- use_ams=use_ams,
- )
- return False
- def stop_print(self, printer_id: int) -> bool:
- """Stop the current print on a connected printer."""
- if printer_id in self._clients:
- return self._clients[printer_id].stop_print()
- return False
- async def wait_for_cooldown(
- self,
- printer_id: int,
- target_temp: float = 50.0,
- timeout: int = 600,
- check_interval: int = 10,
- ) -> bool:
- """Wait for the nozzle to cool down to a safe temperature.
- Args:
- printer_id: The printer to monitor
- target_temp: Target temperature to wait for (default 50°C)
- timeout: Maximum seconds to wait (default 600s = 10 min)
- check_interval: Seconds between temperature checks (default 10s)
- Returns:
- True if cooled down, False if timeout or not connected
- """
- import logging
- logger = logging.getLogger(__name__)
- elapsed = 0
- while elapsed < timeout:
- state = self.get_status(printer_id)
- if not state or not state.connected:
- logger.warning("Printer %s disconnected during cooldown wait", printer_id)
- return False
- # Check nozzle temperature (and nozzle_2 for dual extruders)
- nozzle_temp = state.temperatures.get("nozzle", 0)
- nozzle_2_temp = state.temperatures.get("nozzle_2", 0)
- max_temp = max(nozzle_temp, nozzle_2_temp)
- if max_temp <= target_temp:
- logger.info("Printer %s cooled down to %s°C", printer_id, max_temp)
- return True
- logger.debug("Printer %s nozzle at %s°C, waiting for %s°C...", printer_id, max_temp, target_temp)
- await asyncio.sleep(check_interval)
- elapsed += check_interval
- logger.warning("Printer %s cooldown timeout after %ss", printer_id, timeout)
- return False
- def enable_logging(self, printer_id: int, enabled: bool = True) -> bool:
- """Enable or disable MQTT logging for a printer."""
- if printer_id in self._clients:
- self._clients[printer_id].enable_logging(enabled)
- return True
- return False
- def get_logs(self, printer_id: int) -> list[MQTTLogEntry]:
- """Get MQTT logs for a printer."""
- if printer_id in self._clients:
- return self._clients[printer_id].get_logs()
- return []
- def clear_logs(self, printer_id: int) -> bool:
- """Clear MQTT logs for a printer."""
- if printer_id in self._clients:
- self._clients[printer_id].clear_logs()
- return True
- return False
- def is_logging_enabled(self, printer_id: int) -> bool:
- """Check if logging is enabled for a printer."""
- if printer_id in self._clients:
- return self._clients[printer_id].logging_enabled
- return False
- def send_drying_command(
- self,
- printer_id: int,
- ams_id: int,
- temp: int,
- duration: int,
- mode: int = 1,
- filament: str = "",
- rotate_tray: bool = False,
- ) -> bool:
- """Send AMS drying command to printer."""
- if printer_id not in self._clients:
- return False
- return self._clients[printer_id].send_drying_command(ams_id, temp, duration, mode, filament, rotate_tray)
- def request_status_update(self, printer_id: int) -> bool:
- """Request a full status update from the printer.
- This sends a 'pushall' command to get the latest data including nozzle info.
- """
- if printer_id in self._clients:
- return self._clients[printer_id].request_status_update()
- return False
- async def test_connection(
- self,
- ip_address: str,
- serial_number: str,
- access_code: str,
- ) -> dict:
- """Test connection to a printer without persisting."""
- client = BambuMQTTClient(
- ip_address=ip_address,
- serial_number=serial_number,
- access_code=access_code,
- )
- try:
- client.connect()
- await asyncio.sleep(2)
- result = {
- "success": client.state.connected,
- "state": client.state.state if client.state.connected else None,
- "model": client.state.raw_data.get("device_model"),
- }
- finally:
- client.disconnect()
- return result
- def get_derived_status_name(state: PrinterState, model: str | None = None) -> str | None:
- """
- Compute a human-readable status name based on printer state.
- Uses stg_cur when available, otherwise derives status from temperature data
- when the printer is heating before a print starts.
- Args:
- state: The printer state to analyze
- model: Optional printer model for model-specific workarounds
- """
- # Firmware bug: some models (A1, P1P, P1S) report stg_cur=0 when not printing.
- # stg_cur=0 maps to "Printing" in STAGE_NAMES, which incorrectly overrides the
- # real state (IDLE, FINISH, FAILED, etc.). Only trust stg_cur when the printer
- # is actually in an active print state (RUNNING or PAUSE).
- if state.state not in ("RUNNING", "PAUSE") and state.stg_cur == 0 and has_stg_cur_idle_bug(model):
- return None
- # If we have a valid calibration stage, use it
- # X1 models use -1 for idle, A1/P1 models use 255 for idle
- # Valid stage numbers are 0-254
- if 0 <= state.stg_cur < 255:
- return get_stage_name(state.stg_cur)
- # If not in RUNNING state, no derived status needed
- if state.state != "RUNNING":
- return None
- # Check if we're in an early phase where temperatures are heating
- temps = state.temperatures or {}
- progress = state.progress or 0
- # Only derive heating status when progress is very low (< 2%)
- # This indicates we're in the preparation phase, not actually printing
- if progress >= 2:
- return None
- # Check bed temperature - if target is set and current is significantly below
- bed_temp = temps.get("bed", 0)
- bed_target = temps.get("bed_target", 0)
- # Check nozzle temperature
- nozzle_temp = temps.get("nozzle", 0)
- nozzle_target = temps.get("nozzle_target", 0)
- # Temperature thresholds: consider "heating" if more than 10°C below target
- TEMP_THRESHOLD = 10
- # Determine what's heating (prioritize bed since it takes longer)
- if bed_target > 30 and (bed_target - bed_temp) > TEMP_THRESHOLD:
- return "Heating heatbed"
- elif nozzle_target > 30 and (nozzle_target - nozzle_temp) > TEMP_THRESHOLD:
- return "Heating nozzle"
- # If targets are set but we're close to them, we might be in final prep
- if bed_target > 30 or nozzle_target > 30:
- if progress == 0 and state.layer_num == 0:
- return "Preparing"
- return None
- _PLATE_ID_RE = re.compile(r"plate_(\d+)\.gcode")
- def parse_plate_id(gcode_file: str | None) -> int | None:
- """Extract the 1-indexed plate number from a Bambu gcode_file path.
- Returns None when the path is missing or has no `plate_N.gcode` segment.
- Shared by the REST status route and the WebSocket push path so both agree
- on the value sent to the frontend (#881 follow-up).
- """
- if not gcode_file:
- return None
- match = _PLATE_ID_RE.search(gcode_file)
- return int(match.group(1)) if match else None
- def resolve_plate_id(state) -> int | None:
- """Resolve the active plate number from a PrinterState.
- Some firmware versions (e.g. P1S 01.10.00.00, #1166) put only the .3mf
- filename in print.gcode_file, so parse_plate_id() returns None and the
- printer card falls back to plate 1 — wrong thumbnail. When Bambuddy
- dispatched the print itself we already know the right plate, so we prefer
- that over the gcode_file echo. The subtask check prevents stale values
- from a previous Bambuddy-dispatched print bleeding into a Studio-direct
- print on the same printer.
- """
- dispatched_plate = getattr(state, "dispatched_plate_id", None)
- dispatched_subtask = getattr(state, "dispatched_subtask", None)
- if (
- dispatched_plate is not None
- and dispatched_subtask is not None
- and state.subtask_name
- and dispatched_subtask == state.subtask_name
- ):
- return dispatched_plate
- return parse_plate_id(state.gcode_file)
- def printer_state_to_dict(state: PrinterState, printer_id: int | None = None, model: str | None = None) -> dict:
- """Convert PrinterState to a JSON-serializable dict.
- Args:
- state: The printer state to convert
- printer_id: Optional printer ID for generating cover URLs
- model: Optional printer model for filtering unsupported features
- """
- # Parse AMS data from raw_data
- ams_units = []
- vt_tray = []
- raw_data = state.raw_data or {}
- # Build K-profile lookup map: cali_idx -> k_value
- kprofile_map: dict[int, float] = {}
- for kp in state.kprofiles or []:
- if kp.slot_id is not None and kp.k_value:
- try:
- kprofile_map[kp.slot_id] = float(kp.k_value)
- except (ValueError, TypeError):
- pass # Skip K-profile entries with unparseable values
- if "ams" in raw_data and isinstance(raw_data["ams"], list):
- for ams_data in raw_data["ams"]:
- trays = []
- for tray in ams_data.get("tray", []):
- tag_uid = tray.get("tag_uid")
- if tag_uid in ("", "0000000000000000"):
- tag_uid = None
- tray_uuid = tray.get("tray_uuid")
- if tray_uuid in ("", "00000000000000000000000000000000"):
- tray_uuid = None
- # Get K value: first try tray's k field, then lookup from K-profiles
- k_value = tray.get("k")
- cali_idx = tray.get("cali_idx")
- if k_value is None and cali_idx is not None and cali_idx in kprofile_map:
- k_value = kprofile_map[cali_idx]
- # P1S / A1 Mini physically-empty-slot signal (#1322 follow-up by
- # @RosdasHH): for a truly empty slot the firmware sends only
- # {"id": N} — no state, no tray_type, no anything else. Treat
- # that as the firmware's "no spool" indicator (state=9) so the
- # assign-spool path in inventory.py can short-circuit a MQTT
- # publish the firmware would silently drop anyway. The
- # post-"Reset Slot" A1 Mini BMCU case sends a populated payload
- # (state=3, tray_type="") — different shape, doesn't match this
- # guard, still attempts the MQTT push per the #1322 fix.
- state_val = tray.get("state")
- if state_val is None and len(tray) == 1 and "id" in tray:
- state_val = 9
- trays.append(
- {
- "id": int(tray.get("id", 0)),
- "tray_color": tray.get("tray_color"),
- "tray_type": tray.get("tray_type"),
- "tray_sub_brands": tray.get("tray_sub_brands"),
- "tray_id_name": tray.get("tray_id_name"),
- "tray_info_idx": tray.get("tray_info_idx"),
- "remain": tray.get("remain", 0),
- "k": k_value,
- "cali_idx": cali_idx,
- "tag_uid": tag_uid,
- "tray_uuid": tray_uuid,
- "nozzle_temp_min": tray.get("nozzle_temp_min"),
- "nozzle_temp_max": tray.get("nozzle_temp_max"),
- "drying_temp": tray.get("drying_temp"),
- "drying_time": tray.get("drying_time"),
- "state": state_val,
- }
- )
- # Prefer humidity_raw (actual percentage) over humidity (index 1-5)
- humidity_raw = ams_data.get("humidity_raw")
- humidity_idx = ams_data.get("humidity")
- humidity_value = None
- if humidity_raw is not None:
- try:
- humidity_value = int(humidity_raw)
- except (ValueError, TypeError):
- pass # Skip unparseable humidity; will try index fallback
- # Fall back to index if no raw value (index is 1-5, not percentage)
- if humidity_value is None and humidity_idx is not None:
- try:
- humidity_value = int(humidity_idx)
- except (ValueError, TypeError):
- pass # Skip unparseable humidity index; humidity remains None
- # AMS-HT has 1 tray, regular AMS has 4 trays
- is_ams_ht = len(trays) == 1
- ams_units.append(
- {
- "id": int(ams_data.get("id", 0)),
- "humidity": humidity_value,
- "temp": ams_data.get("temp"),
- "is_ams_ht": is_ams_ht,
- "tray": trays,
- # Serial number: Bambu MQTT uses "sn" key on AMS unit objects
- "serial_number": str(ams_data.get("sn") or ams_data.get("serial_number") or ""),
- # Firmware version: populated by _handle_version_info from get_version
- "sw_ver": str(ams_data.get("sw_ver") or ""),
- # Drying: dry_time > 0 means drying is active (minutes remaining)
- "dry_time": int(ams_data.get("dry_time") or 0),
- # Drying status from info hex bits (0=Off, 1=Checking, 2=Drying, 3=Cooling, etc.)
- "dry_status": int(ams_data.get("dry_status") or 0),
- "dry_sub_status": int(ams_data.get("dry_sub_status") or 0),
- # Cannot-dry reasons from firmware (e.g. 1=InsufficientPower, 8=NeedPluginPower)
- "dry_sf_reason": list(ams_data.get("dry_sf_reason") or []),
- # Module type: "ams", "n3f", "n3s" (from get_version)
- "module_type": str(ams_data.get("module_type") or ""),
- }
- )
- # Parse virtual tray (external spool) — now a list
- if "vt_tray" in raw_data:
- vt_tray_raw = raw_data["vt_tray"]
- # Defensive: MQTT sends vt_tray as a dict; normalize to list
- if isinstance(vt_tray_raw, dict):
- vt_tray_raw = [vt_tray_raw]
- elif not isinstance(vt_tray_raw, list):
- vt_tray_raw = []
- for vt_data in vt_tray_raw:
- vt_tag_uid = vt_data.get("tag_uid")
- if vt_tag_uid in ("", "0000000000000000"):
- vt_tag_uid = None
- vt_tray_uuid = vt_data.get("tray_uuid")
- if vt_tray_uuid in ("", "00000000000000000000000000000000"):
- vt_tray_uuid = None
- # Get K value for vt_tray
- vt_k_value = vt_data.get("k")
- vt_cali_idx = vt_data.get("cali_idx")
- if vt_k_value is None and vt_cali_idx is not None and vt_cali_idx in kprofile_map:
- vt_k_value = kprofile_map[vt_cali_idx]
- tray_id = int(vt_data.get("id", 254))
- vt_tray.append(
- {
- "id": tray_id,
- "tray_color": vt_data.get("tray_color"),
- "tray_type": vt_data.get("tray_type"),
- "tray_sub_brands": vt_data.get("tray_sub_brands"),
- "tray_id_name": vt_data.get("tray_id_name"),
- "tray_info_idx": vt_data.get("tray_info_idx"),
- "remain": vt_data.get("remain", 0),
- "k": vt_k_value,
- "cali_idx": vt_cali_idx,
- "tag_uid": vt_tag_uid,
- "tray_uuid": vt_tray_uuid,
- "nozzle_temp_min": vt_data.get("nozzle_temp_min"),
- "nozzle_temp_max": vt_data.get("nozzle_temp_max"),
- }
- )
- # Get ams_extruder_map from raw_data (populated by MQTT handler from AMS info field)
- ams_extruder_map = raw_data.get("ams_extruder_map", {})
- # Filter out chamber temp for models that don't have a real sensor
- # P1P, P1S, A1, A1Mini report meaningless chamber_temper values
- temperatures = state.temperatures
- if not supports_chamber_temp(model):
- temperatures = {
- k: v for k, v in temperatures.items() if k not in ("chamber", "chamber_target", "chamber_heating")
- }
- result = {
- "connected": state.connected,
- "state": state.state,
- "current_print": state.current_print,
- "subtask_name": state.subtask_name,
- "gcode_file": state.gcode_file,
- "progress": state.progress,
- "remaining_time": state.remaining_time,
- "layer_num": state.layer_num,
- "total_layers": state.total_layers,
- "temperatures": temperatures,
- "hms_errors": [
- {"code": e.code, "attr": e.attr, "module": e.module, "severity": e.severity}
- for e in (state.hms_errors or [])
- ],
- # AMS data for filament colors
- "ams": ams_units if ams_units else None,
- "vt_tray": vt_tray,
- # AMS status for filament change tracking
- "ams_status_main": state.ams_status_main,
- "ams_status_sub": state.ams_status_sub,
- "tray_now": state.tray_now,
- # Per-AMS extruder map: {ams_id: extruder_id} where 0=right, 1=left
- "ams_extruder_map": ams_extruder_map,
- # WiFi signal strength
- "wifi_signal": state.wifi_signal,
- "wired_network": state.wired_network,
- "door_open": state.door_open,
- # Calibration stage tracking
- "stg_cur": state.stg_cur,
- "stg_cur_name": get_derived_status_name(state, model),
- # Printable objects count for skip objects feature
- "printable_objects_count": len(state.printable_objects),
- # Fan speeds (0-100 percentage, None if not available)
- "cooling_fan_speed": state.cooling_fan_speed,
- "big_fan1_speed": state.big_fan1_speed,
- "big_fan2_speed": state.big_fan2_speed,
- "heatbreak_fan_speed": state.heatbreak_fan_speed,
- # Chamber light state
- "chamber_light": state.chamber_light,
- # Active extruder for dual-nozzle printers (0=right, 1=left)
- "active_extruder": state.active_extruder,
- # Print speed mode (1=silent, 2=standard, 3=sport, 4=ludicrous)
- "speed_level": state.speed_level,
- # H2C nozzle rack (tool-changer dock positions)
- # Map raw MQTT field names (type/diameter) to schema names (nozzle_type/nozzle_diameter)
- "nozzle_rack": [
- {
- "id": n.get("id", 0),
- "nozzle_type": n.get("type", ""),
- "nozzle_diameter": n.get("diameter", ""),
- "wear": n.get("wear"),
- "stat": n.get("stat"),
- "max_temp": n.get("max_temp", 0),
- "serial_number": n.get("serial_number", ""),
- "filament_color": n.get("filament_color", ""),
- "filament_id": n.get("filament_id", ""),
- }
- for n in (state.nozzle_rack or [])
- ],
- # AMS drying support
- "supports_drying": supports_drying(model, state.firmware_version),
- # 1-indexed plate number parsed from gcode_file (e.g. /Metadata/plate_2.gcode).
- # Pushed via WebSocket so the printer card picks up plate transitions within
- # a multi-plate 3MF without waiting for the 30 s REST poll (#881 follow-up).
- # current_archive_id is intentionally REST-only — it's stable for the life
- # of a print and needs a DB lookup the WebSocket path shouldn't pay for.
- "current_plate_id": resolve_plate_id(state),
- # Plate-clear gate (#939). Lives on the PrinterManager rather than PrinterState,
- # so surface it here — without this, WebSocket merges drop the flag and the
- # "Clear Plate" button only appears when the 30 s REST fallback poll runs.
- "awaiting_plate_clear": printer_manager.is_awaiting_plate_clear(printer_id) if printer_id else False,
- }
- # Add cover URL if there's an active print and printer_id is provided
- # Include PAUSE state so skip objects modal can show cover
- if printer_id and state.state in ("RUNNING", "PAUSE") and state.gcode_file:
- result["cover_url"] = f"/api/v1/printers/{printer_id}/cover"
- else:
- result["cover_url"] = None
- # Surface the display name + model so WS consumers (gcode viewer printer
- # selector) can render proper labels on the initial snapshot without racing
- # a separate /api/v1/printers fetch (#963 follow-up). PrinterInfo only
- # carries name/serial_number; the model comes through via the `model` arg.
- if printer_id:
- _printer_info = printer_manager.get_printer(printer_id)
- if _printer_info is not None:
- result["name"] = _printer_info.name
- if model:
- result["model"] = model
- return result
- # Global printer manager instance
- printer_manager = PrinterManager()
- async def init_printer_connections(db: AsyncSession):
- """Initialize connections to all active printers."""
- result = await db.execute(select(Printer).where(Printer.is_active.is_(True)))
- printers = result.scalars().all()
- for printer in printers:
- await printer_manager.connect_printer(printer)
|