| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406 |
- """Print scheduler service - processes the print queue."""
- import asyncio
- import json
- import logging
- import time
- from datetime import datetime, timezone
- from pathlib import Path
- from sqlalchemy import func, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.orm import selectinload
- from backend.app.core.config import settings
- from backend.app.core.database import async_session, run_with_retry
- from backend.app.models.archive import PrintArchive
- from backend.app.models.library import LibraryFile
- from backend.app.models.print_queue import PrintQueueItem
- from backend.app.models.printer import Printer
- from backend.app.models.settings import Settings
- from backend.app.models.smart_plug import SmartPlug
- from backend.app.models.spool_assignment import SpoolAssignment
- from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
- from backend.app.services.bambu_ftp import (
- cache_3mf_download,
- delete_file_async,
- get_ftp_retry_settings,
- upload_file_async,
- with_ftp_retry,
- )
- from backend.app.services.filament_deficit import compute_deficit_for_queue_item
- from backend.app.services.notification_service import notification_service
- from backend.app.services.printer_manager import printer_manager, supports_drying
- from backend.app.services.smart_plug_manager import smart_plug_manager
- from backend.app.utils.printer_models import normalize_printer_model
- logger = logging.getLogger(__name__)
- # Bambu firmware states that mean the project_file has actually been accepted
- # and the printer is now processing / running / paused mid-print. Used by the
- # dispatch watchdog (#1370): a transition into one of these states means the
- # print landed, anything else (e.g. FINISH -> IDLE after the user dismisses
- # a post-print prompt) is NOT a valid "command landed" signal even though the
- # state value did change. SLICING is included because some firmwares park
- # briefly in SLICING between PREPARE and RUNNING while parsing the g-code.
- _ACTIVE_PRINT_STATES: frozenset[str] = frozenset({"PREPARE", "SLICING", "RUNNING", "PAUSE"})
- # Filament type equivalence groups — types within the same group are
- # interchangeable on the printer side (Bambu Lab firmware treats them as compatible).
- _FILAMENT_TYPE_GROUPS: list[list[str]] = [
- ["PA-CF", "PA12-CF", "PAHT-CF"],
- ]
- _FILAMENT_EQUIV_MAP: dict[str, str] = {}
- for _group in _FILAMENT_TYPE_GROUPS:
- _canonical = _group[0].upper()
- for _t in _group:
- _FILAMENT_EQUIV_MAP[_t.upper()] = _canonical
- def _canonical_filament_type(ftype: str) -> str:
- """Return canonical type for equivalence matching."""
- upper = ftype.upper()
- return _FILAMENT_EQUIV_MAP.get(upper, upper)
- class PrintScheduler:
- """Background scheduler that processes the print queue."""
- # Built-in drying presets per filament type (from BambuStudio filament profiles)
- # Format: { n3f_temp, n3s_temp, n3f_hours, n3s_hours }
- DEFAULT_DRYING_PRESETS: dict[str, dict[str, int]] = {
- "PLA": {"n3f": 45, "n3s": 45, "n3f_hours": 12, "n3s_hours": 12},
- "PETG": {"n3f": 65, "n3s": 65, "n3f_hours": 12, "n3s_hours": 12},
- "TPU": {"n3f": 65, "n3s": 75, "n3f_hours": 12, "n3s_hours": 18},
- "ABS": {"n3f": 65, "n3s": 80, "n3f_hours": 12, "n3s_hours": 8},
- "ASA": {"n3f": 65, "n3s": 80, "n3f_hours": 12, "n3s_hours": 8},
- "PA": {"n3f": 65, "n3s": 85, "n3f_hours": 12, "n3s_hours": 12},
- "PC": {"n3f": 65, "n3s": 80, "n3f_hours": 12, "n3s_hours": 8},
- "PVA": {"n3f": 65, "n3s": 85, "n3f_hours": 12, "n3s_hours": 18},
- }
- def __init__(self):
- self._running = False
- self._check_interval = 30 # seconds
- self._power_on_wait_time = 180 # seconds to wait for printer after power on (3 min)
- self._power_on_check_interval = 10 # seconds between connection checks
- self._min_drying_seconds = 1800 # 30 minutes minimum before humidity re-check can stop drying
- # Track which printers are currently auto-drying (printer_id -> start timestamp)
- self._drying_in_progress: dict[int, float] = {}
- # Defensive in-memory dispatch hold (#1157): a printer that just received
- # a project_file command must not get a second dispatch until either it
- # transitions out of pre_state OR the hard timeout expires. The H2D Pro
- # can take 80–210 s to flip FINISH→PREPARE after project_file, and
- # during that window the DB busy_printers seed is empirically unreliable
- # (multi-plate batches double-/triple-dispatched onto the same printer
- # 30 s apart). Keyed by printer_id; cleared by the watchdog on success
- # or revert.
- # printer_id -> (monotonic_started_at, pre_state, pre_subtask_id)
- self._dispatch_holds: dict[int, tuple[float, str, str | None]] = {}
- # Minimum cooldown between dispatches to the same printer (covers the
- # H2D's project_file digestion window).
- self._dispatch_min_cooldown = 60.0
- # Hard timeout — drop the hold even if we never observed a transition,
- # so a lost MQTT session can't lock a printer out of the queue forever.
- # Matches the watchdog timeout (90 s) plus a safety margin so the
- # watchdog runs first on the unhappy path.
- self._dispatch_max_hold = 180.0
- async def run(self):
- """Main loop - check queue every interval."""
- self._running = True
- logger.info("Print scheduler started")
- while self._running:
- try:
- await self.check_queue()
- except Exception as e:
- logger.error("Scheduler error: %s", e)
- await asyncio.sleep(self._check_interval)
- def stop(self):
- """Stop the scheduler."""
- self._running = False
- logger.info("Print scheduler stopped")
- async def check_queue(self):
- """Check for prints ready to start."""
- async with async_session() as db:
- # Check if shortest-job-first scheduling is enabled
- sjf_enabled = await self._get_bool_setting(db, "queue_shortest_first")
- # Get all pending items, ordered by printer and position (or SJF order)
- if sjf_enabled:
- # SJF: group by printer (and target_model for model-based jobs),
- # then items already jumped get top priority (starvation guard),
- # then sort by print_time ascending. Items with no print time go last.
- result = await db.execute(
- select(PrintQueueItem)
- .where(PrintQueueItem.status == "pending")
- .order_by(
- PrintQueueItem.printer_id,
- PrintQueueItem.target_model,
- PrintQueueItem.been_jumped.desc(),
- PrintQueueItem.print_time_seconds.asc().nullslast(),
- PrintQueueItem.position,
- )
- )
- else:
- result = await db.execute(
- select(PrintQueueItem)
- .where(PrintQueueItem.status == "pending")
- .order_by(PrintQueueItem.printer_id, PrintQueueItem.position)
- )
- items = list(result.scalars().all())
- # Read plate-clear setting once per queue check
- require_plate_clear = await self._get_bool_setting(db, "require_plate_clear", default=True)
- if not items:
- # No pending items — still check auto-drying on idle printers
- await self._check_auto_drying(db, [], set(), require_plate_clear=require_plate_clear)
- return
- logger.info(
- "Queue check: found %d pending items: %s",
- len(items),
- [(i.id, i.printer_id, i.archive_id, i.library_file_id) for i in items],
- )
- # Seed busy_printers with printers that already have an item in 'printing'
- # status. _is_printer_idle() alone is not sufficient as a dispatch gate —
- # on H2D / P1 series the MQTT state transition from IDLE to RUNNING can
- # lag several seconds behind the print command, so the next check_queue
- # tick still sees IDLE and would double-dispatch onto the same printer.
- # Without this guard, two pending items targeting the same printer
- # (e.g. a batch with quantity>1) both end up in 'printing' status —
- # surfaced via the "BUG: Multiple queue items" warning in on_print_complete.
- busy_result = await db.execute(
- select(PrintQueueItem.printer_id)
- .where(PrintQueueItem.status == "printing")
- .where(PrintQueueItem.printer_id.is_not(None))
- )
- busy_printers: set[int] = {pid for (pid,) in busy_result.all() if pid is not None}
- # Defense-in-depth (#1157): augment busy_printers with any printer
- # still in its post-dispatch hold window. Empirically, the DB seed
- # above can miss in-flight items in a multi-plate batch — same-file
- # plates were being dispatched 30 s apart while the H2D was still
- # digesting the first project_file. The hold is keyed in-memory and
- # released by the watchdog on the success path, so it adds a layer
- # that doesn't depend on DB row visibility or completion-callback
- # timing.
- for held_printer_id in list(self._dispatch_holds.keys()):
- if self._printer_in_dispatch_hold(held_printer_id):
- busy_printers.add(held_printer_id)
- # Log skip reasons once per queue check (not per item)
- skip_reasons: dict[str, int] = {}
- for item in items:
- # Check scheduled time first (scheduled_time is stored in UTC from ISO string)
- if item.scheduled_time:
- sched = item.scheduled_time
- if sched.tzinfo is None:
- sched = sched.replace(tzinfo=timezone.utc)
- if sched > datetime.now(timezone.utc):
- skip_reasons["scheduled_future"] = skip_reasons.get("scheduled_future", 0) + 1
- continue
- # Skip items that require manual start
- if item.manual_start:
- skip_reasons["manual_start"] = skip_reasons.get("manual_start", 0) + 1
- continue
- if item.printer_id:
- # Specific printer assignment (existing behavior)
- if item.printer_id in busy_printers:
- continue
- # Check if printer is idle
- printer_idle = self._is_printer_idle(item.printer_id, require_plate_clear)
- printer_connected = printer_manager.is_connected(item.printer_id)
- # If printer not connected, try to power on via smart plug
- if not printer_connected:
- plugs = await self._get_smart_plugs(db, item.printer_id)
- auto_on_plugs = [p for p in plugs if p.auto_on and p.enabled]
- if auto_on_plugs:
- logger.info("Printer %s offline, attempting to power on via smart plug(s)", item.printer_id)
- # Power on using the first auto_on plug (the printer power plug)
- powered_on = await self._power_on_and_wait(auto_on_plugs[0], item.printer_id, db)
- if powered_on:
- # Also turn on any remaining auto_on plugs (e.g., filter)
- for extra_plug in auto_on_plugs[1:]:
- try:
- service = await smart_plug_manager.get_service_for_plug(extra_plug, db)
- await service.turn_on(extra_plug)
- logger.info(
- "Also powered on plug '%s' for printer %s", extra_plug.name, item.printer_id
- )
- except Exception as e:
- logger.warning("Failed to power on extra plug '%s': %s", extra_plug.name, e)
- printer_connected = True
- printer_idle = self._is_printer_idle(item.printer_id, require_plate_clear)
- else:
- logger.warning("Could not power on printer %s via smart plug", item.printer_id)
- busy_printers.add(item.printer_id)
- continue
- else:
- # No plug or auto_on disabled
- busy_printers.add(item.printer_id)
- continue
- # Check if printer is idle (busy with another print)
- if not printer_idle:
- # If printer is drying (not truly busy), handle based on queue_drying_block
- if self._drying_in_progress.get(item.printer_id):
- block_for_drying = await self._get_bool_setting(db, "queue_drying_block")
- if block_for_drying:
- # Drying blocks queue — skip this printer
- busy_printers.add(item.printer_id)
- continue
- else:
- # Print takes priority — stop drying
- await self._stop_drying(item.printer_id)
- # Re-check idle after stopping drying
- printer_idle = self._is_printer_idle(item.printer_id, require_plate_clear)
- if not printer_idle:
- busy_printers.add(item.printer_id)
- continue
- else:
- busy_printers.add(item.printer_id)
- continue
- # Check condition (previous print success)
- if item.require_previous_success:
- if not await self._check_previous_success(db, item):
- item.status = "skipped"
- item.error_message = "Previous print failed or was aborted"
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.info("Skipped queue item %s - previous print failed", item.id)
- # Send notification
- job_name = await self._get_job_name(db, item)
- printer = await self._get_printer(db, item.printer_id)
- await notification_service.on_queue_job_skipped(
- job_name=job_name,
- printer_id=item.printer_id,
- printer_name=printer.name if printer else "Unknown",
- reason="Previous print failed or was aborted",
- db=db,
- )
- continue
- # Compute AMS mapping if not already set
- if not item.ams_mapping:
- computed_mapping = await self._compute_ams_mapping_for_printer(db, item.printer_id, item)
- if computed_mapping:
- item.ams_mapping = json.dumps(computed_mapping)
- logger.info(
- f"Queue item {item.id}: Computed AMS mapping for printer {item.printer_id}: {computed_mapping}"
- )
- await db.commit()
- # Filament-deficit pre-dispatch check (#1496). If the
- # assigned spool can't satisfy any required slot grams,
- # promote the item to manual_start so the user must
- # acknowledge via the ▶ button (which re-checks live).
- if await self._block_on_filament_deficit(db, item):
- continue
- # Start the print
- await self._start_print(db, item)
- busy_printers.add(item.printer_id)
- # SJF starvation guard: mark items that were jumped
- if sjf_enabled and item.print_time_seconds is not None:
- for other in items:
- if (
- other.id != item.id
- and other.status == "pending"
- and other.printer_id == item.printer_id
- and not other.been_jumped
- and other.position < item.position
- and (
- other.print_time_seconds is None
- or other.print_time_seconds > item.print_time_seconds
- )
- ):
- other.been_jumped = True
- await db.commit()
- elif item.target_model:
- # Model-based assignment - find any idle printer of matching model
- # Parse required filament types if present
- required_types = None
- if item.required_filament_types:
- try:
- required_types = json.loads(item.required_filament_types)
- except json.JSONDecodeError:
- pass # Ignore malformed filament types; treat as no constraint
- # Parse filament overrides if present
- filament_overrides = None
- if item.filament_overrides:
- try:
- filament_overrides = json.loads(item.filament_overrides)
- except json.JSONDecodeError:
- pass
- # If overrides exist, use override types for validation instead
- effective_types = required_types
- if filament_overrides:
- override_types = sorted({o["type"] for o in filament_overrides if "type" in o})
- if override_types:
- # Merge: keep original types for non-overridden slots, add override types
- effective_types = sorted(set(required_types or []) | set(override_types))
- printer_id, waiting_reason = await self._find_idle_printer_for_model(
- db,
- item.target_model,
- busy_printers,
- effective_types,
- item.target_location,
- filament_overrides=filament_overrides,
- require_plate_clear=require_plate_clear,
- )
- # Update waiting_reason if changed and send notification when first waiting
- if item.waiting_reason != waiting_reason:
- was_waiting = item.waiting_reason is not None
- item.waiting_reason = waiting_reason
- await db.commit()
- # Send waiting notification only when transitioning to waiting state
- # and the reason requires user action (not just "all printers busy")
- if waiting_reason and not was_waiting and not self._is_busy_only(waiting_reason):
- job_name = await self._get_job_name(db, item)
- await notification_service.on_queue_job_waiting(
- job_name=job_name,
- target_model=item.target_model,
- waiting_reason=waiting_reason,
- db=db,
- )
- if printer_id:
- # Check condition (previous print success) before assigning
- if item.require_previous_success:
- if not await self._check_previous_success(db, item):
- item.status = "skipped"
- item.error_message = "Previous print failed or was aborted"
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.info("Skipped queue item %s - previous print failed", item.id)
- # Send notification
- job_name = await self._get_job_name(db, item)
- printer = await self._get_printer(db, printer_id)
- await notification_service.on_queue_job_skipped(
- job_name=job_name,
- printer_id=printer_id,
- printer_name=printer.name if printer else "Unknown",
- reason="Previous print failed or was aborted",
- db=db,
- )
- continue
- # Assign printer and start - clear waiting reason
- item.printer_id = printer_id
- item.waiting_reason = None
- logger.info("Model-based assignment: queue item %s assigned to printer %s", item.id, printer_id)
- # Send assignment notification
- job_name = await self._get_job_name(db, item)
- printer = await self._get_printer(db, printer_id)
- await notification_service.on_queue_job_assigned(
- job_name=job_name,
- printer_id=printer_id,
- printer_name=printer.name if printer else "Unknown",
- target_model=item.target_model,
- db=db,
- )
- # Compute AMS mapping for the assigned printer if not already set
- # This is critical for model-based jobs where mapping wasn't computed upfront
- if not item.ams_mapping:
- computed_mapping = await self._compute_ams_mapping_for_printer(db, printer_id, item)
- if computed_mapping:
- item.ams_mapping = json.dumps(computed_mapping)
- logger.info(
- f"Queue item {item.id}: Computed AMS mapping for printer {printer_id}: {computed_mapping}"
- )
- await db.commit()
- # Filament-deficit pre-dispatch check (#1496).
- if await self._block_on_filament_deficit(db, item):
- continue
- await self._start_print(db, item)
- busy_printers.add(printer_id)
- # SJF starvation guard: mark model-based items that were jumped
- if sjf_enabled and item.print_time_seconds is not None:
- for other in items:
- if (
- other.id != item.id
- and other.status == "pending"
- and other.printer_id is None
- and other.target_model
- and other.target_model.upper() == item.target_model.upper()
- and not other.been_jumped
- and other.position < item.position
- and (
- other.print_time_seconds is None
- or other.print_time_seconds > item.print_time_seconds
- )
- ):
- other.been_jumped = True
- await db.commit()
- # Log summary of skip reasons (helps diagnose why queue items aren't starting)
- if skip_reasons:
- logger.info("Queue skip summary: %s", skip_reasons)
- if busy_printers:
- # Log why each printer was busy (first time it was checked)
- for pid in busy_printers:
- state = printer_manager.get_status(pid)
- connected = printer_manager.is_connected(pid)
- awaiting = printer_manager.is_awaiting_plate_clear(pid)
- state_name = state.state if state else "NO_STATUS"
- logger.info(
- "Queue: printer %d not available — connected=%s, state=%s, awaiting_plate_clear=%s",
- pid,
- connected,
- state_name,
- awaiting,
- )
- # Auto-drying: start drying on idle printers that have no pending queue items
- await self._check_auto_drying(db, items, busy_printers, require_plate_clear=require_plate_clear)
- async def _find_idle_printer_for_model(
- self,
- db: AsyncSession,
- model: str,
- exclude_ids: set[int],
- required_filament_types: list[str] | None = None,
- target_location: str | None = None,
- filament_overrides: list[dict] | None = None,
- require_plate_clear: bool = True,
- ) -> tuple[int | None, str | None]:
- """Find an idle, connected printer matching the model with compatible filaments.
- Args:
- db: Database session
- model: Printer model to match (e.g., "X1C", "P1S")
- exclude_ids: Printer IDs to exclude (already busy)
- required_filament_types: Optional list of filament types needed (e.g., ["PLA", "PETG"])
- If provided, only printers with all required types loaded will match.
- target_location: Optional location filter. If provided, only printers in this location are considered.
- filament_overrides: Optional list of override dicts. Each entry may include
- ``force_color_match: true`` to require an exact type+color match
- on the printer for that slot. Without the flag the existing
- colour-preference logic applies.
- Returns:
- Tuple of (printer_id, waiting_reason):
- - (printer_id, None) if a matching printer was found
- - (None, reason) if no printer is available, with explanation
- """
- # Normalize model name and use case-insensitive matching
- normalized_model = normalize_printer_model(model) or model
- query = (
- select(Printer)
- .where(func.lower(Printer.model) == normalized_model.lower())
- .where(Printer.is_active == True) # noqa: E712
- )
- # Add location filter if specified
- if target_location:
- query = query.where(Printer.location == target_location)
- result = await db.execute(query)
- printers = list(result.scalars().all())
- location_suffix = f" in {target_location}" if target_location else ""
- if not printers:
- return None, f"No active {normalized_model} printers{location_suffix} configured"
- # Separate force-matched overrides from preference-only overrides
- force_overrides = [o for o in (filament_overrides or []) if o.get("force_color_match")]
- pref_overrides = [o for o in (filament_overrides or []) if not o.get("force_color_match")]
- # Track reasons for skipping printers
- printers_busy = []
- printers_offline = []
- printers_missing_filament: list[tuple[str, list[str]]] = []
- candidates: list[tuple[int, int]] = [] # (printer_id, color_match_count)
- for printer in printers:
- if printer.id in exclude_ids:
- # Printer is already claimed by another job in this scheduling run.
- # For force-color jobs, still check if the color would match — if not,
- # report it as a color mismatch rather than plain "Busy" so the user
- # knows the job needs a filament change, not just to wait for availability.
- if force_overrides and not pref_overrides:
- missing_colors = self._get_missing_force_color_slots(printer.id, force_overrides)
- if missing_colors:
- printers_missing_filament.append((printer.name, missing_colors))
- continue
- printers_busy.append(printer.name)
- continue
- is_connected = printer_manager.is_connected(printer.id)
- is_idle = self._is_printer_idle(printer.id, require_plate_clear) if is_connected else False
- if not is_connected:
- printers_offline.append(printer.name)
- continue
- if not is_idle:
- # Printer is currently printing. For force-color jobs, check whether the
- # loaded color would satisfy the requirement — if not, surface it as a
- # color-mismatch reason rather than plain "Busy" so the user understands
- # that the job is waiting for a filament change, not just printer availability.
- if force_overrides and not pref_overrides:
- missing_colors = self._get_missing_force_color_slots(printer.id, force_overrides)
- if missing_colors:
- printers_missing_filament.append((printer.name, missing_colors))
- logger.debug(
- "Printer %s (%s) is busy but also has wrong force-color: %s",
- printer.id,
- printer.name,
- missing_colors,
- )
- continue
- printers_busy.append(printer.name)
- continue
- # Validate filament compatibility if required types are specified
- if required_filament_types:
- missing = self._get_missing_filament_types(printer.id, required_filament_types)
- if missing:
- # When force_overrides are present, enrich missing entries with color info
- # so the "Waiting on" message includes "TYPE (color)" instead of just "TYPE"
- if force_overrides:
- force_color_map = {
- (o.get("type") or "").upper(): o.get("color_name") or o.get("color", "?")
- for o in force_overrides
- }
- missing_enriched = [
- f"{t} ({force_color_map[t_upper]})" if (t_upper := t.upper()) in force_color_map else t
- for t in missing
- ]
- printers_missing_filament.append((printer.name, missing_enriched))
- else:
- printers_missing_filament.append((printer.name, missing))
- logger.debug("Skipping printer %s (%s) - missing filaments: %s", printer.id, printer.name, missing)
- continue
- # Force color match: ALL flagged slots must have an exact type+color match
- if force_overrides:
- missing_colors = self._get_missing_force_color_slots(printer.id, force_overrides)
- if missing_colors:
- printers_missing_filament.append((printer.name, missing_colors))
- logger.debug(
- "Skipping printer %s (%s) - missing force-matched colors: %s",
- printer.id,
- printer.name,
- missing_colors,
- )
- continue
- # If preference-only overrides exist, rank by color matches (existing behaviour)
- if pref_overrides:
- color_matches = self._count_override_color_matches(printer.id, pref_overrides)
- if color_matches > 0:
- candidates.append((printer.id, color_matches))
- else:
- override_colors = [f"{o.get('type', '?')} ({o.get('color', '?')})" for o in pref_overrides]
- printers_missing_filament.append((printer.name, override_colors))
- logger.debug("Skipping printer %s (%s) - no matching override colors", printer.id, printer.name)
- continue
- elif force_overrides:
- # Passed all force checks — immediately eligible (no preference ordering needed)
- return printer.id, None
- else:
- # No overrides at all - take first available (existing behavior)
- return printer.id, None
- # If we have candidates from preference override matching, pick the one with most color matches
- if candidates:
- candidates.sort(key=lambda c: c[1], reverse=True)
- return candidates[0][0], None
- # Build waiting reason from what we found
- reasons = []
- if printers_missing_filament:
- # Filament/color mismatch is most actionable - show first
- if force_overrides and not pref_overrides:
- # All mismatches are force-color failures — use descriptive message only;
- # but only if there are no busy printers that DO have the matching color.
- # If a printer has the right color but is busy, surface "Busy" instead so
- # the user knows the job will start automatically once that printer is free.
- if not printers_busy:
- all_missing = sorted({c for _, cols in printers_missing_filament for c in cols})
- return None, f"No matching material/color. Waiting on {', '.join(all_missing)}"
- # else: fall through — printers_busy will be appended below
- else:
- names_and_missing = [
- f"{name} (needs {', '.join(missing)})" for name, missing in printers_missing_filament
- ]
- reasons.append(f"Waiting for filament: {'; '.join(names_and_missing)}")
- if printers_busy:
- reasons.append(f"Busy: {', '.join(printers_busy)}")
- if printers_offline:
- reasons.append(f"Offline: {', '.join(printers_offline)}")
- return None, " | ".join(reasons) if reasons else f"No available {model} printers{location_suffix}"
- @staticmethod
- def _is_busy_only(waiting_reason: str) -> bool:
- """Check if the waiting reason only contains 'Busy' entries.
- When all matching printers are simply busy printing, the queued job
- will start automatically once a printer finishes — no user action
- is required, so we skip the notification.
- """
- parts = [p.strip() for p in waiting_reason.split(" | ")]
- return all(p.startswith("Busy:") for p in parts)
- def _get_missing_force_color_slots(self, printer_id: int, force_overrides: list[dict]) -> list[str]:
- """Return descriptive strings for force_color_match slots not satisfied by the printer.
- Each entry in ``force_overrides`` must have ``type`` and ``color`` fields and is expected
- to carry ``force_color_match: True``. The printer must have **every** such slot loaded
- with an exact type+color match.
- Returns:
- List of ``"TYPE (color)"`` strings for unmatched slots (empty list means all match).
- """
- status = printer_manager.get_status(printer_id)
- if not status:
- return [f"{o.get('type', '?')} ({o.get('color_name') or o.get('color', '?')})" for o in force_overrides]
- # Build set of loaded type+colour pairs from AMS and external spool
- loaded: set[tuple[str, str]] = set()
- for ams_unit in status.raw_data.get("ams", []):
- for tray in ams_unit.get("tray", []):
- tray_type = tray.get("tray_type")
- tray_color = tray.get("tray_color", "")
- if tray_type:
- color_norm = tray_color.replace("#", "").lower()[:6]
- loaded.add((_canonical_filament_type(tray_type), color_norm))
- for vt in status.raw_data.get("vt_tray") or []:
- vt_type = vt.get("tray_type")
- if vt_type:
- color_norm = (vt.get("tray_color", "") or "").replace("#", "").lower()[:6]
- loaded.add((_canonical_filament_type(vt_type), color_norm))
- missing = []
- for o in force_overrides:
- o_type = _canonical_filament_type(o.get("type") or "")
- o_color = (o.get("color") or "").replace("#", "").lower()[:6]
- if (o_type, o_color) not in loaded:
- color_label = o.get("color_name") or o.get("color", "?")
- missing.append(f"{o_type} ({color_label})")
- return missing
- def _get_missing_filament_types(self, printer_id: int, required_types: list[str]) -> list[str]:
- """Get the list of required filament types that are not loaded on the printer.
- Args:
- printer_id: The printer ID
- required_types: List of filament types needed (e.g., ["PLA", "PETG"])
- Returns:
- List of missing filament types (empty if all are loaded)
- """
- status = printer_manager.get_status(printer_id)
- if not status:
- return required_types # Can't determine, assume all missing
- # Collect all filament types loaded on this printer (AMS units + external spool)
- # Use canonical types so equivalence groups (e.g. PA-CF/PA12-CF/PAHT-CF) match.
- loaded_types: set[str] = set()
- # Check AMS units (stored in raw_data["ams"])
- ams_data = status.raw_data.get("ams", [])
- if ams_data:
- for ams_unit in ams_data:
- for tray in ams_unit.get("tray", []):
- tray_type = tray.get("tray_type")
- if tray_type:
- loaded_types.add(_canonical_filament_type(tray_type))
- # Check external spool(s) (virtual tray, stored in raw_data["vt_tray"] as list)
- for vt in status.raw_data.get("vt_tray") or []:
- vt_type = vt.get("tray_type")
- if vt_type:
- loaded_types.add(_canonical_filament_type(vt_type))
- # Find which required types are missing (using canonical type for equivalence)
- missing = []
- for req_type in required_types:
- if _canonical_filament_type(req_type) not in loaded_types:
- missing.append(req_type)
- return missing
- def _count_override_color_matches(self, printer_id: int, overrides: list[dict]) -> int:
- """Count how many filament overrides have an exact color match on the printer.
- Used to prefer printers that already have the desired override colors loaded.
- """
- status = printer_manager.get_status(printer_id)
- if not status:
- return 0
- # Collect loaded filaments' type+color pairs
- loaded: set[tuple[str, str]] = set()
- for ams_unit in status.raw_data.get("ams", []):
- for tray in ams_unit.get("tray", []):
- tray_type = tray.get("tray_type")
- tray_color = tray.get("tray_color", "")
- if tray_type:
- color_norm = tray_color.replace("#", "").lower()[:6]
- loaded.add((tray_type.upper(), color_norm))
- for vt in status.raw_data.get("vt_tray") or []:
- vt_type = vt.get("tray_type")
- if vt_type:
- color_norm = (vt.get("tray_color", "") or "").replace("#", "").lower()[:6]
- loaded.add((vt_type.upper(), color_norm))
- matches = 0
- for o in overrides:
- o_type = (o.get("type") or "").upper()
- o_color = (o.get("color") or "").replace("#", "").lower()[:6]
- if (o_type, o_color) in loaded:
- matches += 1
- return matches
- async def _compute_ams_mapping_for_printer(
- self, db: AsyncSession, printer_id: int, item: PrintQueueItem
- ) -> list[int] | None:
- """Compute AMS mapping for a printer based on filament requirements.
- Called when a queue item has no ams_mapping set — either for model-based
- items after printer assignment, or printer-specific items (e.g. from VP).
- Args:
- db: Database session
- printer_id: The assigned printer ID
- item: The queue item (contains archive_id or library_file_id)
- Returns:
- AMS mapping array or None if no mapping needed/possible
- """
- # Get printer status
- status = printer_manager.get_status(printer_id)
- if not status:
- logger.warning("Cannot compute AMS mapping: printer %s status unavailable", printer_id)
- return None
- # Get filament requirements from source file
- filament_reqs = await self._get_filament_requirements(db, item)
- if not filament_reqs:
- # When the 3MF can't be read but force-color overrides are present, build a
- # direct mapping from the overrides so the printer uses the correct AMS slot.
- if item.filament_overrides:
- try:
- overrides = json.loads(item.filament_overrides)
- force_overrides = [o for o in overrides if o.get("force_color_match")]
- if force_overrides:
- logger.info(
- "Queue item %s: No filament reqs from 3MF; building AMS mapping from %d "
- "force-color override(s)",
- item.id,
- len(force_overrides),
- )
- return self._build_override_direct_mapping(force_overrides, status)
- except (json.JSONDecodeError, KeyError, TypeError) as e:
- logger.warning("Queue item %s: Force-color fallback mapping failed: %s", item.id, e)
- logger.debug("No filament requirements found for queue item %s", item.id)
- return None
- # Apply filament overrides if present
- if item.filament_overrides:
- try:
- overrides = json.loads(item.filament_overrides)
- override_map = {o["slot_id"]: o for o in overrides}
- for req in filament_reqs:
- if req["slot_id"] in override_map:
- override = override_map[req["slot_id"]]
- req["type"] = override["type"]
- req["color"] = override["color"]
- # Clear tray_info_idx so matching uses type+color instead of
- # the original 3MF's tray_info_idx (which would match the old filament)
- req["tray_info_idx"] = ""
- logger.debug(
- "Queue item %s: Override slot %d -> %s %s",
- item.id,
- req["slot_id"],
- override["type"],
- override["color"],
- )
- except (json.JSONDecodeError, KeyError, TypeError) as e:
- logger.warning("Failed to apply filament overrides for queue item %s: %s", item.id, e)
- # Build loaded filaments from printer status
- loaded_filaments = self._build_loaded_filaments(status)
- if not loaded_filaments:
- logger.debug("No filaments loaded on printer %s", printer_id)
- return None
- # Check if user prefers lowest remaining filament when multiple spools match
- prefer_lowest = await self._get_bool_setting(db, "prefer_lowest_filament")
- # When the preference is on, surface Bambuddy's inventory-side
- # remaining for each slot that's bound to a tracked spool, so the
- # sort beats the MQTT-only blind spot (#1508). Skip the lookup
- # entirely when the preference is off — no behaviour change for
- # users who haven't opted in.
- inventory_remain_overrides: dict[int, float] | None = None
- if prefer_lowest:
- inventory_remain_overrides = await self._build_inventory_remain_overrides(db, printer_id, loaded_filaments)
- # Compute mapping: match required filaments to available slots
- return self._match_filaments_to_slots(
- filament_reqs, loaded_filaments, prefer_lowest, inventory_remain_overrides
- )
- def _build_override_direct_mapping(self, force_overrides: list[dict], status) -> list[int] | None:
- """Build an AMS mapping directly from force-color overrides without a 3MF.
- Used when ``_get_filament_requirements`` returns nothing (e.g. the 3MF's
- slice_info is missing or unreadable) but ``force_color_match`` overrides
- are present. Each override's ``slot_id``, ``type``, and ``color`` are
- treated as the filament requirement for that slot and matched against the
- current AMS state of the printer.
- Returns the same format as ``_match_filaments_to_slots``, or None when
- the AMS has no loaded filaments.
- """
- loaded = self._build_loaded_filaments(status)
- if not loaded:
- return None
- reqs = [
- {
- "slot_id": o["slot_id"],
- "type": o.get("type", ""),
- "color": o.get("color", ""),
- "tray_info_idx": "",
- }
- for o in force_overrides
- ]
- return self._match_filaments_to_slots(reqs, loaded)
- async def _get_filament_requirements(self, db: AsyncSession, item: PrintQueueItem) -> list[dict] | None:
- """Resolve the queue item's source 3MF and parse the per-slot
- filament requirements out of it. Thin DB-resolver wrapper around
- ``filament_requirements.extract_filament_requirements`` so the VP
- queue-mode write path (#1188) can reuse the same parser at upload
- time.
- """
- from backend.app.services.filament_requirements import extract_filament_requirements
- file_path: Path | None = None
- if item.archive_id:
- result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
- archive = result.scalar_one_or_none()
- if archive:
- file_path = settings.base_dir / archive.file_path
- elif item.library_file_id:
- result = await db.execute(LibraryFile.active().where(LibraryFile.id == item.library_file_id))
- library_file = result.scalar_one_or_none()
- if library_file:
- lib_path = Path(library_file.file_path)
- file_path = lib_path if lib_path.is_absolute() else settings.base_dir / library_file.file_path
- if not file_path or not file_path.exists():
- return None
- filaments = extract_filament_requirements(file_path, plate_id=item.plate_id)
- return filaments if filaments else None
- def _build_loaded_filaments(self, status) -> list[dict]:
- """Build list of loaded filaments from printer status.
- Args:
- status: PrinterState from printer_manager
- Returns:
- List of loaded filament dicts with type, color, ams_id, tray_id, global_tray_id
- """
- filaments = []
- # Get ams_extruder_map for dual-nozzle printers (H2D, H2D Pro)
- ams_extruder_map = status.raw_data.get("ams_extruder_map", {})
- # Parse AMS units from raw_data
- ams_data = status.raw_data.get("ams", [])
- for ams_unit in ams_data:
- ams_id = int(ams_unit.get("id", 0))
- trays = ams_unit.get("tray", [])
- is_ht = len(trays) == 1 # AMS-HT has single tray
- for tray in trays:
- tray_type = tray.get("tray_type")
- if tray_type:
- tray_id = int(tray.get("id", 0))
- tray_color = tray.get("tray_color", "")
- # tray_info_idx identifies the specific spool (e.g., "GFA00", "P4d64437")
- tray_info_idx = tray.get("tray_info_idx", "")
- # Normalize color: remove alpha, add hash
- color = self._normalize_color(tray_color)
- # Calculate global tray ID
- # AMS-HT units have IDs starting at 128 with a single tray
- global_tray_id = ams_id if ams_id >= 128 else ams_id * 4 + tray_id
- filaments.append(
- {
- "type": tray_type,
- "color": color,
- "tray_info_idx": tray_info_idx,
- "ams_id": ams_id,
- "tray_id": tray_id,
- "is_ht": is_ht,
- "is_external": False,
- "global_tray_id": global_tray_id,
- "extruder_id": ams_extruder_map.get(str(ams_id)),
- "remain": tray.get("remain", -1),
- }
- )
- # Check external spool(s) (vt_tray is a list)
- for idx, vt in enumerate(status.raw_data.get("vt_tray") or []):
- if vt.get("tray_type"):
- color = self._normalize_color(vt.get("tray_color", ""))
- tray_id = int(vt.get("id", 254))
- filaments.append(
- {
- "type": vt["tray_type"],
- "color": color,
- "tray_info_idx": vt.get("tray_info_idx", ""),
- "ams_id": -1,
- "tray_id": idx,
- "is_ht": False,
- "is_external": True,
- "global_tray_id": tray_id,
- "extruder_id": (255 - tray_id) if ams_extruder_map else None,
- "remain": vt.get("remain", -1),
- }
- )
- return filaments
- def _normalize_color(self, color: str | None) -> str:
- """Normalize color to #RRGGBB format."""
- if not color:
- return "#808080"
- hex_color = color.replace("#", "")[:6]
- return f"#{hex_color}"
- def _normalize_color_for_compare(self, color: str | None) -> str:
- """Normalize color for comparison (lowercase, no hash)."""
- if not color:
- return ""
- return color.replace("#", "").lower()[:6]
- def _colors_are_similar(self, color1: str | None, color2: str | None, threshold: int = 40) -> bool:
- """Check if two colors are visually similar within a threshold."""
- hex1 = self._normalize_color_for_compare(color1)
- hex2 = self._normalize_color_for_compare(color2)
- if not hex1 or not hex2 or len(hex1) < 6 or len(hex2) < 6:
- return False
- try:
- r1 = int(hex1[0:2], 16)
- g1 = int(hex1[2:4], 16)
- b1 = int(hex1[4:6], 16)
- r2 = int(hex2[0:2], 16)
- g2 = int(hex2[2:4], 16)
- b2 = int(hex2[4:6], 16)
- return abs(r1 - r2) <= threshold and abs(g1 - g2) <= threshold and abs(b1 - b2) <= threshold
- except ValueError:
- return False
- async def _build_inventory_remain_overrides(
- self, db: AsyncSession, printer_id: int, loaded: list[dict]
- ) -> dict[int, float]:
- """Return ``{global_tray_id: remaining_grams}`` for AMS slots the user
- has bound to an inventory spool — Bambuddy-side or Spoolman-side.
- The MQTT ``remain`` field on a tray is the printer firmware's
- RFID-decremented value, which has two limitations the "Prefer Lowest
- Remaining Filament" feature has been ignoring (#1508):
- - it's only meaningful for Bambu RFID spools; everything else reports
- ``-1`` (then clamped to a sentinel), so multiple non-RFID trays
- compare equal and the sort collapses to AMS-slot order — the user
- who's curating inventory weights gets the lower-slot pick instead
- of the lower-remaining pick;
- - even when set, it's the *printer's* counter, not Bambuddy's
- ``label_weight - weight_used`` (internal mode) or Spoolman's
- ``remaining_weight`` (Spoolman mode) — the two diverge any time the
- user re-spools, swaps cardboard, or runs a print outside Bambuddy.
- When the user has bound a spool to a slot, their own inventory
- tracking is authoritative; this helper surfaces that value so the
- sort can prefer it. Slots without a binding are absent from the
- returned map — the caller then falls back to MQTT ``remain`` for
- those, preserving the pre-#1508 behaviour for un-tracked spools.
- Returns an empty map on any failure (no inventory bindings, DB
- error, Spoolman unreachable). A best-effort lookup; "Prefer Lowest"
- is a preference, not a guarantee.
- """
- if not loaded:
- return {}
- # External / virtual-tray slots are tracked separately from AMS — skip
- # them so a VT-loaded spool doesn't accidentally inherit a tracked
- # AMS binding (the tables use ams_id 254/255 for VT, but the cross
- # match is fiddly and out of scope for this fix).
- tracked_slots = [(f["ams_id"], f["tray_id"], f["global_tray_id"]) for f in loaded if not f.get("is_external")]
- if not tracked_slots:
- return {}
- is_spoolman = await self._is_spoolman_mode(db)
- overrides: dict[int, float] = {}
- if is_spoolman:
- result = await db.execute(
- select(SpoolmanSlotAssignment).where(SpoolmanSlotAssignment.printer_id == printer_id)
- )
- assignments = list(result.scalars().all())
- by_slot = {(a.ams_id, a.tray_id): a.spoolman_spool_id for a in assignments}
- from backend.app.services.filament_deficit import _spoolman_remaining_grams
- for ams_id, tray_id, gtid in tracked_slots:
- spoolman_id = by_slot.get((ams_id, tray_id))
- if spoolman_id is None:
- continue
- grams = await _spoolman_remaining_grams(spoolman_id)
- if grams is not None:
- overrides[gtid] = grams
- return overrides
- # Internal inventory mode (default). selectinload matches the pattern
- # used elsewhere (inventory.py, spoolman.py routes) — a single query
- # plus an eager-loaded relationship rather than an explicit join, so
- # the row-attribute shape is exactly what those routes already rely on.
- result = await db.execute(
- select(SpoolAssignment)
- .options(selectinload(SpoolAssignment.spool))
- .where(SpoolAssignment.printer_id == printer_id)
- )
- assignments = list(result.scalars().all())
- by_slot = {(a.ams_id, a.tray_id): a.spool for a in assignments}
- for ams_id, tray_id, gtid in tracked_slots:
- spool = by_slot.get((ams_id, tray_id))
- if spool is None:
- continue
- label = float(spool.label_weight or 0)
- used = float(spool.weight_used or 0)
- overrides[gtid] = max(0.0, label - used)
- return overrides
- @staticmethod
- async def _is_spoolman_mode(db: AsyncSession) -> bool:
- """Mirror of ``filament_deficit._is_spoolman_mode`` — kept private
- here to avoid making this module import-dependent on that private
- helper's signature."""
- try:
- from backend.app.api.routes.settings import get_setting
- v = await get_setting(db, "spoolman_enabled")
- return bool(v) and v.lower() == "true"
- except Exception:
- return False
- @staticmethod
- def _slot_priority(ams_id: int | None, tray_id: int | None) -> int:
- """Deterministic slot-position tie-breaker for the prefer-lowest sort.
- Three bands, matched to the emission order in ``_build_loaded_filaments``
- so a tied sort produces the same physical-position order the pre-#1508
- stable sort did (preserves the regression-free baseline):
- - Regular AMS (``ams_id`` 0..7): ``ams_id * 4 + tray_id`` → 0..31
- - AMS-HT (``ams_id`` >= 128, single tray): ``1000 + (ams_id - 128) * 4``
- - External / VT (``ams_id`` < 0, or ``None``): ``10_000``
- Banding ensures regular AMS < AMS-HT < external on ties, regardless of
- what the raw ``ams_id`` happens to be (in particular, ``ams_id = -1``
- for VT must NOT sort to a negative number or it would beat AMS slot 0).
- """
- if ams_id is None or ams_id < 0:
- return 10_000
- if ams_id >= 128:
- return 1_000 + (ams_id - 128) * 4 + (tray_id or 0)
- return ams_id * 4 + (tray_id or 0)
- @staticmethod
- def _prefer_lowest_sort_key(f: dict, overrides: dict[int, float] | None) -> tuple[int, float, int]:
- """Sort key for the "Prefer Lowest Remaining Filament" preference.
- Two-tier ordering: inventory-tracked spools always sort BEFORE
- non-tracked spools (the user has told us they care about these
- specifically), then ascending by remaining within each tier, then
- ascending by AMS slot position as the deterministic tie-breaker.
- Tiers are flagged by the first tuple element (0 = inventory-tracked,
- 1 = MQTT-only / unknown). Cross-tier value comparisons never run
- because the tier flag dominates — which is what lets us mix grams
- (inventory) and percent (MQTT) without a unit conversion.
- Within the MQTT tier ``remain = -1`` (unknown) is mapped to 101 so
- spools the printer DOES know something about sort ahead of those
- it knows nothing about — preserves pre-#1508 behaviour for the
- no-inventory-binding case.
- Slot tie-breaker via ``_slot_priority`` so regular AMS < AMS-HT <
- external on ties, matching the legacy emission-order stable sort.
- """
- gtid = f.get("global_tray_id")
- slot_order = PrintScheduler._slot_priority(f.get("ams_id"), f.get("tray_id"))
- if overrides and gtid in overrides:
- return (0, overrides[gtid], slot_order)
- remain = f.get("remain", -1)
- return (1, float(remain) if remain is not None and remain >= 0 else 101.0, slot_order)
- def _match_filaments_to_slots(
- self,
- required: list[dict],
- loaded: list[dict],
- prefer_lowest: bool = False,
- inventory_remain_overrides: dict[int, float] | None = None,
- ) -> list[int] | None:
- """Match required filaments to loaded filaments and build AMS mapping.
- Priority: unique tray_info_idx match > exact color match > similar color match > type-only match
- The tray_info_idx is a filament type identifier stored in the 3MF file when the user
- slices (e.g., "GFA00" for generic PLA, "P4d64437" for custom presets). If the same
- tray_info_idx appears in only ONE available tray, we use that tray. If multiple trays
- have the same tray_info_idx (e.g., two spools of generic PLA), we fall back to color
- matching among those trays.
- Args:
- required: List of required filaments with slot_id, type, color, tray_info_idx
- loaded: List of loaded filaments with type, color, tray_info_idx, global_tray_id
- Returns:
- AMS mapping array (position = slot_id - 1, value = global_tray_id or -1)
- """
- if not required:
- return None
- # Track used trays to avoid duplicate assignment
- used_tray_ids: set[int] = set()
- comparisons = []
- for req in required:
- req_type = (req.get("type") or "").upper()
- req_color = req.get("color", "")
- req_tray_info_idx = req.get("tray_info_idx", "")
- # Find best match: unique tray_info_idx > exact color > similar color > type-only
- idx_match = None
- exact_match = None
- similar_match = None
- type_only_match = None
- # Get available trays (not already used)
- available = [f for f in loaded if f["global_tray_id"] not in used_tray_ids]
- # Nozzle-aware filtering: restrict to trays on the correct nozzle.
- # Hard filter — cross-nozzle assignment causes print failures
- # ("position of left hotend is abnormal"), so never fall back.
- req_nozzle_id = req.get("nozzle_id")
- if req_nozzle_id is not None:
- available = [f for f in available if f.get("extruder_id") == req_nozzle_id]
- # Sort by remaining filament (ascending) so lowest-remain spool wins .find().
- # Inventory-tracked spools sort before MQTT-only ones (#1508); see
- # _prefer_lowest_sort_key for the full rationale.
- if prefer_lowest:
- available.sort(key=lambda f: self._prefer_lowest_sort_key(f, inventory_remain_overrides))
- # Check if tray_info_idx is unique among available trays
- if req_tray_info_idx:
- idx_matches = [f for f in available if f.get("tray_info_idx") == req_tray_info_idx]
- if len(idx_matches) == 1:
- # Unique tray_info_idx - use it as definitive match
- idx_match = idx_matches[0]
- logger.debug(
- f"Matched filament slot {req.get('slot_id')} by unique tray_info_idx={req_tray_info_idx} "
- f"-> tray {idx_match['global_tray_id']}"
- )
- elif len(idx_matches) > 1:
- # Multiple trays with same tray_info_idx - use color matching among them
- logger.debug(
- f"Non-unique tray_info_idx={req_tray_info_idx} found in {len(idx_matches)} trays, "
- f"using color matching among trays: {[f['global_tray_id'] for f in idx_matches]}"
- )
- if prefer_lowest:
- idx_matches.sort(key=lambda f: self._prefer_lowest_sort_key(f, inventory_remain_overrides))
- # Use color matching within this subset
- for f in idx_matches:
- f_color = f.get("color", "")
- if self._normalize_color_for_compare(f_color) == self._normalize_color_for_compare(req_color):
- if not exact_match:
- exact_match = f
- elif self._colors_are_similar(f_color, req_color):
- if not similar_match:
- similar_match = f
- elif not type_only_match:
- type_only_match = f
- # If no idx_match yet, do standard type/color matching on all available trays
- if not idx_match and not exact_match and not similar_match and not type_only_match:
- for f in available:
- f_type = (f.get("type") or "").upper()
- if _canonical_filament_type(f_type) != _canonical_filament_type(req_type):
- continue
- # Type matches - check color
- f_color = f.get("color", "")
- if self._normalize_color_for_compare(f_color) == self._normalize_color_for_compare(req_color):
- if not exact_match:
- exact_match = f
- elif self._colors_are_similar(f_color, req_color):
- if not similar_match:
- similar_match = f
- elif not type_only_match:
- type_only_match = f
- match = idx_match or exact_match or similar_match or type_only_match
- if match:
- used_tray_ids.add(match["global_tray_id"])
- comparisons.append({"slot_id": req.get("slot_id", 0), "global_tray_id": match["global_tray_id"]})
- else:
- comparisons.append({"slot_id": req.get("slot_id", 0), "global_tray_id": -1})
- # Build mapping array
- if not comparisons:
- return None
- max_slot_id = max(c["slot_id"] for c in comparisons)
- if max_slot_id <= 0:
- return None
- mapping = [-1] * max_slot_id
- for c in comparisons:
- slot_id = c["slot_id"]
- if slot_id and slot_id > 0:
- mapping[slot_id - 1] = c["global_tray_id"]
- return mapping
- def _mark_printer_dispatched(
- self,
- printer_id: int,
- pre_state: str | None,
- pre_subtask_id: str | None,
- ) -> None:
- """Record that a print command was just sent to ``printer_id``.
- Held until either the watchdog observes a state/subtask transition
- (success path) or the hard timeout expires. See ``_dispatch_holds``.
- """
- if not pre_state:
- # No pre_state means we can't detect a transition — fall back to a
- # pure time-based hold using empty string as a sentinel that won't
- # match any real printer state.
- pre_state = ""
- self._dispatch_holds[printer_id] = (time.monotonic(), pre_state, pre_subtask_id)
- def _release_dispatch_hold(self, printer_id: int) -> None:
- """Drop the dispatch hold for ``printer_id`` (called by the watchdog)."""
- self._dispatch_holds.pop(printer_id, None)
- def _printer_in_dispatch_hold(self, printer_id: int) -> bool:
- """True if ``printer_id`` is still inside its post-dispatch hold window.
- Returns False (and clears the hold) once any of these are true:
- - hard timeout (``_dispatch_max_hold``) has elapsed
- - the printer has transitioned out of pre_state and we're past the
- minimum cooldown
- - the printer's subtask_id has advanced past pre_subtask_id and we're
- past the minimum cooldown
- Otherwise the printer is held — caller should treat it as busy.
- """
- entry = self._dispatch_holds.get(printer_id)
- if not entry:
- return False
- started_at, pre_state, pre_subtask_id = entry
- elapsed = time.monotonic() - started_at
- if elapsed >= self._dispatch_max_hold:
- self._dispatch_holds.pop(printer_id, None)
- return False
- # Without a pre_state we can't detect a transition — fall back to the
- # min cooldown alone, then drop the hold.
- if not pre_state:
- if elapsed >= self._dispatch_min_cooldown:
- self._dispatch_holds.pop(printer_id, None)
- return False
- return True
- status = printer_manager.get_status(printer_id)
- current_state = getattr(status, "state", None) if status else None
- current_subtask_id = getattr(status, "subtask_id", None) if status else None
- transitioned = (current_state is not None and current_state != pre_state) or (
- pre_subtask_id is not None and current_subtask_id is not None and current_subtask_id != pre_subtask_id
- )
- if transitioned and elapsed >= self._dispatch_min_cooldown:
- self._dispatch_holds.pop(printer_id, None)
- return False
- return True
- def _is_printer_idle(self, printer_id: int, require_plate_clear: bool = True) -> bool:
- """Check if a printer is connected and idle."""
- if not printer_manager.is_connected(printer_id):
- logger.debug("Printer %d: not connected", printer_id)
- return False
- state = printer_manager.get_status(printer_id)
- if not state:
- logger.debug("Printer %d: no status available", printer_id)
- return False
- # Plate-clear gate: if the printer finished/failed a previous print and the user
- # hasn't acknowledged the plate was cleared, the queue must not dispatch the next
- # job — even if the printer currently reports IDLE. After Auto Off cycles the
- # printer, it boots back into IDLE with no memory of the previous finish; without
- # the persisted awaiting flag we'd bypass the confirmation prompt (#961).
- if require_plate_clear and printer_manager.is_awaiting_plate_clear(printer_id):
- logger.debug(
- "Printer %d: not idle — awaiting plate-clear acknowledgment (state=%s)",
- printer_id,
- state.state,
- )
- return False
- idle = state.state in ("IDLE", "FINISH", "FAILED")
- if not idle:
- logger.debug("Printer %d: not idle — state=%s", printer_id, state.state)
- return idle
- async def _get_setting(self, db: AsyncSession, key: str) -> str | None:
- """Read a setting value from the database."""
- result = await db.execute(select(Settings).where(Settings.key == key))
- setting = result.scalar_one_or_none()
- return setting.value if setting else None
- async def _get_bool_setting(self, db: AsyncSession, key: str, default: bool = False) -> bool:
- """Read a boolean setting from the database."""
- result = await db.execute(select(Settings).where(Settings.key == key))
- setting = result.scalar_one_or_none()
- if setting:
- return setting.value.lower() == "true"
- return default
- async def _get_drying_presets(self, db: AsyncSession) -> dict[str, dict[str, int]]:
- """Get drying presets (user-configured or built-in defaults)."""
- result = await db.execute(select(Settings).where(Settings.key == "drying_presets"))
- setting = result.scalar_one_or_none()
- if setting and setting.value:
- try:
- presets = json.loads(setting.value)
- if isinstance(presets, dict) and presets:
- return presets
- except json.JSONDecodeError:
- pass
- return self.DEFAULT_DRYING_PRESETS
- def _get_conservative_drying_params(
- self, trays: list[dict], module_type: str, presets: dict[str, dict[str, int]]
- ) -> tuple[int, int, str] | None:
- """Get the most conservative drying params for mixed filament types in an AMS unit.
- Returns (temp, duration_hours, filament_type) or None if no drying-eligible filaments.
- """
- temp_key = module_type if module_type in ("n3f", "n3s") else "n3f"
- hours_key = f"{temp_key}_hours"
- min_temp = None
- max_hours = None
- filament_type = ""
- for tray in trays:
- tray_type = tray.get("tray_type", "")
- if not tray_type:
- continue
- # Normalize filament type for preset lookup (e.g., "PLA Basic" -> "PLA")
- base_type = tray_type.split()[0].upper()
- preset = presets.get(base_type)
- if not preset:
- continue
- temp = preset.get(temp_key, 55)
- hours = preset.get(hours_key, 12)
- # Conservative: lowest temp, longest duration
- if min_temp is None or temp < min_temp:
- min_temp = temp
- if max_hours is None or hours > max_hours:
- max_hours = hours
- if not filament_type:
- filament_type = base_type
- if min_temp is None:
- return None
- return (min_temp, max_hours or 12, filament_type)
- async def _check_auto_drying(
- self,
- db: AsyncSession,
- queue_items: list[PrintQueueItem],
- busy_printers: set[int],
- *,
- require_plate_clear: bool = True,
- ):
- """Start drying on idle printers based on humidity.
- Two modes (can both be enabled):
- - queue_drying_enabled: Dry between scheduled queue prints
- - ambient_drying_enabled: Dry any idle printer when humidity is high, regardless of queue
- """
- queue_drying_enabled = await self._get_bool_setting(db, "queue_drying_enabled")
- ambient_drying_enabled = await self._get_bool_setting(db, "ambient_drying_enabled")
- if not queue_drying_enabled and not ambient_drying_enabled:
- # Stop active drying on all printers if both features disabled
- if self._drying_in_progress:
- for pid in list(self._drying_in_progress):
- logger.info("Auto-drying: printer %d — stopping, auto-drying disabled", pid)
- await self._stop_drying(pid)
- return
- # Update drying state from printer status (handles backend restart)
- self._sync_drying_state()
- # Find printers with scheduled items (for queue drying mode)
- printers_with_scheduled: set[int] = set()
- printers_with_items: set[int] = set()
- for item in queue_items:
- if item.printer_id:
- printers_with_items.add(item.printer_id)
- if item.scheduled_time and not item.manual_start:
- printers_with_scheduled.add(item.printer_id)
- # If only queue mode is on and no printers have scheduled items, stop drying
- if not ambient_drying_enabled and not printers_with_scheduled:
- for pid in list(self._drying_in_progress):
- logger.info("Auto-drying: printer %d — stopping, no scheduled prints in queue", pid)
- await self._stop_drying(pid)
- return
- # Get humidity threshold
- result = await db.execute(select(Settings).where(Settings.key == "ams_humidity_fair"))
- setting = result.scalar_one_or_none()
- humidity_threshold = int(setting.value) if setting else 60
- # Get drying presets
- presets = await self._get_drying_presets(db)
- # Determine if drying should be skipped for printers with pending items
- block_for_drying = await self._get_bool_setting(db, "queue_drying_block")
- # Get all active printers
- all_printers = await db.execute(select(Printer).where(Printer.is_active.is_(True)))
- for printer in all_printers.scalars():
- pid = printer.id
- if pid in busy_printers:
- logger.debug("Auto-drying: printer %d skipped — busy", pid)
- continue
- # In queue-only mode, only dry printers that have scheduled prints
- if not ambient_drying_enabled and pid not in printers_with_scheduled:
- if self._drying_in_progress.get(pid):
- logger.info("Auto-drying: printer %d — stopping, no scheduled prints for this printer", pid)
- await self._stop_drying(pid)
- logger.debug("Auto-drying: printer %d skipped — no scheduled prints", pid)
- continue
- # When block mode is on, don't START new drying on printers with pending items.
- # But allow already-drying printers through so humidity auto-stop logic still runs.
- if block_for_drying and pid in printers_with_items and not self._drying_in_progress.get(pid):
- logger.debug("Auto-drying: printer %d skipped — has pending items (block mode)", pid)
- continue
- if not printer_manager.is_connected(pid):
- logger.debug("Auto-drying: printer %d skipped — not connected", pid)
- continue
- if not self._is_printer_idle(pid, require_plate_clear):
- logger.debug("Auto-drying: printer %d skipped — not idle", pid)
- continue
- # Check if this printer supports drying
- state = printer_manager.get_status(pid)
- if not state:
- logger.debug("Auto-drying: printer %d skipped — no state", pid)
- continue
- model = printer_manager.get_model(pid)
- firmware = state.firmware_version
- if not supports_drying(model, firmware):
- logger.debug("Auto-drying: printer %d skipped — model %s does not support drying", pid, model)
- continue
- # Check each AMS unit from raw_data
- ams_list = state.raw_data.get("ams", [])
- logger.debug("Auto-drying: printer %d — checking %d AMS units", pid, len(ams_list))
- for ams_data in ams_list:
- module_type = str(ams_data.get("module_type") or "")
- ams_id = int(ams_data.get("id", 0))
- # Only n3f/n3s support drying
- if module_type not in ("n3f", "n3s"):
- logger.debug("Auto-drying: printer %d AMS %d skipped — module_type=%s", pid, ams_id, module_type)
- continue
- dry_time = int(ams_data.get("dry_time") or 0)
- # Read humidity — prefer humidity_raw (actual %) over humidity (index 1-5)
- humidity = None
- h_raw = ams_data.get("humidity_raw")
- if h_raw is not None:
- try:
- humidity = int(h_raw)
- except (ValueError, TypeError):
- pass
- if humidity is None:
- h_idx = ams_data.get("humidity")
- if h_idx is not None:
- try:
- humidity = int(h_idx)
- except (ValueError, TypeError):
- pass
- # Already drying — check if humidity dropped below threshold (with minimum drying time)
- if dry_time > 0:
- if pid not in self._drying_in_progress:
- # Drying we didn't start (manual or from before restart) — track but don't stop
- self._drying_in_progress[pid] = time.monotonic()
- started_at = self._drying_in_progress[pid]
- elapsed = time.monotonic() - started_at
- if humidity is not None and humidity <= humidity_threshold and elapsed >= self._min_drying_seconds:
- logger.info(
- "Auto-drying: printer %d AMS %d — humidity %d%% <= threshold %d%% after %dm, stopping drying",
- pid,
- ams_id,
- humidity,
- humidity_threshold,
- int(elapsed / 60),
- )
- printer_manager.send_drying_command(pid, ams_id, temp=0, duration=0, mode=0)
- else:
- logger.debug(
- "Auto-drying: printer %d AMS %d — drying (%dm left, humidity %s%%, elapsed %dm/%dm min)",
- pid,
- ams_id,
- dry_time,
- humidity,
- int(elapsed / 60),
- self._min_drying_seconds // 60,
- )
- continue
- # Humidity below threshold — no need to start drying
- if humidity is None or humidity <= humidity_threshold:
- logger.debug(
- "Auto-drying: printer %d AMS %d skipped — humidity %s <= threshold %d",
- pid,
- ams_id,
- humidity,
- humidity_threshold,
- )
- continue
- # Check cannot-dry reasons (power constraints etc.)
- sf_reasons = ams_data.get("dry_sf_reason", [])
- if sf_reasons:
- logger.debug(
- "Auto-drying: printer %d AMS %d skipped — cannot dry reasons: %s",
- pid,
- ams_id,
- sf_reasons,
- )
- continue
- # Get conservative drying params for mixed filaments
- trays = ams_data.get("tray", [])
- params = self._get_conservative_drying_params(trays, module_type, presets)
- if not params:
- logger.debug(
- "Auto-drying: printer %d AMS %d skipped — no drying-eligible filaments in trays", pid, ams_id
- )
- continue
- temp, duration_hours, filament_type = params
- # Start drying
- logger.info(
- "Auto-drying: printer %d AMS %d — humidity %d%% > threshold %d%%, "
- "starting %s drying at %d°C for %dh",
- pid,
- ams_id,
- humidity,
- humidity_threshold,
- filament_type,
- temp,
- duration_hours,
- )
- success = printer_manager.send_drying_command(
- pid, ams_id, temp, duration_hours, mode=1, filament=filament_type
- )
- if success:
- self._drying_in_progress[pid] = time.monotonic()
- def _sync_drying_state(self):
- """Sync in-memory drying state with actual printer status.
- Handles backend restart — if a printer is drying but we don't know about it,
- update our state. If we think it's drying but it's not, clear it.
- """
- to_remove = []
- for pid in self._drying_in_progress:
- state = printer_manager.get_status(pid)
- if not state:
- to_remove.append(pid)
- continue
- # Check if any AMS unit is still drying
- ams_list = state.raw_data.get("ams", [])
- any_drying = any(int(a.get("dry_time") or 0) > 0 for a in ams_list)
- if not any_drying:
- to_remove.append(pid)
- for pid in to_remove:
- self._drying_in_progress.pop(pid, None)
- async def _stop_drying(self, printer_id: int):
- """Stop all active drying on a printer (print takes priority)."""
- state = printer_manager.get_status(printer_id)
- if not state:
- self._drying_in_progress.pop(printer_id, None)
- return
- ams_list = state.raw_data.get("ams", [])
- for ams_data in ams_list:
- dry_time = int(ams_data.get("dry_time") or 0)
- if dry_time > 0:
- ams_id = int(ams_data.get("id", 0))
- logger.info(
- "Auto-drying: stopping drying on printer %d AMS %d — print takes priority",
- printer_id,
- ams_id,
- )
- printer_manager.send_drying_command(printer_id, ams_id, 0, 0, mode=0)
- self._drying_in_progress.pop(printer_id, None)
- async def _get_smart_plugs(self, db: AsyncSession, printer_id: int) -> list[SmartPlug]:
- """Get all smart plugs associated with a printer."""
- result = await db.execute(select(SmartPlug).where(SmartPlug.printer_id == printer_id))
- return list(result.scalars().all())
- async def _power_on_and_wait(self, plug: SmartPlug, printer_id: int, db: AsyncSession) -> bool:
- """Turn on smart plug and wait for printer to connect.
- Returns True if printer connected successfully within timeout.
- """
- # Get the appropriate service for the plug type (Tasmota or Home Assistant)
- service = await smart_plug_manager.get_service_for_plug(plug, db)
- # Check current plug state
- status = await service.get_status(plug)
- if not status.get("reachable"):
- logger.warning("Smart plug '%s' is not reachable", plug.name)
- return False
- # Turn on if not already on
- if status.get("state") != "ON":
- success = await service.turn_on(plug)
- if not success:
- logger.warning("Failed to turn on smart plug '%s'", plug.name)
- return False
- logger.info("Powered on smart plug '%s' for printer %s", plug.name, printer_id)
- # Get printer from database for connection
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- logger.error("Printer %s not found in database", printer_id)
- return False
- # Wait for printer to boot (give it some time before trying to connect)
- logger.info("Waiting 30s for printer %s to boot...", printer_id)
- await asyncio.sleep(30)
- # Try to connect to the printer periodically
- elapsed = 30 # Already waited 30s
- while elapsed < self._power_on_wait_time:
- # Try to connect
- logger.info("Attempting to connect to printer %s...", printer_id)
- try:
- connected = await printer_manager.connect_printer(printer)
- if connected:
- logger.info("Printer %s connected after %ss", printer_id, elapsed)
- # Give it a moment to stabilize and get status
- await asyncio.sleep(5)
- return True
- except Exception as e:
- logger.debug("Connection attempt failed: %s", e)
- await asyncio.sleep(self._power_on_check_interval)
- elapsed += self._power_on_check_interval
- logger.debug("Waiting for printer %s to connect... (%ss)", printer_id, elapsed)
- logger.warning("Printer %s did not connect within %ss after power on", printer_id, self._power_on_wait_time)
- return False
- async def _check_previous_success(self, db: AsyncSession, item: PrintQueueItem) -> bool:
- """Check if the previous print on this printer succeeded."""
- # Find the most recent completed queue item for this printer
- result = await db.execute(
- select(PrintQueueItem)
- .where(PrintQueueItem.printer_id == item.printer_id)
- .where(PrintQueueItem.id != item.id)
- .where(PrintQueueItem.status.in_(["completed", "failed", "skipped", "aborted"]))
- .order_by(PrintQueueItem.completed_at.desc())
- .limit(1)
- )
- prev_item = result.scalar_one_or_none()
- # If no previous item, assume success (first in queue)
- if not prev_item:
- return True
- return prev_item.status == "completed"
- async def _power_off_if_needed(self, db: AsyncSession, item: PrintQueueItem):
- """Power off printer if auto_off_after is enabled (waits for cooldown)."""
- if not item.auto_off_after:
- return
- plugs = await self._get_smart_plugs(db, item.printer_id)
- plug_ids = [p.id for p in plugs if p.enabled]
- if plug_ids:
- logger.info("Auto-off: Waiting for printer %s to cool down before power off...", item.printer_id)
- # Wait for cooldown (up to 10 minutes)
- await printer_manager.wait_for_cooldown(item.printer_id, target_temp=50.0, timeout=600)
- # Re-fetch plugs in a fresh session after the long cooldown wait
- async with async_session() as new_db:
- for plug_id in plug_ids:
- try:
- result = await new_db.execute(select(SmartPlug).where(SmartPlug.id == plug_id))
- plug = result.scalar_one_or_none()
- if plug and plug.enabled:
- logger.info("Auto-off: Powering off plug '%s' for printer %s", plug.name, item.printer_id)
- service = await smart_plug_manager.get_service_for_plug(plug, new_db)
- await service.turn_off(plug)
- except Exception as e:
- logger.warning(
- "Auto-off: Failed to power off plug %s for printer %s: %s", plug_id, item.printer_id, e
- )
- async def _get_job_name(self, db: AsyncSession, item: PrintQueueItem) -> str:
- """Get a human-readable name for a queue item."""
- if item.archive_id:
- result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
- archive = result.scalar_one_or_none()
- if archive:
- return archive.filename.replace(".gcode.3mf", "").replace(".3mf", "")
- if item.library_file_id:
- result = await db.execute(LibraryFile.active().where(LibraryFile.id == item.library_file_id))
- library_file = result.scalar_one_or_none()
- if library_file:
- return library_file.filename.replace(".gcode.3mf", "").replace(".3mf", "")
- return f"Job #{item.id}"
- async def _get_printer(self, db: AsyncSession, printer_id: int) -> Printer | None:
- """Get printer by ID."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- return result.scalar_one_or_none()
- async def _block_on_filament_deficit(
- self,
- db: AsyncSession,
- item: PrintQueueItem,
- ) -> bool:
- """Promote the item to manual_start when the assigned spool is short (#1496).
- Returns True when this dispatch attempt was blocked, False when the
- item is clear to start. A previously-flagged item whose spool has
- since been swapped to one with enough material clears the flag here
- so the next scheduler tick dispatches it.
- """
- try:
- deficit = await compute_deficit_for_queue_item(db, item)
- except Exception as e:
- # Never let a flaky deficit check wedge the queue — log and let
- # dispatch proceed. The PrintModal-side check still runs on the
- # manual paths.
- logger.warning("Filament deficit check failed for item %s: %s", item.id, e)
- return False
- if deficit:
- item.filament_short = True
- item.manual_start = True
- await db.commit()
- job_name = await self._get_job_name(db, item)
- printer = await self._get_printer(db, item.printer_id) if item.printer_id else None
- logger.info(
- "Queue item %s blocked on filament deficit (%d slot(s)) — promoted to manual_start",
- item.id,
- len(deficit),
- )
- try:
- await notification_service.on_queue_job_waiting(
- job_name=job_name,
- target_model=(printer.model if printer else "") or "",
- waiting_reason="filament_short",
- db=db,
- )
- except Exception as e:
- logger.debug("filament_short notification failed for item %s: %s", item.id, e)
- return True
- # No deficit — clear any stale flag from a previous tick.
- if item.filament_short:
- item.filament_short = False
- await db.commit()
- return False
- async def _start_print(self, db: AsyncSession, item: PrintQueueItem):
- """Upload file and start print for a queue item.
- Supports two sources:
- - archive_id: Print from an existing archive
- - library_file_id: Print from a library file (file manager)
- """
- logger.info("Starting queue item %s", item.id)
- # Get printer first (needed for both paths)
- result = await db.execute(select(Printer).where(Printer.id == item.printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- item.status = "failed"
- item.error_message = "Printer not found"
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.error("Queue item %s: Printer %s not found", item.id, item.printer_id)
- await self._power_off_if_needed(db, item)
- return
- # Check printer is connected
- if not printer_manager.is_connected(item.printer_id):
- item.status = "failed"
- item.error_message = "Printer not connected"
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.error("Queue item %s: Printer %s not connected", item.id, item.printer_id)
- await self._power_off_if_needed(db, item)
- return
- # Determine source: archive or library file
- archive = None
- library_file = None
- file_path = None
- filename = None
- if item.archive_id:
- # Print from archive
- result = await db.execute(select(PrintArchive).where(PrintArchive.id == item.archive_id))
- archive = result.scalar_one_or_none()
- if not archive:
- item.status = "failed"
- item.error_message = "Archive not found"
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.error("Queue item %s: Archive %s not found", item.id, item.archive_id)
- await self._power_off_if_needed(db, item)
- return
- file_path = settings.base_dir / archive.file_path
- filename = archive.filename
- elif item.library_file_id:
- # Print from library file (file manager)
- result = await db.execute(LibraryFile.active().where(LibraryFile.id == item.library_file_id))
- library_file = result.scalar_one_or_none()
- if not library_file:
- item.status = "failed"
- item.error_message = "Library file not found"
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.error("Queue item %s: Library file %s not found", item.id, item.library_file_id)
- await self._power_off_if_needed(db, item)
- return
- # Library files store absolute paths
- lib_path = Path(library_file.file_path)
- file_path = lib_path if lib_path.is_absolute() else settings.base_dir / library_file.file_path
- filename = library_file.filename
- # Create archive from library file so usage tracking has access to the 3MF
- try:
- from backend.app.services.archive import ArchiveService
- archive_service = ArchiveService(db)
- archive = await archive_service.archive_print(
- printer_id=item.printer_id,
- source_file=file_path,
- original_filename=filename,
- created_by_id=item.created_by_id,
- project_id=item.project_id,
- )
- if archive:
- item.archive_id = archive.id
- await db.flush()
- logger.info(
- "Queue item %s: Created archive %s from library file %s",
- item.id,
- archive.id,
- item.library_file_id,
- )
- except Exception as e:
- logger.warning("Queue item %s: Failed to create archive from library file: %s", item.id, e)
- else:
- # Neither archive nor library file specified
- item.status = "failed"
- item.error_message = "No source file specified"
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.error("Queue item %s: No archive_id or library_file_id specified", item.id)
- await self._power_off_if_needed(db, item)
- return
- # Check file exists on disk
- if not file_path.exists():
- item.status = "failed"
- item.error_message = "Source file not found on disk"
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.error("Queue item %s: File not found: %s", item.id, file_path)
- await self._power_off_if_needed(db, item)
- return
- # G-code injection for auto-print systems (#422)
- injected_path = None
- if item.gcode_injection:
- try:
- snippets_raw = await self._get_setting(db, "gcode_snippets")
- if snippets_raw:
- snippets = json.loads(snippets_raw)
- model_snippets = snippets.get(printer.model, {})
- start_gc = (model_snippets.get("start_gcode") or "").strip()
- end_gc = (model_snippets.get("end_gcode") or "").strip()
- if start_gc or end_gc:
- from backend.app.utils.threemf_tools import inject_gcode_into_3mf
- injected_path = inject_gcode_into_3mf(
- file_path, item.plate_id or 1, start_gc or None, end_gc or None
- )
- if injected_path:
- file_path = injected_path
- logger.info("Queue item %s: G-code injected for model %s", item.id, printer.model)
- else:
- logger.warning(
- "Queue item %s: G-code injection returned no result, using original", item.id
- )
- except Exception as e:
- logger.warning("Queue item %s: G-code injection failed, using original: %s", item.id, e)
- # Upload file to printer via FTP
- # Use a clean filename to avoid issues with double extensions like .gcode.3mf
- base_name = filename
- if base_name.endswith(".gcode.3mf"):
- base_name = base_name[:-10] # Remove .gcode.3mf
- elif base_name.endswith(".3mf"):
- base_name = base_name[:-4] # Remove .3mf
- remote_filename = f"{base_name}.3mf"
- # Sanitize: firmware parses ftp://{filename} as a URL, spaces break it
- remote_filename = remote_filename.replace(" ", "_")
- # Upload to root directory (not /cache/) - the start_print command references
- # files by name only (ftp://{filename}), so they must be in the root
- remote_path = f"/{remote_filename}"
- # Get FTP retry settings
- ftp_retry_enabled, ftp_retry_count, ftp_retry_delay, ftp_timeout = await get_ftp_retry_settings()
- logger.info(
- f"Queue item {item.id}: FTP upload starting - printer={printer.name} ({printer.model}), "
- f"ip={printer.ip_address}, file={remote_filename}, local_path={file_path}, "
- f"retry_enabled={ftp_retry_enabled}, retry_count={ftp_retry_count}, timeout={ftp_timeout}"
- )
- # Delete existing file if present (avoids 553 error on overwrite)
- try:
- logger.debug("Queue item %s: Deleting existing file %s if present...", item.id, remote_path)
- delete_result = await delete_file_async(
- printer.ip_address,
- printer.access_code,
- remote_path,
- socket_timeout=ftp_timeout,
- printer_model=printer.model,
- )
- logger.debug("Queue item %s: Delete result: %s", item.id, delete_result)
- except Exception as e:
- logger.debug("Queue item %s: Delete failed (may not exist): %s", item.id, e)
- try:
- if ftp_retry_enabled:
- uploaded = await with_ftp_retry(
- upload_file_async,
- printer.ip_address,
- printer.access_code,
- file_path,
- remote_path,
- socket_timeout=ftp_timeout,
- printer_model=printer.model,
- max_retries=ftp_retry_count,
- retry_delay=ftp_retry_delay,
- operation_name=f"Upload print to {printer.name}",
- )
- else:
- uploaded = await upload_file_async(
- printer.ip_address,
- printer.access_code,
- file_path,
- remote_path,
- socket_timeout=ftp_timeout,
- printer_model=printer.model,
- )
- except Exception as e:
- uploaded = False
- logger.error("Queue item %s: FTP error: %s (type: %s)", item.id, e, type(e).__name__)
- # Clean up injected temp file after upload attempt
- if injected_path and injected_path.exists():
- injected_path.unlink(missing_ok=True)
- if not uploaded:
- error_msg = (
- "Failed to upload file to printer. Check if SD card is inserted and properly formatted (FAT32/exFAT). "
- "See server logs for detailed diagnostics."
- )
- item.status = "failed"
- item.error_message = error_msg
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.error(
- f"Queue item {item.id}: FTP upload failed - printer={printer.name}, model={printer.model}, "
- f"ip={printer.ip_address}. Check logs above for storage diagnostics and specific error codes."
- )
- # Send failure notification
- await notification_service.on_queue_job_failed(
- job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
- printer_id=printer.id,
- printer_name=printer.name,
- reason="Failed to upload file to printer",
- db=db,
- )
- await self._power_off_if_needed(db, item)
- return
- # Parse AMS mapping if stored
- ams_mapping = None
- if item.ams_mapping:
- try:
- ams_mapping = json.loads(item.ams_mapping)
- except json.JSONDecodeError:
- logger.warning("Queue item %s: Invalid AMS mapping JSON, ignoring", item.id)
- # Register as expected print so we don't create a duplicate archive
- # Only applicable for archive-based prints
- if archive:
- from backend.app.main import register_expected_print
- register_expected_print(
- item.printer_id,
- remote_filename,
- archive.id,
- ams_mapping=ams_mapping,
- created_by_id=item.created_by_id,
- )
- # IMPORTANT: Set status to "printing" BEFORE sending the print command.
- # This prevents phantom reprints if the backend crashes/restarts after the
- # print command is sent but before the status update is committed.
- # If we crash after this commit but before start_print(), the item will be
- # in "printing" status without actually printing - but that's safer than
- # accidentally reprinting the same file hours later.
- item.status = "printing"
- item.started_at = datetime.now(timezone.utc)
- await db.commit()
- # Clear the awaiting-plate-clear flag now that we're starting a new print
- printer_manager.set_awaiting_plate_clear(item.printer_id, False)
- logger.info("Queue item %s: Status set to 'printing', sending print command...", item.id)
- # Capture state before dispatch so the watchdog can detect whether the
- # printer actually transitioned (#967). Also capture subtask_id so the
- # watchdog can recognise "command landed but state hasn't flipped yet"
- # on slow H2D transitions (#1078).
- pre_status = printer_manager.get_status(item.printer_id)
- pre_state = getattr(pre_status, "state", None) if pre_status else None
- pre_subtask_id = getattr(pre_status, "subtask_id", None) if pre_status else None
- pre_gcode_file = getattr(pre_status, "gcode_file", None) if pre_status else None
- # Start the print with AMS mapping, plate_id and print options
- started = printer_manager.start_print(
- item.printer_id,
- remote_filename,
- plate_id=item.plate_id or 1,
- ams_mapping=ams_mapping,
- bed_levelling=item.bed_levelling,
- flow_cali=item.flow_cali,
- vibration_cali=item.vibration_cali,
- layer_inspect=item.layer_inspect,
- timelapse=item.timelapse,
- use_ams=item.use_ams,
- )
- if started:
- logger.info("Queue item %s: Print started successfully - %s", item.id, filename)
- # Register the local 3MF in the cover-cache so /cover skips FTP
- # (#1166 follow-up). file_path was resolved earlier from either the
- # archive or the library file row.
- if file_path is not None:
- cache_3mf_download(item.printer_id, remote_filename, file_path)
- # Hold the printer against further dispatches until the watchdog
- # confirms the printer transitioned (or until the hard timeout).
- # Prevents multi-plate batches from triple-dispatching onto the
- # same H2D Pro while it digests the first project_file (#1157).
- self._mark_printer_dispatched(item.printer_id, pre_state, pre_subtask_id)
- # Watchdog: if the printer never transitions out of pre_state AND
- # never advances subtask_id, the MQTT publish was accepted locally but
- # didn't reach the printer (half-broken session — same shape as
- # #887/#936). Revert the queue item so the next dispatch can pick it
- # up instead of leaving it stuck in "printing" (#967). subtask_id
- # check avoids false reverts on slow H2D FINISH→PREPARE transitions
- # that would otherwise cause the item to re-dispatch as a reprint
- # of the just-finished job (#1078).
- if pre_state:
- asyncio.create_task(
- self._watchdog_print_start(
- item.id,
- item.printer_id,
- pre_state,
- pre_subtask_id,
- pre_gcode_file,
- )
- )
- # Get estimated time for notification
- estimated_time = None
- if archive and archive.print_time_seconds:
- estimated_time = archive.print_time_seconds
- elif library_file and library_file.print_time_seconds:
- estimated_time = library_file.print_time_seconds
- # Send job started notification
- await notification_service.on_queue_job_started(
- job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
- printer_id=printer.id,
- printer_name=printer.name,
- db=db,
- estimated_time=estimated_time,
- )
- # MQTT relay - publish queue job started
- try:
- from backend.app.services.mqtt_relay import mqtt_relay
- await mqtt_relay.on_queue_job_started(
- job_id=item.id,
- filename=filename,
- printer_id=printer.id,
- printer_name=printer.name,
- printer_serial=printer.serial_number,
- )
- except Exception:
- pass # Don't fail if MQTT fails
- else:
- # Clean up uploaded file from SD card to prevent phantom prints
- try:
- await delete_file_async(
- printer.ip_address,
- printer.access_code,
- remote_path,
- printer_model=printer.model,
- )
- except Exception:
- pass # Best-effort — don't fail the error handler
- # Print command failed - revert status
- item.status = "failed"
- item.error_message = "Failed to send print command to printer"
- item.completed_at = datetime.now(timezone.utc)
- await db.commit()
- logger.error(
- f"Queue item {item.id}: Failed to start print on {printer.name} ({printer.model}) - "
- f"printer_manager.start_print() returned False. "
- f"This may indicate: printer not connected, MQTT error, unsupported model configuration, or firmware issue. "
- f"Check printer status and backend logs for details."
- )
- # Send failure notification
- await notification_service.on_queue_job_failed(
- job_name=filename.replace(".gcode.3mf", "").replace(".3mf", ""),
- printer_id=printer.id,
- printer_name=printer.name,
- reason="Failed to send print command to printer - check printer connection and status",
- db=db,
- )
- await self._power_off_if_needed(db, item)
- @staticmethod
- async def _watchdog_print_start(
- queue_item_id: int,
- printer_id: int,
- pre_state: str,
- pre_subtask_id: str | None = None,
- pre_gcode_file: str | None = None,
- timeout: float = 90.0,
- poll_interval: float = 3.0,
- ) -> None:
- """Revert a queue item if the printer never acknowledges the start command.
- Bambuddy optimistically marks the queue item as "printing" right after the
- MQTT project_file publish succeeds locally. If the printer drops/ignores the
- command (half-broken MQTT session — #887/#936), the state never transitions
- and the item would otherwise stay stuck in "printing" forever (#967).
- Exit paths (printer picked up the job — no revert):
- - gcode_state changed from pre_state, OR
- - subtask_id advanced past pre_subtask_id — the printer echoes our
- per-dispatch identity back on push_status, so a subtask_id change is
- a definitive "command landed" signal even while state is still FINISH.
- H2D can sit at FINISH for ~50 s after accepting project_file before
- transitioning to PREPARE, which used to trip the state-only watchdog
- and caused the scheduler to revert + re-dispatch the item; the next
- successful dispatch then looked like a reprint of the just-finished
- job (#1078).
- Timeout raised from 45 s → 90 s as belt-and-braces for slow transitions
- that also don't emit an early subtask_id tick.
- """
- deadline = time.monotonic() + timeout
- last_status = None
- while time.monotonic() < deadline:
- await asyncio.sleep(poll_interval)
- status = printer_manager.get_status(printer_id)
- if not status:
- # Printer disconnected — don't mess with the DB. Drop the
- # in-memory dispatch hold too so a fresh dispatch can retry
- # once the printer comes back; the hard timeout would
- # otherwise hold the printer unnecessarily.
- scheduler._release_dispatch_hold(printer_id)
- return
- last_status = status
- if status.state in _ACTIVE_PRINT_STATES:
- # Printer is actively processing the job — release the
- # post-dispatch hold so the next pending item for this printer
- # can be evaluated normally. We do NOT accept arbitrary state
- # transitions: a printer going FINISH -> IDLE (user dismissed
- # the post-print prompt without accepting our project_file)
- # would otherwise look like "command landed" and leave the
- # queue item stuck in 'printing' forever (#1370).
- scheduler._release_dispatch_hold(printer_id)
- return
- if pre_subtask_id is not None and status.subtask_id is not None and status.subtask_id != pre_subtask_id:
- # Printer picked up the job (subtask_id advanced). H2D can
- # sit at FINISH for ~50 s after accepting project_file
- # before transitioning to PREPARE, but the subtask_id flips
- # to our submission_id almost immediately (#1078).
- scheduler._release_dispatch_hold(printer_id)
- return
- # No transition. Revert the item so the scheduler can retry.
- # Drop the in-memory hold so the retry isn't blocked by it.
- scheduler._release_dispatch_hold(printer_id)
- # Three outcomes from the revert attempt, each routed differently:
- # "reverted": row flipped from printing -> pending, run recovery
- # "already_moved_on": item.status != 'printing' (completed/cancelled by
- # on_print_complete or user). Skip recovery entirely
- # — the print clearly landed somewhere even if the
- # watchdog didn't see the active-state transition.
- # "revert_failed": SQLite contention exhausted retries. Still run
- # recovery so the MQTT session gets a fresh client_id
- # on the half-broken-session path.
- async def _do_revert(db):
- item = await db.get(PrintQueueItem, queue_item_id)
- if not item or item.status != "printing":
- return "already_moved_on"
- item.status = "pending"
- item.started_at = None
- await db.commit()
- return "reverted"
- try:
- revert_outcome = await run_with_retry(_do_revert, label=f"watchdog revert item={queue_item_id}")
- except Exception as e:
- logger.warning(
- "Queue item %s: failed to revert to 'pending' (printer %d): %s — "
- "scheduler may keep treating this item as in-flight",
- queue_item_id,
- printer_id,
- e,
- )
- revert_outcome = "revert_failed"
- if revert_outcome == "already_moved_on":
- # Preserves the pre-#1370 early-return: if on_print_complete (or any
- # other path) already moved the item past 'printing', don't run the
- # MQTT session-recovery logic below — a forced reconnect on a healthy
- # session breaks ongoing prints on the same printer.
- return
- if revert_outcome == "reverted":
- logger.warning(
- "Queue item %s: printer %d did not respond to print command within "
- "%.0fs (state still %s, subtask_id still %s) — reverted to 'pending' "
- "for retry (#967)",
- queue_item_id,
- printer_id,
- timeout,
- pre_state,
- pre_subtask_id,
- )
- # Same #1150 / #887/#936 discriminator as background_dispatch: if the
- # printer's gcode_file changed since pre-dispatch, the project_file
- # command landed and the printer is parsing — a forced reconnect
- # mid-parse triggers 0500_4003. If gcode_file is unchanged, the
- # publish was silently swallowed (#887/#936) and the original
- # force_reconnect recovery is what we want.
- client = printer_manager.get_client(printer_id)
- current_gcode_file = getattr(last_status, "gcode_file", None) if last_status else None
- publish_landed = current_gcode_file is not None and current_gcode_file != pre_gcode_file
- if publish_landed:
- logger.warning(
- "Queue item %s: gcode_file changed to %r (was %r) — printer "
- "received the command and is parsing slowly. Skipping forced "
- "MQTT reconnect to avoid 0500_4003 mid-parse (#1150).",
- queue_item_id,
- current_gcode_file,
- pre_gcode_file,
- )
- elif client and hasattr(client, "force_reconnect_stale_session"):
- client.force_reconnect_stale_session(
- f"queue print command unacknowledged after {timeout:.0f}s "
- f"(state still {pre_state}, gcode_file {current_gcode_file!r})"
- )
- # Global scheduler instance
- scheduler = PrintScheduler()
|