| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419 |
- import asyncio
- import logging
- import re
- import zipfile
- from fastapi import APIRouter, Depends, HTTPException, Query
- from fastapi.responses import Response
- from sqlalchemy import func, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.core.auth import RequireCameraStreamTokenIfAuthEnabled, RequirePermissionIfAuthEnabled
- from backend.app.core.config import settings
- from backend.app.core.database import get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.ams_label import AmsLabel
- from backend.app.models.printer import Printer
- from backend.app.models.slot_preset import SlotPresetMapping
- from backend.app.schemas.printer import (
- AmsLabelBody,
- AMSTray,
- AMSUnit,
- DiagnosticRequest,
- FilaSwitchResponse,
- HMSErrorResponse,
- NozzleInfoResponse,
- NozzleRackSlot,
- PrinterCreate,
- PrinterDiagnosticResult,
- PrinterResponse,
- PrinterStatus,
- PrinterUpdate,
- PrintOptionsResponse,
- )
- from backend.app.services.bambu_ftp import (
- cache_3mf_download,
- delete_file_async,
- download_file_bytes_async,
- download_file_try_paths_async,
- get_cached_3mf,
- get_storage_info_async,
- list_files_async,
- )
- from backend.app.services.printer_diagnostic import run_connection_diagnostic
- from backend.app.services.printer_manager import (
- get_derived_status_name,
- printer_manager,
- resolve_plate_id,
- supports_chamber_temp,
- supports_drying,
- )
- from backend.app.utils.http import build_content_disposition
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/printers", tags=["printers"])
- @router.get("/", response_model=list[PrinterResponse])
- async def list_printers(
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """List all configured printers."""
- result = await db.execute(select(Printer).order_by(Printer.name))
- return list(result.scalars().all())
- @router.post("/", response_model=PrinterResponse)
- async def create_printer(
- printer_data: PrinterCreate,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CREATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Add a new printer.
- Verifies the MQTT connection succeeds before persisting. A wrong access
- code or unreachable IP would otherwise create a printer row that shows
- as an empty / never-connecting card on the dashboard — those reports
- were turning into support tickets that all traced back to a mistyped
- access code.
- """
- # Check if serial number already exists
- result = await db.execute(select(Printer).where(Printer.serial_number == printer_data.serial_number))
- if result.scalar_one_or_none():
- raise HTTPException(400, "Printer with this serial number already exists")
- test_result = await printer_manager.test_connection(
- ip_address=printer_data.ip_address,
- serial_number=printer_data.serial_number,
- access_code=printer_data.access_code,
- )
- if not test_result.get("success"):
- # The frontend renders the user-facing message via i18n on `code`;
- # `message` is an English fallback for non-UI clients (curl / scripts).
- raise HTTPException(
- status_code=400,
- detail={
- "code": "printer_connection_failed",
- "message": (
- "Could not connect to the printer. Verify IP address, serial number, "
- "and access code, and confirm LAN-only mode is enabled. "
- "The printer was not added."
- ),
- },
- )
- printer = Printer(**printer_data.model_dump())
- db.add(printer)
- await db.commit()
- await db.refresh(printer)
- # Connect to the printer
- if printer.is_active:
- await printer_manager.connect_printer(printer)
- return printer
- @router.get("/usb-cameras")
- async def list_usb_cameras(
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- ):
- """List available USB cameras connected to the system.
- Returns a list of detected V4L2 video devices with their info.
- Only works on Linux systems with V4L2 support.
- Returns:
- List of dicts with {device: str, name: str, capabilities: list, formats?: list}
- """
- from backend.app.services.external_camera import list_usb_cameras
- cameras = list_usb_cameras()
- return {"cameras": cameras}
- @router.get("/available-filaments")
- async def get_available_filaments(
- model: str = Query(..., description="Target printer model"),
- location: str | None = Query(None, description="Optional location filter"),
- _=RequirePermissionIfAuthEnabled(Permission.QUEUE_CREATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Get deduplicated list of filaments loaded across all active printers of a given model.
- Used by the frontend to offer filament override options for model-based queue assignment.
- """
- from backend.app.utils.printer_models import normalize_printer_model, normalize_printer_model_id
- # Normalize model name
- normalized_model = normalize_printer_model(model) or normalize_printer_model_id(model) or model
- query = (
- select(Printer).where(func.lower(Printer.model) == normalized_model.lower()).where(Printer.is_active == True) # noqa: E712
- )
- if location:
- query = query.where(Printer.location == location)
- result = await db.execute(query)
- printers_list = list(result.scalars().all())
- if not printers_list:
- return []
- # Collect filaments from all matching printers
- # Dedup key includes extruder_id and tray_sub_brands so "PLA Basic" and "PLA Matte" appear separately
- seen: set[tuple[str, str, str, int | None]] = set() # (type_upper, color_normalized, sub_brands_upper, extruder_id)
- filaments = []
- for printer in printers_list:
- status = printer_manager.get_status(printer.id)
- if not status:
- continue
- # Get ams_extruder_map for dual-nozzle printers
- ams_extruder_map = status.raw_data.get("ams_extruder_map", {})
- # AMS trays
- for ams_unit in status.raw_data.get("ams", []):
- ams_id = str(ams_unit.get("id", 0))
- extruder_id = ams_extruder_map.get(ams_id)
- for tray in ams_unit.get("tray", []):
- tray_type = tray.get("tray_type")
- if not tray_type:
- continue
- tray_color = tray.get("tray_color", "")
- # Normalize color: remove alpha, add hash
- hex_color = tray_color.replace("#", "")[:6] if tray_color else "808080"
- color = f"#{hex_color}"
- tray_info_idx = tray.get("tray_info_idx", "")
- tray_sub_brands = tray.get("tray_sub_brands", "") or ""
- key = (tray_type.upper(), hex_color.lower(), tray_sub_brands.upper(), extruder_id)
- if key not in seen:
- seen.add(key)
- filaments.append(
- {
- "type": tray_type,
- "color": color,
- "tray_info_idx": tray_info_idx,
- "tray_sub_brands": tray_sub_brands,
- "extruder_id": extruder_id,
- }
- )
- # External spools (vt_tray)
- for vt in status.raw_data.get("vt_tray") or []:
- vt_type = vt.get("tray_type")
- if not vt_type:
- continue
- vt_color = vt.get("tray_color", "")
- hex_color = vt_color.replace("#", "")[:6] if vt_color else "808080"
- color = f"#{hex_color}"
- tray_info_idx = vt.get("tray_info_idx", "")
- tray_sub_brands = vt.get("tray_sub_brands", "") or ""
- vt_id = int(vt.get("id", 254))
- extruder_id = (255 - vt_id) if ams_extruder_map else None
- key = (vt_type.upper(), hex_color.lower(), tray_sub_brands.upper(), extruder_id)
- if key not in seen:
- seen.add(key)
- filaments.append(
- {
- "type": vt_type,
- "color": color,
- "tray_info_idx": tray_info_idx,
- "tray_sub_brands": tray_sub_brands,
- "extruder_id": extruder_id,
- }
- )
- return filaments
- @router.get("/developer-mode-warnings")
- async def get_developer_mode_warnings(
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Check if any connected printer lacks developer LAN mode."""
- result = await db.execute(select(Printer).where(Printer.is_active == True)) # noqa: E712
- printers = result.scalars().all()
- statuses = printer_manager.get_all_statuses()
- warnings = []
- for printer in printers:
- state = statuses.get(printer.id)
- if state and state.connected and state.developer_mode is False:
- warnings.append(
- {
- "printer_id": printer.id,
- "name": printer.name,
- }
- )
- return warnings
- @router.get("/{printer_id}", response_model=PrinterResponse)
- async def get_printer(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get a specific printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- return printer
- @router.patch("/{printer_id}", response_model=PrinterResponse)
- async def update_printer(
- printer_id: int,
- printer_data: PrinterUpdate,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Update a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- update_data = printer_data.model_dump(exclude_unset=True)
- # Handle nested ROI object - flatten to individual columns
- if "plate_detection_roi" in update_data:
- roi = update_data.pop("plate_detection_roi")
- if roi:
- update_data["plate_detection_roi_x"] = roi.get("x")
- update_data["plate_detection_roi_y"] = roi.get("y")
- update_data["plate_detection_roi_w"] = roi.get("w")
- update_data["plate_detection_roi_h"] = roi.get("h")
- else:
- # Clear ROI if set to null
- update_data["plate_detection_roi_x"] = None
- update_data["plate_detection_roi_y"] = None
- update_data["plate_detection_roi_w"] = None
- update_data["plate_detection_roi_h"] = None
- for field, value in update_data.items():
- setattr(printer, field, value)
- await db.commit()
- await db.refresh(printer)
- # Reconnect if connection settings changed
- if any(k in update_data for k in ["ip_address", "access_code", "is_active"]):
- printer_manager.disconnect_printer(printer_id)
- if printer.is_active:
- await printer_manager.connect_printer(printer)
- return printer
- @router.delete("/{printer_id}")
- async def delete_printer(
- printer_id: int,
- delete_archives: bool = True,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_DELETE),
- db: AsyncSession = Depends(get_db),
- ):
- """Delete a printer.
- Args:
- printer_id: ID of the printer to delete
- delete_archives: If True (default), delete all print archives for this printer.
- If False, keep archives but remove their printer association.
- """
- from sqlalchemy import delete as sql_delete
- from backend.app.models.archive import PrintArchive
- from backend.app.models.maintenance import MaintenanceHistory, PrinterMaintenance
- from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- printer_manager.disconnect_printer(printer_id)
- if delete_archives:
- # Delete all archives for this printer
- await db.execute(sql_delete(PrintArchive).where(PrintArchive.printer_id == printer_id))
- else:
- # Orphan the archives instead of deleting them
- from sqlalchemy import update
- await db.execute(update(PrintArchive).where(PrintArchive.printer_id == printer_id).values(printer_id=None))
- # Delete slot assignments for this printer (SQLite doesn't enforce FK cascades)
- await db.execute(sql_delete(SpoolmanSlotAssignment).where(SpoolmanSlotAssignment.printer_id == printer_id))
- # Delete maintenance history and items for this printer
- # (SQLite doesn't enforce FK cascades, so do it explicitly)
- maintenance_ids = (
- (await db.execute(select(PrinterMaintenance.id).where(PrinterMaintenance.printer_id == printer_id)))
- .scalars()
- .all()
- )
- if maintenance_ids:
- await db.execute(
- sql_delete(MaintenanceHistory).where(MaintenanceHistory.printer_maintenance_id.in_(maintenance_ids))
- )
- await db.execute(sql_delete(PrinterMaintenance).where(PrinterMaintenance.printer_id == printer_id))
- await db.delete(printer)
- await db.commit()
- return {"status": "deleted", "archives_deleted": delete_archives}
- @router.get("/{printer_id}/status", response_model=PrinterStatus)
- async def get_printer_status(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get real-time status of a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- state = printer_manager.get_status(printer_id)
- if not state:
- return PrinterStatus(
- id=printer_id,
- name=printer.name,
- connected=False,
- )
- # Determine cover URL if there's an active print (including paused)
- cover_url = None
- if state.state in ("RUNNING", "PAUSE") and state.gcode_file:
- cover_url = f"/api/v1/printers/{printer_id}/cover"
- # Convert HMS errors to response format
- hms_errors = [
- HMSErrorResponse(code=e.code, attr=e.attr, module=e.module, severity=e.severity)
- for e in (state.hms_errors or [])
- ]
- # Parse AMS data from raw_data
- ams_units = []
- vt_tray = []
- ams_exists = False
- raw_data = state.raw_data or {}
- # Build K-profile lookup map: cali_idx -> k_value
- # This allows looking up the calibrated K value for each AMS slot
- kprofile_map: dict[int, float] = {}
- for kp in state.kprofiles or []:
- if kp.slot_id is not None and kp.k_value:
- try:
- kprofile_map[kp.slot_id] = float(kp.k_value)
- except (ValueError, TypeError):
- pass # Skip K-profile entries with unparseable values
- if "ams" in raw_data and isinstance(raw_data["ams"], list):
- ams_exists = True
- for ams_data in raw_data["ams"]:
- # Skip if ams_data is not a dict (defensive check)
- if not isinstance(ams_data, dict):
- continue
- trays = []
- for tray_data in ams_data.get("tray", []):
- # Filter out empty/invalid tag values
- tag_uid = tray_data.get("tag_uid", "")
- if tag_uid in ("", "0000000000000000"):
- tag_uid = None
- tray_uuid = tray_data.get("tray_uuid", "")
- if tray_uuid in ("", "00000000000000000000000000000000"):
- tray_uuid = None
- # Get K value: first try tray's k field, then lookup from K-profiles
- k_value = tray_data.get("k")
- cali_idx = tray_data.get("cali_idx")
- if k_value is None and cali_idx is not None and cali_idx in kprofile_map:
- k_value = kprofile_map[cali_idx]
- trays.append(
- AMSTray(
- id=tray_data.get("id", 0),
- tray_color=tray_data.get("tray_color"),
- tray_type=tray_data.get("tray_type"),
- tray_sub_brands=tray_data.get("tray_sub_brands"),
- tray_id_name=tray_data.get("tray_id_name"),
- tray_info_idx=tray_data.get("tray_info_idx"),
- remain=tray_data.get("remain", 0),
- k=k_value,
- cali_idx=cali_idx,
- tag_uid=tag_uid,
- tray_uuid=tray_uuid,
- nozzle_temp_min=tray_data.get("nozzle_temp_min"),
- nozzle_temp_max=tray_data.get("nozzle_temp_max"),
- drying_temp=tray_data.get("drying_temp"),
- drying_time=tray_data.get("drying_time"),
- state=tray_data.get("state"),
- )
- )
- # Prefer humidity_raw (percentage) over humidity (index 1-5)
- # humidity_raw is the actual percentage value from the sensor
- humidity_raw = ams_data.get("humidity_raw")
- humidity_idx = ams_data.get("humidity")
- humidity_value = None
- if humidity_raw is not None:
- try:
- humidity_value = int(humidity_raw)
- except (ValueError, TypeError):
- pass # Skip unparseable humidity; will try index fallback
- if humidity_value is None and humidity_idx is not None:
- try:
- humidity_value = int(humidity_idx)
- except (ValueError, TypeError):
- pass # Skip unparseable humidity index; humidity remains None
- # AMS-HT has 1 tray, regular AMS has 4 trays
- is_ams_ht = len(trays) == 1
- ams_units.append(
- AMSUnit(
- id=ams_data.get("id", 0),
- humidity=humidity_value,
- temp=ams_data.get("temp"),
- is_ams_ht=is_ams_ht,
- tray=trays,
- # Serial number: Bambu MQTT uses "sn" key on AMS unit objects
- serial_number=str(ams_data.get("sn") or ams_data.get("serial_number") or ""),
- # Firmware version: populated by _handle_version_info from info.module ams/* entries
- sw_ver=str(ams_data.get("sw_ver") or ""),
- # Drying: dry_time > 0 means drying is active (minutes remaining)
- dry_time=int(ams_data.get("dry_time") or 0),
- module_type=str(ams_data.get("module_type") or ""),
- )
- )
- # Virtual tray (external spool holder) - comes from vt_tray in raw_data (list)
- if "vt_tray" in raw_data:
- for vt_data in raw_data["vt_tray"]:
- # Filter out empty/invalid tag values for vt_tray
- vt_tag_uid = vt_data.get("tag_uid", "")
- if vt_tag_uid in ("", "0000000000000000"):
- vt_tag_uid = None
- vt_tray_uuid = vt_data.get("tray_uuid", "")
- if vt_tray_uuid in ("", "00000000000000000000000000000000"):
- vt_tray_uuid = None
- # Get K value: first try tray's k field, then lookup from K-profiles
- vt_k_value = vt_data.get("k")
- vt_cali_idx = vt_data.get("cali_idx")
- if vt_k_value is None and vt_cali_idx is not None and vt_cali_idx in kprofile_map:
- vt_k_value = kprofile_map[vt_cali_idx]
- tray_id = int(vt_data.get("id", 254))
- vt_tray.append(
- AMSTray(
- id=tray_id,
- tray_color=vt_data.get("tray_color"),
- tray_type=vt_data.get("tray_type"),
- tray_sub_brands=vt_data.get("tray_sub_brands"),
- tray_id_name=vt_data.get("tray_id_name"),
- tray_info_idx=vt_data.get("tray_info_idx"),
- remain=vt_data.get("remain", 0),
- k=vt_k_value,
- cali_idx=vt_cali_idx,
- tag_uid=vt_tag_uid,
- tray_uuid=vt_tray_uuid,
- nozzle_temp_min=vt_data.get("nozzle_temp_min"),
- nozzle_temp_max=vt_data.get("nozzle_temp_max"),
- )
- )
- # Convert nozzle info to response format
- nozzles = [
- NozzleInfoResponse(
- nozzle_type=n.nozzle_type,
- nozzle_diameter=n.nozzle_diameter,
- )
- for n in (state.nozzles or [])
- ]
- # H2C nozzle rack (tool-changer dock positions)
- nozzle_rack = [
- NozzleRackSlot(
- id=n.get("id", 0),
- nozzle_type=n.get("type", ""),
- nozzle_diameter=n.get("diameter", ""),
- wear=n.get("wear"),
- stat=n.get("stat"),
- max_temp=n.get("max_temp", 0),
- serial_number=n.get("serial_number", ""),
- filament_color=n.get("filament_color", ""),
- filament_id=n.get("filament_id", ""),
- filament_type=n.get("filament_type", ""),
- )
- for n in (state.nozzle_rack or [])
- ]
- # Convert print options to response format
- print_options = PrintOptionsResponse(
- spaghetti_detector=state.print_options.spaghetti_detector,
- print_halt=state.print_options.print_halt,
- halt_print_sensitivity=state.print_options.halt_print_sensitivity,
- first_layer_inspector=state.print_options.first_layer_inspector,
- printing_monitor=state.print_options.printing_monitor,
- buildplate_marker_detector=state.print_options.buildplate_marker_detector,
- allow_skip_parts=state.print_options.allow_skip_parts,
- nozzle_clumping_detector=state.print_options.nozzle_clumping_detector,
- nozzle_clumping_sensitivity=state.print_options.nozzle_clumping_sensitivity,
- pileup_detector=state.print_options.pileup_detector,
- pileup_sensitivity=state.print_options.pileup_sensitivity,
- airprint_detector=state.print_options.airprint_detector,
- airprint_sensitivity=state.print_options.airprint_sensitivity,
- auto_recovery_step_loss=state.print_options.auto_recovery_step_loss,
- filament_tangle_detect=state.print_options.filament_tangle_detect,
- )
- # Get AMS mapping from raw_data (which AMS is connected to which nozzle)
- ams_mapping = raw_data.get("ams_mapping", [])
- # Get per-AMS extruder map from state attribute (not raw_data, to avoid race condition
- # where raw_data gets replaced during MQTT updates and ams_extruder_map is temporarily missing)
- ams_extruder_map = state.ams_extruder_map or {}
- logger.debug("API returning ams_mapping: %s, ams_extruder_map: %s", ams_mapping, ams_extruder_map)
- # tray_now from MQTT is already a global tray ID: (ams_id * 4) + slot_id
- # Per OpenBambuAPI docs: 254 = external spool, 255 = no filament, otherwise global tray ID
- # No conversion needed - just use the raw value directly
- tray_now = state.tray_now
- logger.debug("Using tray_now directly as global ID: %s", tray_now)
- # Filter out chamber temp for models that don't have a real sensor
- # P1P, P1S, A1, A1Mini report meaningless chamber_temper values
- temperatures = state.temperatures
- if not supports_chamber_temp(printer.model):
- temperatures = {
- k: v for k, v in temperatures.items() if k not in ("chamber", "chamber_target", "chamber_heating")
- }
- # Resolve the active print's archive + plate (#881 follow-up): lets the
- # printer card show the actual plate name for multi-plate 3MFs instead of
- # just the 3MF filename. Only attempted for active prints, since subtask_id
- # is only meaningful then.
- current_archive_id: int | None = None
- current_plate_id: int | None = None
- if state.state in ("RUNNING", "PAUSE"):
- current_plate_id = resolve_plate_id(state)
- if state.subtask_id:
- from backend.app.models.archive import PrintArchive
- archive_row = await db.execute(
- select(PrintArchive.id)
- .where(PrintArchive.subtask_id == state.subtask_id)
- .where(PrintArchive.printer_id == printer_id)
- .order_by(PrintArchive.created_at.desc())
- .limit(1)
- )
- current_archive_id = archive_row.scalar_one_or_none()
- return PrinterStatus(
- id=printer_id,
- name=printer.name,
- connected=state.connected,
- state=state.state,
- current_print=state.current_print,
- subtask_name=state.subtask_name,
- gcode_file=state.gcode_file,
- progress=state.progress,
- remaining_time=state.remaining_time,
- layer_num=state.layer_num,
- total_layers=state.total_layers,
- temperatures=temperatures,
- cover_url=cover_url,
- hms_errors=hms_errors,
- ams=ams_units,
- ams_exists=ams_exists,
- vt_tray=vt_tray,
- sdcard=state.sdcard,
- store_to_sdcard=state.store_to_sdcard,
- timelapse=state.timelapse,
- ipcam=state.ipcam,
- wifi_signal=state.wifi_signal,
- wired_network=state.wired_network,
- door_open=state.door_open,
- nozzles=nozzles,
- nozzle_rack=nozzle_rack,
- print_options=print_options,
- stg_cur=state.stg_cur,
- stg_cur_name=get_derived_status_name(state, printer.model),
- stg=state.stg,
- airduct_mode=state.airduct_mode,
- speed_level=state.speed_level,
- chamber_light=state.chamber_light,
- active_extruder=state.active_extruder,
- ams_mapping=ams_mapping,
- ams_extruder_map=ams_extruder_map,
- tray_now=tray_now,
- ams_status_main=state.ams_status_main,
- ams_status_sub=state.ams_status_sub,
- mc_print_sub_stage=state.mc_print_sub_stage,
- last_ams_update=state.last_ams_update,
- printable_objects_count=len(state.printable_objects),
- cooling_fan_speed=state.cooling_fan_speed,
- big_fan1_speed=state.big_fan1_speed,
- big_fan2_speed=state.big_fan2_speed,
- heatbreak_fan_speed=state.heatbreak_fan_speed,
- firmware_version=state.firmware_version,
- developer_mode=state.developer_mode if state else None,
- awaiting_plate_clear=printer_manager.is_awaiting_plate_clear(printer_id),
- supports_drying=supports_drying(printer.model, state.firmware_version),
- current_archive_id=current_archive_id,
- current_plate_id=current_plate_id,
- fila_switch=(
- FilaSwitchResponse(
- installed=state.fila_switch.installed,
- in_slots=list(state.fila_switch.in_slots),
- out_extruders=list(state.fila_switch.out_extruders),
- stat=state.fila_switch.stat,
- info=state.fila_switch.info,
- )
- if state.fila_switch and state.fila_switch.installed
- else None
- ),
- )
- @router.get("/{printer_id}/current-print-user")
- async def get_current_print_user(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get the user who started the current print (for reprint tracking).
- Returns user info if available, empty object otherwise.
- This tracks users for reprints (which bypass the queue).
- For queue-based prints, use the queue item's created_by field instead.
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- user_info = printer_manager.get_current_print_user(printer_id)
- return user_info or {}
- @router.post("/{printer_id}/refresh-status")
- async def refresh_printer_status(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Request a full status refresh from the printer (sends pushall command)."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = printer_manager.request_status_update(printer_id)
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"status": "refresh_requested"}
- @router.post("/{printer_id}/connect")
- async def connect_printer(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Manually connect to a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = await printer_manager.connect_printer(printer)
- return {"connected": success}
- @router.post("/{printer_id}/disconnect")
- async def disconnect_printer(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Manually disconnect from a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- printer_manager.disconnect_printer(printer_id)
- return {"connected": False}
- @router.post("/test")
- async def test_printer_connection(
- ip_address: str,
- serial_number: str,
- access_code: str,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CREATE),
- ):
- """Test connection to a printer without saving."""
- result = await printer_manager.test_connection(
- ip_address=ip_address,
- serial_number=serial_number,
- access_code=access_code,
- )
- return result
- @router.post("/diagnostic", response_model=PrinterDiagnosticResult)
- async def diagnose_connection(
- req: DiagnosticRequest,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CREATE),
- ):
- """Run connection diagnostics for the Add-Printer flow (printer not yet saved).
- When serial_number + access_code are supplied the MQTT credential check
- also runs; otherwise only the network-level checks are performed.
- """
- return await run_connection_diagnostic(
- req.ip_address,
- serial_number=req.serial_number or None,
- access_code=req.access_code or None,
- )
- @router.get("/{printer_id}/diagnostic", response_model=PrinterDiagnosticResult)
- async def diagnose_printer(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Run connection diagnostics for an existing saved printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- return await run_connection_diagnostic(printer.ip_address, printer=printer)
- # Cache for cover images (printer_id -> {(subtask_name, view_key) -> image_bytes}).
- # Cleared on every print start by main.py::on_print_start, so re-dispatches with
- # different plates always fetch a fresh thumbnail without needing plate in the key.
- _cover_cache: dict[int, dict[tuple[str, str], bytes]] = {}
- # Negative cache (#1420): when a cover lookup exhausts every FTP path with 550
- # (file sliced on SD card, not on printer storage), remember the failure so the
- # next request short-circuits to 404 instead of re-hammering FTP 8 paths deep.
- # Cleared on print start alongside _cover_cache.
- _cover_404_cache: dict[int, set[tuple[str, str]]] = {}
- def clear_cover_cache(printer_id: int) -> None:
- """Clear cached cover images for a printer. Call on print start to avoid stale thumbnails."""
- _cover_cache.pop(printer_id, None)
- _cover_404_cache.pop(printer_id, None)
- @router.get("/{printer_id}/cover")
- async def get_printer_cover(
- printer_id: int,
- view: str | None = None,
- db: AsyncSession = Depends(get_db),
- _: None = RequireCameraStreamTokenIfAuthEnabled,
- ):
- """Get the cover image for the current print job.
- Args:
- view: Optional view type. Use "top" for top-down build plate view (useful for skip objects).
- Default returns angled 3D perspective view.
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- state = printer_manager.get_status(printer_id)
- if not state:
- raise HTTPException(404, "Printer not connected")
- # Use subtask_name as the 3MF filename (gcode_file is the path inside the 3MF)
- subtask_name = state.subtask_name
- if not subtask_name:
- raise HTTPException(404, f"No subtask_name in printer state (state={state.state})")
- # Resolve the active plate. Precedence (#1166):
- # 1. The plate Bambuddy dispatched (authoritative when we sent the print)
- # 2. plate_(\d+)\.gcode regex on state.gcode_file (works on firmware that
- # reflects the full path, e.g. some X1C builds)
- # 3. Scan the downloaded 3MF for a unique Metadata/plate_*.gcode (covers
- # per-plate archives sliced separately in Bambu Studio, where the
- # printer's gcode_file echo is just the .3mf filename)
- # 4. Fall back to plate 1
- # The 3MF-scan fallback runs later — after the file is on disk.
- plate_num = resolve_plate_id(state)
- if plate_num is not None:
- logger.info("Cover: resolved plate %s before download (subtask=%s)", plate_num, subtask_name)
- # Normalize view parameter
- view_key = view or "default"
- # Check cache. Cache by (subtask_name, view_key) only — clear_cover_cache()
- # runs on every print start, so a re-dispatch with a different plate gets
- # a fresh image regardless. Pre-#1166 the key included plate_num, but with
- # late plate resolution the cache check would always miss.
- cache_key = (subtask_name, view_key)
- if printer_id in _cover_cache and cache_key in _cover_cache[printer_id]:
- return Response(content=_cover_cache[printer_id][cache_key], media_type="image/png")
- # Negative-cache short-circuit (#1420): if a prior lookup for this same
- # subtask + view already failed, don't replay 8 FTP retries on every page
- # refresh. _cover_404_cache is cleared on print start.
- if printer_id in _cover_404_cache and cache_key in _cover_404_cache[printer_id]:
- raise HTTPException(404, f"No cover available for '{subtask_name}' (cached)")
- # Build possible 3MF filenames from subtask_name
- # Bambu printers may store files as "name.gcode.3mf" (sliced via Bambu Studio)
- # or just "name.3mf" (uploaded directly)
- possible_filenames = []
- if subtask_name.endswith(".3mf"):
- possible_filenames.append(subtask_name)
- else:
- # Try both naming patterns
- possible_filenames.append(f"{subtask_name}.gcode.3mf")
- possible_filenames.append(f"{subtask_name}.3mf")
- # Also try with spaces converted to underscores (Bambu Studio may normalize filenames)
- if " " in subtask_name:
- normalized = subtask_name.replace(" ", "_")
- if normalized.endswith(".3mf"):
- possible_filenames.append(normalized)
- else:
- possible_filenames.append(f"{normalized}.gcode.3mf")
- possible_filenames.append(f"{normalized}.3mf")
- # Build list of all remote paths to try
- remote_paths = []
- for filename in possible_filenames:
- remote_paths.extend(
- [
- f"/{filename}", # Root directory (most common)
- f"/cache/{filename}",
- f"/model/{filename}",
- f"/data/{filename}",
- ]
- )
- # Use first filename for temp path (will be reused)
- temp_filename = possible_filenames[0]
- temp_path = settings.archive_dir / "temp" / f"cover_{printer_id}_{temp_filename}"
- temp_path.parent.mkdir(parents=True, exist_ok=True)
- # Cache check (#972): the archive-metadata flow in main.py may have already
- # downloaded this 3MF during the print-start handler. Reusing that file
- # avoids a second 36MB transfer competing with the printer's single FTP
- # socket (which produces the 425 errors that feed the retry storm).
- downloaded = False
- using_cached = False
- for candidate_name in possible_filenames:
- cached = get_cached_3mf(printer_id, candidate_name)
- if cached:
- logger.info("Cover using cached 3MF from %s (avoided duplicate FTP)", cached)
- temp_path = cached
- downloaded = True
- using_cached = True
- break
- if not downloaded:
- logger.info(
- f"Trying to download cover for '{subtask_name}' from {printer.ip_address} (trying {len(remote_paths)} paths)"
- )
- # Retry logic for transient FTP failures
- max_retries = 2
- last_error = None
- for attempt in range(max_retries + 1):
- try:
- downloaded = await download_file_try_paths_async(
- printer.ip_address,
- printer.access_code,
- remote_paths,
- temp_path,
- printer_model=printer.model,
- )
- if downloaded:
- break
- except Exception as e:
- last_error = e
- if attempt < max_retries:
- logger.warning("FTP download attempt %s failed: %s, retrying...", attempt + 1, e)
- await asyncio.sleep(0.5 * (attempt + 1)) # Brief backoff
- else:
- logger.error("FTP download failed after %s attempts: %s", max_retries + 1, e)
- if last_error and not downloaded:
- raise HTTPException(503, f"FTP download temporarily unavailable: {last_error}")
- if not downloaded:
- # Remember this failure so subsequent requests for the same print
- # skip the 8-path FTP fan-out (#1420).
- _cover_404_cache.setdefault(printer_id, set()).add(cache_key)
- raise HTTPException(
- 404,
- f"Could not download 3MF file for '{subtask_name}' from printer {printer.ip_address}. Tried: {possible_filenames}",
- )
- # Share the fresh download with the archive flow.
- cache_3mf_download(printer_id, temp_filename, temp_path)
- # Verify file actually exists and has content
- if not temp_path.exists():
- raise HTTPException(500, f"Download reported success but file not found: {temp_path}")
- file_size = temp_path.stat().st_size
- logger.info("Downloaded file size: %s bytes", file_size)
- if file_size == 0:
- if not using_cached:
- temp_path.unlink()
- raise HTTPException(500, f"Downloaded file is empty for '{subtask_name}'")
- try:
- # Extract thumbnail from 3MF (which is a ZIP file)
- try:
- zf = zipfile.ZipFile(temp_path, "r")
- except zipfile.BadZipFile:
- raise HTTPException(500, "Downloaded file is not a valid 3MF/ZIP archive")
- except OSError as e:
- logger.error("Failed to open 3MF file: %s", e, exc_info=True)
- raise HTTPException(500, "Failed to open 3MF file. Check server logs for details.")
- try:
- # 3MF-scan fallback for plate detection (#1166). Per-plate archives
- # sliced separately in Bambu Studio contain a single
- # Metadata/plate_N.gcode for the active plate, even though
- # thumbnails for all plates are bundled. Using that gcode's plate
- # number prevents falling back to plate_1.png.
- if plate_num is None:
- plate_gcodes = [name for name in zf.namelist() if re.match(r"^Metadata/plate_\d+\.gcode$", name)]
- if len(plate_gcodes) == 1:
- match = re.search(r"plate_(\d+)\.gcode", plate_gcodes[0])
- if match:
- plate_num = int(match.group(1))
- logger.info("Cover: detected plate %s from 3MF contents", plate_num)
- if plate_num is None:
- plate_num = 1
- # Try common thumbnail paths in 3MF files
- # Use plate_num to get the correct plate's thumbnail for multi-plate projects
- # Use top-down view if requested (better for skip objects modal)
- if view == "top":
- thumbnail_paths = [
- f"Metadata/top_{plate_num}.png",
- # Fall back to plate 1 if specific plate not found
- "Metadata/top_1.png",
- f"Metadata/plate_{plate_num}.png",
- "Metadata/plate_1.png",
- "Metadata/thumbnail.png",
- ]
- else:
- thumbnail_paths = [
- f"Metadata/plate_{plate_num}.png",
- # Fall back to plate 1 if specific plate not found
- "Metadata/plate_1.png",
- "Metadata/thumbnail.png",
- f"Metadata/plate_{plate_num}_small.png",
- "Metadata/plate_1_small.png",
- "Thumbnails/thumbnail.png",
- "thumbnail.png",
- ]
- for thumb_path in thumbnail_paths:
- try:
- image_data = zf.read(thumb_path)
- if printer_id not in _cover_cache:
- _cover_cache[printer_id] = {}
- _cover_cache[printer_id][(subtask_name, view_key)] = image_data
- return Response(content=image_data, media_type="image/png")
- except KeyError:
- continue
- # If no specific thumbnail found, try any PNG in Metadata
- for name in zf.namelist():
- if name.startswith("Metadata/") and name.endswith(".png"):
- image_data = zf.read(name)
- if printer_id not in _cover_cache:
- _cover_cache[printer_id] = {}
- _cover_cache[printer_id][(subtask_name, view_key)] = image_data
- return Response(content=image_data, media_type="image/png")
- _cover_404_cache.setdefault(printer_id, set()).add(cache_key)
- raise HTTPException(404, "No thumbnail found in 3MF file")
- finally:
- zf.close()
- finally:
- # Only delete when this invocation owns the file. A cached path is
- # shared with the archive flow — removing it would force a refetch
- # the next time either flow needs the 3MF.
- if not using_cached and temp_path.exists():
- temp_path.unlink()
- # ============================================
- # File Manager Endpoints
- # ============================================
- @router.get("/{printer_id}/files")
- async def list_printer_files(
- printer_id: int,
- path: str = "/",
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """List files on the printer at the specified path."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- files = await list_files_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- # Add full path to each file
- for f in files:
- f["path"] = f"{path.rstrip('/')}/{f['name']}" if path != "/" else f"/{f['name']}"
- return {
- "path": path,
- "files": files,
- }
- @router.get("/{printer_id}/files/download")
- async def download_printer_file(
- printer_id: int,
- path: str,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Download a file from the printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- data = await download_file_bytes_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if data is None:
- raise HTTPException(404, f"File not found: {path}")
- # Determine content type based on extension
- filename = path.split("/")[-1]
- ext = filename.lower().split(".")[-1] if "." in filename else ""
- content_types = {
- "3mf": "application/vnd.ms-package.3dmanufacturing-3dmodel+xml",
- "gcode": "text/plain",
- "mp4": "video/mp4",
- "avi": "video/x-msvideo",
- "png": "image/png",
- "jpg": "image/jpeg",
- "jpeg": "image/jpeg",
- "json": "application/json",
- "txt": "text/plain",
- }
- content_type = content_types.get(ext, "application/octet-stream")
- return Response(
- content=data,
- media_type=content_type,
- headers={"Content-Disposition": build_content_disposition(filename)},
- )
- @router.get("/{printer_id}/files/gcode")
- async def get_printer_file_gcode(
- printer_id: int,
- path: str,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Get gcode for a file stored on a printer (for preview)."""
- import io
- # Validate printer
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- data = await download_file_bytes_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if data is None:
- raise HTTPException(404, f"File not found: {path}")
- filename = path.split("/")[-1]
- lower = filename.lower()
- if lower.endswith(".gcode"):
- return Response(content=data, media_type="text/plain")
- if lower.endswith(".3mf"):
- try:
- with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
- gcode_files = [n for n in zf.namelist() if n.endswith(".gcode")]
- if not gcode_files:
- raise HTTPException(status_code=404, detail="No gcode found in 3MF file")
- gcode_content = zf.read(gcode_files[0])
- return Response(content=gcode_content, media_type="text/plain")
- except zipfile.BadZipFile:
- raise HTTPException(status_code=400, detail="Invalid 3MF file")
- raise HTTPException(status_code=400, detail="Unsupported file type")
- @router.get("/{printer_id}/files/plates")
- async def get_printer_file_plates(
- printer_id: int,
- path: str = Query(..., description="Full path to the 3MF file on the printer"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Get available plates from a multi-plate 3MF file stored on a printer."""
- import io
- import json
- import defusedxml.ElementTree as ET
- # Validate printer
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- filename = path.split("/")[-1]
- if not filename.lower().endswith(".3mf"):
- return {
- "printer_id": printer_id,
- "path": path,
- "filename": filename,
- "plates": [],
- "is_multi_plate": False,
- }
- data = await download_file_bytes_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if data is None:
- raise HTTPException(404, f"File not found: {path}")
- plates = []
- try:
- with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
- namelist = zf.namelist()
- # Find all plate gcode files to determine available plates
- gcode_files = [n for n in namelist if n.startswith("Metadata/plate_") and n.endswith(".gcode")]
- # If no gcode is present (source-only or unsliced), fall back to plate JSON/PNG
- plate_indices: list[int] = []
- if gcode_files:
- for gf in gcode_files:
- try:
- plate_str = gf[15:-6] # Remove "Metadata/plate_" and ".gcode"
- plate_indices.append(int(plate_str))
- except ValueError:
- pass # Skip gcode files with non-numeric plate indices
- else:
- plate_json_files = [n for n in namelist if n.startswith("Metadata/plate_") and n.endswith(".json")]
- plate_png_files = [
- n
- for n in namelist
- if n.startswith("Metadata/plate_")
- and n.endswith(".png")
- and "_small" not in n
- and "no_light" not in n
- ]
- plate_name_candidates = plate_json_files + plate_png_files
- plate_re = re.compile(r"^Metadata/plate_(\d+)\.(json|png)$")
- seen_indices: set[int] = set()
- for name in plate_name_candidates:
- match = plate_re.match(name)
- if match:
- try:
- index = int(match.group(1))
- except ValueError:
- continue
- if index in seen_indices:
- continue
- seen_indices.add(index)
- plate_indices.append(index)
- if not plate_indices:
- return {
- "printer_id": printer_id,
- "path": path,
- "filename": filename,
- "plates": [],
- "is_multi_plate": False,
- }
- plate_indices.sort()
- # Parse model_settings.config for plate names
- plate_names = {}
- if "Metadata/model_settings.config" in namelist:
- try:
- model_content = zf.read("Metadata/model_settings.config").decode()
- model_root = ET.fromstring(model_content)
- for plate_elem in model_root.findall(".//plate"):
- plater_id = None
- plater_name = None
- for meta in plate_elem.findall("metadata"):
- key = meta.get("key")
- value = meta.get("value")
- if key == "plater_id" and value:
- try:
- plater_id = int(value)
- except ValueError:
- pass # Skip plate with unparseable ID
- elif key == "plater_name" and value:
- plater_name = value.strip()
- if plater_id is not None and plater_name:
- plate_names[plater_id] = plater_name
- except Exception:
- pass # Plate names are optional; continue without them
- # Parse slice_info.config for plate metadata
- plate_metadata = {}
- if "Metadata/slice_info.config" in namelist:
- content = zf.read("Metadata/slice_info.config").decode()
- root = ET.fromstring(content)
- for plate_elem in root.findall(".//plate"):
- plate_info = {"filaments": [], "prediction": None, "weight": None, "name": None, "objects": []}
- plate_index = None
- for meta in plate_elem.findall("metadata"):
- key = meta.get("key")
- value = meta.get("value")
- if key == "index" and value:
- try:
- plate_index = int(value)
- except ValueError:
- pass # Skip plate with unparseable index
- elif key == "prediction" and value:
- try:
- plate_info["prediction"] = int(value)
- except ValueError:
- pass # Skip unparseable prediction; leave as None
- elif key == "weight" and value:
- try:
- plate_info["weight"] = float(value)
- except ValueError:
- pass # Skip unparseable weight; leave as None
- # Get filaments used in this plate
- for filament_elem in plate_elem.findall("filament"):
- filament_id = filament_elem.get("id")
- filament_type = filament_elem.get("type", "")
- filament_color = filament_elem.get("color", "")
- used_g = filament_elem.get("used_g", "0")
- used_m = filament_elem.get("used_m", "0")
- try:
- used_grams = float(used_g)
- except (ValueError, TypeError):
- used_grams = 0
- if used_grams > 0 and filament_id:
- plate_info["filaments"].append(
- {
- "slot_id": int(filament_id),
- "type": filament_type,
- "color": filament_color,
- "used_grams": round(used_grams, 1),
- "used_meters": float(used_m) if used_m else 0,
- }
- )
- plate_info["filaments"].sort(key=lambda x: x["slot_id"])
- # Collect object names
- for obj_elem in plate_elem.findall("object"):
- obj_name = obj_elem.get("name")
- if obj_name and obj_name not in plate_info["objects"]:
- plate_info["objects"].append(obj_name)
- # Set plate name
- if plate_index is not None:
- custom_name = plate_names.get(plate_index)
- if custom_name:
- plate_info["name"] = custom_name
- elif plate_info["objects"]:
- plate_info["name"] = plate_info["objects"][0]
- plate_metadata[plate_index] = plate_info
- # Parse plate_*.json for object lists when slice_info is missing
- plate_json_objects: dict[int, list[str]] = {}
- for name in namelist:
- match = re.match(r"^Metadata/plate_(\d+)\.json$", name)
- if not match:
- continue
- try:
- plate_index = int(match.group(1))
- except ValueError:
- continue
- try:
- payload = json.loads(zf.read(name).decode())
- bbox_objects = payload.get("bbox_objects", [])
- names: list[str] = []
- for obj in bbox_objects:
- obj_name = obj.get("name") if isinstance(obj, dict) else None
- if obj_name and obj_name not in names:
- names.append(obj_name)
- if names:
- plate_json_objects[plate_index] = names
- except Exception:
- continue
- # Build plate list
- for idx in plate_indices:
- meta = plate_metadata.get(idx, {})
- has_thumbnail = f"Metadata/plate_{idx}.png" in namelist
- objects = meta.get("objects", [])
- if not objects:
- objects = plate_json_objects.get(idx, [])
- plate_name = meta.get("name")
- if not plate_name:
- plate_name = plate_names.get(idx)
- if not plate_name and objects:
- plate_name = objects[0]
- plates.append(
- {
- "index": idx,
- "name": plate_name,
- "objects": objects,
- "object_count": len(objects),
- "has_thumbnail": has_thumbnail,
- "thumbnail_url": f"/api/v1/printers/{printer_id}/files/plate-thumbnail/{idx}?path={path}",
- "print_time_seconds": meta.get("prediction"),
- "filament_used_grams": meta.get("weight"),
- "filaments": meta.get("filaments", []),
- }
- )
- except Exception as e:
- logger.warning("Failed to parse plates from printer file %s: %s", path, e)
- return {
- "printer_id": printer_id,
- "path": path,
- "filename": filename,
- "plates": plates,
- "is_multi_plate": len(plates) > 1,
- }
- @router.get("/{printer_id}/files/plate-thumbnail/{plate_index}")
- async def get_printer_file_plate_thumbnail(
- printer_id: int,
- plate_index: int,
- path: str = Query(..., description="Full path to the 3MF file on the printer"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Get a plate thumbnail image from a printer-stored 3MF file."""
- import io
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- data = await download_file_bytes_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if data is None:
- raise HTTPException(404, f"File not found: {path}")
- try:
- with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
- thumb_path = f"Metadata/plate_{plate_index}.png"
- if thumb_path in zf.namelist():
- image_data = zf.read(thumb_path)
- return Response(content=image_data, media_type="image/png")
- except Exception:
- pass # Corrupt or unreadable 3MF; fall through to 404
- raise HTTPException(status_code=404, detail=f"Thumbnail for plate {plate_index} not found")
- @router.post("/{printer_id}/files/download-zip")
- async def download_printer_files_as_zip(
- printer_id: int,
- request: dict,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Download multiple files from the printer as a ZIP archive."""
- import io
- paths = request.get("paths", [])
- if not paths:
- raise HTTPException(400, "No files specified")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- # Create ZIP in memory
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- for path in paths:
- try:
- data = await download_file_bytes_async(
- printer.ip_address, printer.access_code, path, printer_model=printer.model
- )
- if data:
- filename = path.split("/")[-1]
- zf.writestr(filename, data)
- except Exception as e:
- logging.warning("Failed to add %s to ZIP: %s", path, e)
- continue
- zip_buffer.seek(0)
- zip_data = zip_buffer.read()
- if len(zip_data) == 0:
- raise HTTPException(404, "No files could be downloaded")
- return Response(
- content=zip_data,
- media_type="application/zip",
- headers={"Content-Disposition": 'attachment; filename="printer-files.zip"'},
- )
- @router.delete("/{printer_id}/files")
- async def delete_printer_file(
- printer_id: int,
- path: str,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Delete a file from the printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = await delete_file_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if not success:
- raise HTTPException(500, f"Failed to delete file: {path}")
- return {"status": "deleted", "path": path}
- @router.get("/{printer_id}/storage")
- async def get_printer_storage(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get storage information from the printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- storage_info = await get_storage_info_async(printer.ip_address, printer.access_code, printer_model=printer.model)
- return storage_info or {"used_bytes": None, "free_bytes": None}
- # ============================================
- # MQTT Debug Logging Endpoints
- # ============================================
- @router.post("/{printer_id}/logging/enable")
- async def enable_mqtt_logging(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Enable MQTT message logging for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = printer_manager.enable_logging(printer_id, True)
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"logging_enabled": True}
- @router.post("/{printer_id}/logging/disable")
- async def disable_mqtt_logging(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Disable MQTT message logging for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = printer_manager.enable_logging(printer_id, False)
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"logging_enabled": False}
- @router.get("/{printer_id}/logging")
- async def get_mqtt_logs(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get MQTT message logs for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- logs = printer_manager.get_logs(printer_id)
- return {
- "logging_enabled": printer_manager.is_logging_enabled(printer_id),
- "logs": [
- {
- "timestamp": log.timestamp,
- "topic": log.topic,
- "direction": log.direction,
- "payload": log.payload,
- }
- for log in logs
- ],
- }
- @router.delete("/{printer_id}/logging")
- async def clear_mqtt_logs(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Clear MQTT message logs for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- printer_manager.clear_logs(printer_id)
- return {"status": "cleared"}
- # ============================================
- # AMS Drying Endpoints
- # ============================================
- @router.post("/{printer_id}/drying/start")
- async def start_drying(
- printer_id: int,
- ams_id: int,
- temp: int = 45,
- duration: int = 4,
- filament: str = "",
- rotate_tray: bool = False,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Send AMS drying start command. temp=45-85, duration=hours."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- # Server-side guard: reject if this model/firmware doesn't support drying
- live_state = printer_manager.get_status(printer_id)
- firmware = live_state.firmware_version if live_state else None
- if not supports_drying(printer.model, firmware):
- raise HTTPException(400, "Drying not supported for this printer model or firmware version")
- if temp < 45 or temp > 85:
- raise HTTPException(400, "Temperature must be 45-85°C")
- if duration < 1 or duration > 24:
- raise HTTPException(400, "Duration must be 1-24 hours")
- # Inspect the live AMS unit: surface blocking dry_sf_reasons (otherwise the
- # firmware silently ignores the command — #971) and backfill an empty
- # filament field from the first loaded tray so the printer doesn't reject
- # the payload.
- target_ams: dict | None = None
- for unit in (live_state.raw_data.get("ams") if live_state else None) or []:
- try:
- if int(unit.get("id", -1)) == ams_id:
- target_ams = unit
- break
- except (TypeError, ValueError):
- continue
- if target_ams is not None:
- reason_messages = {
- 0: "Printer is busy",
- 1: "Insufficient power — too many AMS drying or external PSU required",
- 2: "AMS is busy",
- 3: "Filament is at the AMS outlet — retract it first",
- 4: "AMS is already starting a drying cycle",
- 5: "Not supported in 2D mode",
- 6: "AMS is already drying",
- 7: "AMS firmware is upgrading",
- 8: "Plug in the external AMS power adapter to start drying",
- }
- for code in target_ams.get("dry_sf_reason") or []:
- try:
- code_int = int(code)
- except (TypeError, ValueError):
- continue
- if code_int in reason_messages:
- raise HTTPException(409, reason_messages[code_int])
- if not filament:
- for tray in target_ams.get("tray") or []:
- tray_type = tray.get("tray_type")
- if tray_type:
- filament = str(tray_type)
- break
- if not filament:
- filament = "PLA"
- success = printer_manager.send_drying_command(
- printer_id, ams_id, temp, duration, mode=1, filament=filament, rotate_tray=rotate_tray
- )
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"status": "drying_started", "ams_id": ams_id, "temp": temp, "duration": duration}
- @router.post("/{printer_id}/drying/stop")
- async def stop_drying(
- printer_id: int,
- ams_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Send AMS drying stop command."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = printer_manager.send_drying_command(printer_id, ams_id, temp=0, duration=0, mode=0)
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"status": "drying_stopped", "ams_id": ams_id}
- # ============================================
- # Print Options (AI Detection) Endpoints
- # ============================================
- @router.post("/{printer_id}/print-options")
- async def set_print_option(
- printer_id: int,
- module_name: str,
- enabled: bool,
- print_halt: bool = True,
- sensitivity: str = "medium",
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Set an AI detection / print option on the printer.
- Valid module_name values:
- - spaghetti_detector: Spaghetti detection
- - first_layer_inspector: First layer inspection
- - printing_monitor: AI print quality monitoring
- - buildplate_marker_detector: Build plate marker detection
- - allow_skip_parts: Allow skipping failed parts
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client or not client.state.connected:
- raise HTTPException(400, "Printer not connected")
- # Validate module_name
- valid_modules = [
- "spaghetti_detector",
- "first_layer_inspector",
- "printing_monitor",
- "buildplate_marker_detector",
- "allow_skip_parts",
- "pileup_detector",
- "clump_detector",
- "airprint_detector",
- "auto_recovery_step_loss",
- ]
- if module_name not in valid_modules:
- raise HTTPException(400, f"Invalid module_name. Must be one of: {valid_modules}")
- # Validate sensitivity
- valid_sensitivities = ["low", "medium", "high", "never_halt"]
- if sensitivity not in valid_sensitivities:
- raise HTTPException(400, f"Invalid sensitivity. Must be one of: {valid_sensitivities}")
- success = client.set_xcam_option(
- module_name=module_name,
- enabled=enabled,
- print_halt=print_halt,
- sensitivity=sensitivity,
- )
- if not success:
- raise HTTPException(500, "Failed to send command to printer")
- return {
- "success": True,
- "module_name": module_name,
- "enabled": enabled,
- "print_halt": print_halt,
- "sensitivity": sensitivity,
- }
- # ============================================
- # Calibration
- # ============================================
- @router.post("/{printer_id}/calibration")
- async def start_calibration(
- printer_id: int,
- bed_leveling: bool = False,
- vibration: bool = False,
- motor_noise: bool = False,
- nozzle_offset: bool = False,
- high_temp_heatbed: bool = False,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Start printer calibration with selected options.
- At least one option must be selected.
- Options:
- - bed_leveling: Run bed leveling calibration
- - vibration: Run vibration compensation calibration
- - motor_noise: Run motor noise cancellation calibration
- - nozzle_offset: Run nozzle offset calibration (dual nozzle printers)
- - high_temp_heatbed: Run high-temperature heatbed calibration
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client or not client.state.connected:
- raise HTTPException(400, "Printer not connected")
- # Check that at least one option is selected
- if not any([bed_leveling, vibration, motor_noise, nozzle_offset, high_temp_heatbed]):
- raise HTTPException(400, "At least one calibration option must be selected")
- success = client.start_calibration(
- bed_leveling=bed_leveling,
- vibration=vibration,
- motor_noise=motor_noise,
- nozzle_offset=nozzle_offset,
- high_temp_heatbed=high_temp_heatbed,
- )
- if not success:
- raise HTTPException(500, "Failed to send calibration command to printer")
- return {
- "success": True,
- "bed_leveling": bed_leveling,
- "vibration": vibration,
- "motor_noise": motor_noise,
- "nozzle_offset": nozzle_offset,
- "high_temp_heatbed": high_temp_heatbed,
- }
- # ============================================================================
- # Slot Preset Mapping Endpoints
- # ============================================================================
- def _slot_preset_key(ams_id: int, tray_id: int) -> int:
- # Mirrors frontend getGlobalTrayId (amsHelpers.ts): AMS-HT (128-135) is keyed
- # by ams_id since each unit has a single slot and shares its global ID with
- # the unit itself. Regular AMS and external (255) use ams_id*4+tray_id.
- if 128 <= ams_id <= 135:
- return ams_id
- return ams_id * 4 + tray_id
- @router.get("/{printer_id}/slot-presets")
- async def get_slot_presets(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get all saved slot-to-preset mappings for a printer."""
- result = await db.execute(select(SlotPresetMapping).where(SlotPresetMapping.printer_id == printer_id))
- mappings = result.scalars().all()
- return {
- _slot_preset_key(mapping.ams_id, mapping.tray_id): {
- "ams_id": mapping.ams_id,
- "tray_id": mapping.tray_id,
- "preset_id": mapping.preset_id,
- "preset_name": mapping.preset_name,
- }
- for mapping in mappings
- }
- @router.get("/{printer_id}/slot-presets/{ams_id}/{tray_id}")
- async def get_slot_preset(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get the saved preset for a specific slot."""
- result = await db.execute(
- select(SlotPresetMapping).where(
- SlotPresetMapping.printer_id == printer_id,
- SlotPresetMapping.ams_id == ams_id,
- SlotPresetMapping.tray_id == tray_id,
- )
- )
- mapping = result.scalar_one_or_none()
- if not mapping:
- return None
- return {
- "ams_id": mapping.ams_id,
- "tray_id": mapping.tray_id,
- "preset_id": mapping.preset_id,
- "preset_name": mapping.preset_name,
- }
- @router.put("/{printer_id}/slot-presets/{ams_id}/{tray_id}")
- async def save_slot_preset(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- preset_id: str,
- preset_name: str,
- preset_source: str = "cloud",
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Save a preset mapping for a specific slot."""
- # Check printer exists
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- if not result.scalar_one_or_none():
- raise HTTPException(404, "Printer not found")
- # Check for existing mapping
- result = await db.execute(
- select(SlotPresetMapping).where(
- SlotPresetMapping.printer_id == printer_id,
- SlotPresetMapping.ams_id == ams_id,
- SlotPresetMapping.tray_id == tray_id,
- )
- )
- mapping = result.scalar_one_or_none()
- if mapping:
- # Update existing
- mapping.preset_id = preset_id
- mapping.preset_name = preset_name
- mapping.preset_source = preset_source
- else:
- # Create new
- mapping = SlotPresetMapping(
- printer_id=printer_id,
- ams_id=ams_id,
- tray_id=tray_id,
- preset_id=preset_id,
- preset_name=preset_name,
- preset_source=preset_source,
- )
- db.add(mapping)
- await db.commit()
- await db.refresh(mapping)
- return {
- "ams_id": mapping.ams_id,
- "tray_id": mapping.tray_id,
- "preset_id": mapping.preset_id,
- "preset_name": mapping.preset_name,
- "preset_source": mapping.preset_source,
- }
- @router.delete("/{printer_id}/slot-presets/{ams_id}/{tray_id}")
- async def delete_slot_preset(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Delete a saved preset mapping for a slot."""
- result = await db.execute(
- select(SlotPresetMapping).where(
- SlotPresetMapping.printer_id == printer_id,
- SlotPresetMapping.ams_id == ams_id,
- SlotPresetMapping.tray_id == tray_id,
- )
- )
- mapping = result.scalar_one_or_none()
- if mapping:
- await db.delete(mapping)
- await db.commit()
- return {"success": True}
- @router.post("/{printer_id}/slots/{ams_id}/{tray_id}/configure")
- async def configure_ams_slot(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- tray_info_idx: str = Query(...),
- tray_type: str = Query(...),
- tray_sub_brands: str = Query(...),
- tray_color: str = Query(...),
- nozzle_temp_min: int = Query(...),
- nozzle_temp_max: int = Query(...),
- cali_idx: int = Query(-1),
- nozzle_diameter: str = Query("0.4"),
- setting_id: str = Query(""),
- kprofile_filament_id: str = Query(""),
- kprofile_setting_id: str = Query(""),
- k_value: float = Query(0.0),
- db: AsyncSession = Depends(get_db),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- ):
- """Configure an AMS slot with a specific filament setting and K profile.
- This sends two commands to the printer:
- 1. ams_filament_setting - sets filament type, color, temperature
- 2. extrusion_cali_sel - sets the K profile (pressure advance value)
- Args:
- printer_id: Database ID of the printer
- ams_id: AMS unit ID (0-3 for regular AMS, 128-135 for HT AMS)
- tray_id: Tray ID within the AMS (0-3)
- tray_info_idx: Filament ID short format (e.g., "GFL05") or user preset ID
- tray_type: Filament type (e.g., "PLA", "PETG")
- tray_sub_brands: Sub-brand/profile name (e.g., "PLA Basic", "PETG HF")
- tray_color: Color in RRGGBBAA hex format (e.g., "FFFF00FF")
- nozzle_temp_min: Minimum nozzle temperature
- nozzle_temp_max: Maximum nozzle temperature
- cali_idx: K profile calibration index (-1 for default 0.020)
- nozzle_diameter: Nozzle diameter string (e.g., "0.4")
- setting_id: Full setting ID with version (e.g., "GFSL05_07") - optional
- kprofile_filament_id: K profile's filament_id for proper K profile linking
- k_value: Direct K value to set (0.0 to skip direct K value setting)
- """
- logger = logging.getLogger(__name__)
- logger.info("[configure_ams_slot] printer_id=%s, ams_id=%s, tray_id=%s", printer_id, ams_id, tray_id)
- logger.info(
- f"[configure_ams_slot] tray_info_idx={tray_info_idx!r}, tray_type={tray_type!r}, tray_sub_brands={tray_sub_brands!r}"
- )
- logger.info(
- f"[configure_ams_slot] setting_id={setting_id!r}, kprofile_filament_id={kprofile_filament_id!r}, kprofile_setting_id={kprofile_setting_id!r}"
- )
- # Get MQTT client for this printer
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(status_code=400, detail="Printer not connected")
- # Resolve tray_info_idx for the MQTT command.
- # Priority:
- # 1. Use the provided tray_info_idx if set (including cloud-synced
- # custom presets like PFUS* / P*).
- # 2. Reuse the slot's existing tray_info_idx if it's a specific
- # (non-generic) preset for the same material.
- # 3. Fall back to a generic Bambu filament ID.
- _GENERIC_FILAMENT_IDS = {
- "PLA": "GFL99",
- "PETG": "GFG99",
- "ABS": "GFB99",
- "ASA": "GFB98",
- "PC": "GFC99",
- "PA": "GFN99",
- "NYLON": "GFN99",
- "TPU": "GFU99",
- "PVA": "GFS99",
- "HIPS": "GFS98",
- "PLA-CF": "GFL98",
- "PETG-CF": "GFG98",
- "PA-CF": "GFN98",
- "PETG HF": "GFG96",
- }
- _GENERIC_ID_VALUES = set(_GENERIC_FILAMENT_IDS.values())
- effective_tray_info_idx = tray_info_idx
- if not tray_info_idx:
- # No preset provided — try slot reuse or generic fallback
- current_tray_info_idx = ""
- current_tray_type = ""
- state = printer_manager.get_status(printer_id)
- if state and state.raw_data:
- from backend.app.api.routes.inventory import _find_tray_in_ams_data
- if ams_id == 255:
- vt_tray = state.raw_data.get("vt_tray") or []
- ext_id = tray_id + 254
- for vt in vt_tray:
- if isinstance(vt, dict) and int(vt.get("id", 254)) == ext_id:
- current_tray_info_idx = vt.get("tray_info_idx", "")
- current_tray_type = vt.get("tray_type", "")
- break
- else:
- ams_data = state.raw_data.get("ams", {})
- ams_list = (
- ams_data.get("ams", [])
- if isinstance(ams_data, dict)
- else ams_data
- if isinstance(ams_data, list)
- else []
- )
- cur_tray = _find_tray_in_ams_data(ams_list, ams_id, tray_id)
- if cur_tray:
- current_tray_info_idx = cur_tray.get("tray_info_idx", "")
- current_tray_type = cur_tray.get("tray_type", "")
- if (
- current_tray_info_idx
- and current_tray_info_idx not in _GENERIC_ID_VALUES
- and current_tray_type
- and current_tray_type.upper() == tray_type.upper()
- ):
- logger.info(
- "[configure_ams_slot] Reusing slot's existing tray_info_idx=%r (same material %r)",
- current_tray_info_idx,
- tray_type,
- )
- effective_tray_info_idx = current_tray_info_idx
- elif tray_type:
- material = tray_type.upper().strip()
- generic = (
- _GENERIC_FILAMENT_IDS.get(material)
- or _GENERIC_FILAMENT_IDS.get(material.split("-")[0].split(" ")[0])
- or ""
- )
- if generic:
- logger.info("[configure_ams_slot] Falling back to generic %r for material %r", generic, tray_type)
- effective_tray_info_idx = generic
- # Send filament setting + K-profile commands
- filament_id_for_kprofile = kprofile_filament_id if kprofile_filament_id else effective_tray_info_idx
- # Realign the slot's filament context to the K-profile's calibration
- # context. The printer's calibration table is keyed by (filament_id,
- # cali_idx) — so for the cali_idx selected via extrusion_cali_sel to
- # actually stick to the slot, ams_filament_setting must declare the
- # slot under the SAME filament_id.
- #
- # Without this, configure_ams_slot would send:
- # ams_filament_setting → tray_info_idx=GFL99 (generic from material)
- # extrusion_cali_sel → filament_id=P4d64437 (kp's preset)
- # ...and the cali_idx would silently be dropped to default because the
- # slot's filament context (GFL99) doesn't match the kp's (P4d64437).
- #
- # This realignment fires only when the kp is targeted at a different
- # preset than the user's filament selection AND the kp's preset is a
- # valid tray_info_idx (GF* official, P* local — not PFUS* cloud-user
- # which the slicer rejects in tray_info_idx).
- effective_setting_id = setting_id
- if (
- kprofile_filament_id
- and kprofile_filament_id != effective_tray_info_idx
- and not kprofile_filament_id.startswith("PFUS")
- ):
- logger.info(
- "[configure_ams_slot] realigning slot filament context to kp: tray_info_idx %r → %r, setting_id %r → %r",
- effective_tray_info_idx,
- kprofile_filament_id,
- setting_id,
- kprofile_setting_id or setting_id,
- )
- effective_tray_info_idx = kprofile_filament_id
- if kprofile_setting_id:
- effective_setting_id = kprofile_setting_id
- # Always send ams_set_filament_setting — the user explicitly clicked
- # "Configure Slot", so honor that. Previous versions skipped this for
- # RFID-tagged slots to preserve the slicer eye icon, but printers cache
- # stale tag_uid/tray_uuid after a BL spool is removed, causing the check
- # to false-positive on non-RFID slots and silently drop the command.
- success = client.ams_set_filament_setting(
- ams_id=ams_id,
- tray_id=tray_id,
- tray_info_idx=effective_tray_info_idx,
- tray_type=tray_type,
- tray_sub_brands=tray_sub_brands,
- tray_color=tray_color,
- nozzle_temp_min=nozzle_temp_min,
- nozzle_temp_max=nozzle_temp_max,
- setting_id=effective_setting_id,
- )
- if not success:
- raise HTTPException(status_code=500, detail="Failed to send filament configuration command")
- # Method 1: Select existing calibration profile by cali_idx
- # Do NOT include setting_id — BambuStudio never sends it in extrusion_cali_sel,
- # and including it causes the firmware to mislink the profile on X1C/P1S.
- client.extrusion_cali_sel(
- ams_id=ams_id,
- tray_id=tray_id,
- cali_idx=cali_idx,
- filament_id=filament_id_for_kprofile,
- nozzle_diameter=nozzle_diameter,
- )
- # Method 2: Only send extrusion_cali_set when NO existing profile was selected
- # (cali_idx == -1). When cali_idx >= 0, extrusion_cali_sel already selected the
- # correct profile. Sending extrusion_cali_set with the same cali_idx would MODIFY
- # the existing profile's metadata (extruder_id, nozzle_id, name, setting_id),
- # corrupting it — e.g., overwriting a High Flow extruder 1 profile with
- # hardcoded extruder_id=0 and nozzle_id=HS00.
- if k_value > 0 and cali_idx < 0:
- # Calculate global tray ID for extrusion_cali_set
- if ams_id <= 3:
- global_tray_id = ams_id * 4 + tray_id
- elif ams_id >= 128 and ams_id <= 135:
- global_tray_id = (ams_id - 128) * 4 + tray_id
- else:
- global_tray_id = tray_id
- client.extrusion_cali_set(
- tray_id=global_tray_id,
- k_value=k_value,
- nozzle_diameter=nozzle_diameter,
- nozzle_temp=nozzle_temp_max,
- filament_id=filament_id_for_kprofile,
- setting_id=kprofile_setting_id or "",
- name=tray_sub_brands or "",
- cali_idx=cali_idx,
- )
- # Persist the user's K-profile choice so it survives RFID re-reads and
- # session restarts. Pre-Phase-13 this was ephemeral — the MQTT command
- # took effect on the printer but bambuddy never recorded it, so the next
- # `_apply_pa_after_refresh` cycle had no stored profile to re-assert.
- if cali_idx >= 0:
- try:
- from sqlalchemy.orm import selectinload
- from backend.app.models.spool_assignment import SpoolAssignment
- from backend.app.models.spool_k_profile import SpoolKProfile
- from backend.app.models.spoolman_k_profile import SpoolmanKProfile
- from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
- # Resolve slot's extruder index for the K-profile match key. Same
- # logic as _apply_pa_after_refresh: external slots invert tray→extruder,
- # AMS slots come from ams_extruder_map. Falls back to 0 (single-nozzle).
- slot_state = printer_manager.get_status(printer_id)
- slot_extruder: int | None = None
- if slot_state and slot_state.ams_extruder_map:
- if ams_id == 255:
- slot_extruder = 1 - tray_id
- else:
- slot_extruder = slot_state.ams_extruder_map.get(str(ams_id))
- kp_extruder = slot_extruder if slot_extruder is not None else 0
- # Spoolman SlotAssignment first — has UniqueConstraint, idempotent.
- sm_result = await db.execute(
- select(SpoolmanSlotAssignment).where(
- SpoolmanSlotAssignment.printer_id == printer_id,
- SpoolmanSlotAssignment.ams_id == ams_id,
- SpoolmanSlotAssignment.tray_id == tray_id,
- )
- )
- sm_assignment = sm_result.scalar_one_or_none()
- if sm_assignment:
- existing = await db.execute(
- select(SpoolmanKProfile).where(
- SpoolmanKProfile.spoolman_spool_id == sm_assignment.spoolman_spool_id,
- SpoolmanKProfile.printer_id == printer_id,
- SpoolmanKProfile.extruder == kp_extruder,
- SpoolmanKProfile.nozzle_diameter == nozzle_diameter,
- )
- )
- kp = existing.scalar_one_or_none()
- if kp:
- kp.cali_idx = cali_idx
- kp.k_value = k_value or 0.0
- kp.setting_id = kprofile_setting_id or None
- kp.name = tray_sub_brands or None
- else:
- db.add(
- SpoolmanKProfile(
- spoolman_spool_id=sm_assignment.spoolman_spool_id,
- printer_id=printer_id,
- extruder=kp_extruder,
- nozzle_diameter=nozzle_diameter,
- k_value=k_value or 0.0,
- name=tray_sub_brands or None,
- cali_idx=cali_idx,
- setting_id=kprofile_setting_id or None,
- )
- )
- await db.commit()
- logger.info(
- "[configure_ams_slot] Persisted Spoolman K-profile spool=%d printer=%d ams=%d tray=%d cali_idx=%d",
- sm_assignment.spoolman_spool_id,
- printer_id,
- ams_id,
- tray_id,
- cali_idx,
- )
- else:
- # Local SpoolAssignment + SpoolKProfile (no UNIQUE — use .first())
- local_result = await db.execute(
- select(SpoolAssignment)
- .options(selectinload(SpoolAssignment.spool))
- .where(
- SpoolAssignment.printer_id == printer_id,
- SpoolAssignment.ams_id == ams_id,
- SpoolAssignment.tray_id == tray_id,
- )
- )
- local_assignment = local_result.scalar_one_or_none()
- if local_assignment and local_assignment.spool:
- existing = await db.execute(
- select(SpoolKProfile).where(
- SpoolKProfile.spool_id == local_assignment.spool.id,
- SpoolKProfile.printer_id == printer_id,
- SpoolKProfile.extruder == kp_extruder,
- SpoolKProfile.nozzle_diameter == nozzle_diameter,
- )
- )
- # SpoolKProfile has no unique constraint on this tuple, so
- # multiple rows could theoretically exist (shouldn't, but
- # don't crash if they do). Update the first match, leave
- # any duplicates alone.
- kp = existing.scalars().first()
- if kp:
- kp.cali_idx = cali_idx
- kp.k_value = k_value or 0.0
- kp.setting_id = kprofile_setting_id or None
- kp.name = tray_sub_brands or None
- else:
- db.add(
- SpoolKProfile(
- spool_id=local_assignment.spool.id,
- printer_id=printer_id,
- extruder=kp_extruder,
- nozzle_diameter=nozzle_diameter,
- k_value=k_value or 0.0,
- name=tray_sub_brands or None,
- cali_idx=cali_idx,
- setting_id=kprofile_setting_id or None,
- )
- )
- await db.commit()
- logger.info(
- "[configure_ams_slot] Persisted local K-profile spool=%d printer=%d ams=%d tray=%d cali_idx=%d",
- local_assignment.spool.id,
- printer_id,
- ams_id,
- tray_id,
- cali_idx,
- )
- except Exception:
- # MQTT command was already sent successfully — DB persist is best-effort.
- logger.exception(
- "[configure_ams_slot] Failed to persist K-profile (printer=%d ams=%d tray=%d cali_idx=%d)",
- printer_id,
- ams_id,
- tray_id,
- cali_idx,
- )
- try:
- await db.rollback()
- except Exception:
- pass
- # Request fresh status push from printer so frontend gets updated data via WebSocket
- logger.info("[configure_ams_slot] Requesting status update from printer")
- update_result = client.request_status_update()
- logger.info("[configure_ams_slot] Status update request result: %s", update_result)
- return {
- "success": True,
- "message": f"Configured AMS {ams_id} tray {tray_id} with {tray_sub_brands}",
- }
- @router.post("/{printer_id}/ams/{ams_id}/tray/{tray_id}/reset")
- async def reset_ams_slot(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- db: AsyncSession = Depends(get_db),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- ):
- """Reset an AMS slot to empty/unconfigured state.
- This clears the filament configuration from the slot.
- """
- # Get MQTT client for this printer
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(status_code=400, detail="Printer not connected")
- # Reset the slot
- success = client.reset_ams_slot(ams_id=ams_id, tray_id=tray_id)
- if not success:
- raise HTTPException(status_code=500, detail="Failed to send reset command")
- # Also delete any saved slot preset mapping
- result = await db.execute(
- select(SlotPresetMapping).where(
- SlotPresetMapping.printer_id == printer_id,
- SlotPresetMapping.ams_id == ams_id,
- SlotPresetMapping.tray_id == tray_id,
- )
- )
- mapping = result.scalar_one_or_none()
- if mapping:
- await db.delete(mapping)
- await db.commit()
- # Request fresh status push from printer so frontend gets updated data via WebSocket
- client.request_status_update()
- return {
- "success": True,
- "message": f"Reset AMS {ams_id} tray {tray_id}",
- }
- @router.get("/{printer_id}/ams-labels")
- async def get_ams_labels(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get all user-defined AMS labels for a printer, keyed by AMS unit ID.
- Labels are stored by AMS serial number. This endpoint resolves the current
- serial-to-ams_id mapping from the live printer state so the response is still
- keyed by ams_id for UI compatibility.
- """
- # Build serial -> ams_id map from live printer state
- serial_to_ams_id: dict[str, int] = {}
- state = printer_manager.get_status(printer_id)
- if state and state.raw_data:
- for ams_unit in state.raw_data.get("ams", []):
- sn = str(ams_unit.get("sn") or ams_unit.get("serial_number") or "")
- if sn:
- serial_to_ams_id[sn] = int(ams_unit.get("id", 0))
- # Collect all known serials for this printer (live + synthetic fallback keys)
- serials_to_query = set(serial_to_ams_id.keys())
- # Fetch labels for all known serials
- labels: dict[int, str] = {}
- if serials_to_query:
- result = await db.execute(select(AmsLabel).where(AmsLabel.ams_serial_number.in_(serials_to_query)))
- for lbl in result.scalars().all():
- aid = serial_to_ams_id.get(lbl.ams_serial_number)
- if aid is not None:
- labels[aid] = lbl.label
- # Also fetch labels stored under synthetic keys for this printer (backward compat)
- # Collect all synthetic keys first, then query with a single IN clause.
- if state and state.raw_data:
- synthetic_key_to_aid: dict[str, int] = {
- f"p{printer_id}a{int(ams_unit.get('id', 0))}": int(ams_unit.get("id", 0))
- for ams_unit in state.raw_data.get("ams", [])
- if int(ams_unit.get("id", 0)) not in labels
- }
- if synthetic_key_to_aid:
- result = await db.execute(
- select(AmsLabel).where(AmsLabel.ams_serial_number.in_(synthetic_key_to_aid.keys()))
- )
- for lbl in result.scalars().all():
- aid = synthetic_key_to_aid.get(lbl.ams_serial_number)
- if aid is not None:
- labels[aid] = lbl.label
- return labels
- @router.put("/{printer_id}/ams-labels/{ams_id}")
- async def save_ams_label(
- printer_id: int,
- ams_id: int,
- body: AmsLabelBody,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Create or update the friendly name for a specific AMS unit.
- When ``ams_serial`` is provided the label is stored under that serial number so
- it survives the AMS being moved to a different printer. When it is absent (e.g.
- older firmware that does not report a serial) a synthetic key based on the
- printer_id and ams_id is used as a fallback.
- """
- # Verify printer exists
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- if not result.scalar_one_or_none():
- raise HTTPException(404, "Printer not found")
- # Determine the serial key to store under
- stripped = body.ams_serial.strip() if body.ams_serial else ""
- serial_key = stripped if stripped else f"p{printer_id}a{ams_id}"
- result = await db.execute(select(AmsLabel).where(AmsLabel.ams_serial_number == serial_key))
- existing = result.scalar_one_or_none()
- if existing:
- existing.label = body.label
- existing.ams_id = ams_id
- else:
- db.add(AmsLabel(ams_serial_number=serial_key, ams_id=ams_id, label=body.label))
- await db.commit()
- return {"ams_id": ams_id, "label": body.label}
- @router.delete("/{printer_id}/ams-labels/{ams_id}")
- async def delete_ams_label(
- printer_id: int,
- ams_id: int,
- ams_serial: str = Query(default="", max_length=50),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Delete the friendly name for a specific AMS unit, reverting to the auto label."""
- stripped = ams_serial.strip() if ams_serial else ""
- serial_key = stripped if stripped else f"p{printer_id}a{ams_id}"
- result = await db.execute(select(AmsLabel).where(AmsLabel.ams_serial_number == serial_key))
- existing = result.scalar_one_or_none()
- if existing:
- await db.delete(existing)
- await db.commit()
- return {"success": True}
- @router.post("/{printer_id}/debug/simulate-print-complete")
- async def debug_simulate_print_complete(
- printer_id: int,
- db: AsyncSession = Depends(get_db),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- ):
- """DEBUG: Simulate print completion to test freeze behavior.
- This triggers the same code path as a real print completion,
- without needing to wait for an actual print to finish.
- """
- from backend.app.main import _active_prints, on_print_complete
- from backend.app.models.archive import PrintArchive
- # Get the most recent archive for this printer
- result = await db.execute(
- select(PrintArchive)
- .where(PrintArchive.printer_id == printer_id)
- .order_by(PrintArchive.created_at.desc())
- .limit(1)
- )
- archive = result.scalar_one_or_none()
- if not archive:
- raise HTTPException(status_code=404, detail="No archives found for this printer")
- # Register this archive as "active" so on_print_complete can find it
- filename = archive.file_path.split("/")[-1] if archive.file_path else "test.3mf"
- subtask_name = archive.print_name or "Test Print"
- _active_prints[(printer_id, filename)] = archive.id
- _active_prints[(printer_id, subtask_name)] = archive.id
- # Simulate print completion data
- data = {
- "status": "completed",
- "filename": filename,
- "subtask_name": subtask_name,
- "timelapse_was_active": False,
- }
- logger.info("Simulating print complete for printer %s, archive %s", printer_id, archive.id)
- # Call the actual on_print_complete handler
- await on_print_complete(printer_id, data)
- return {"success": True, "archive_id": archive.id, "message": "Print completion simulated"}
- # =============================================================================
- # Print Control Endpoints
- # =============================================================================
- @router.post("/{printer_id}/print/stop")
- async def stop_print(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Stop/cancel the current print job."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.stop_print()
- if not success:
- raise HTTPException(500, "Failed to stop print")
- # Mark this printer as user-stopped so on_print_complete reclassifies
- # the resulting "failed"/"aborted" MQTT status as "cancelled" — otherwise
- # the HMS heuristic in _dispatch_archive_update mislabels user-cancels
- # (e.g. the H2D's cancel-sequence module-0x0C HMS) as "Layer shift".
- try:
- from backend.app.main import mark_printer_stopped_by_user
- mark_printer_stopped_by_user(printer_id)
- except Exception as _mark_err:
- logger.warning("Failed to mark printer %s as user-stopped: %s", printer_id, _mark_err)
- return {"success": True, "message": "Print stop command sent"}
- @router.post("/{printer_id}/clear-plate")
- async def clear_plate(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CLEAR_PLATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Acknowledge that the build plate has been cleared after a finished/failed print.
- Sets a plate-cleared flag so the scheduler can start the next queued print.
- No MQTT command is sent to the printer — the scheduler's start_print command
- will override the FINISH/FAILED state when it sends the next job.
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- if not printer_manager.is_connected(printer_id):
- raise HTTPException(400, "Printer not connected")
- # Accept the acknowledgment whenever the printer is awaiting it — not only when the
- # reported state is FINISH/FAILED. After a power cycle the printer boots into IDLE
- # but the awaiting flag persists, and the user still needs a way to ack it (#961).
- state = printer_manager.get_status(printer_id)
- awaiting = printer_manager.is_awaiting_plate_clear(printer_id)
- if not awaiting and (not state or state.state not in ("FINISH", "FAILED")):
- raise HTTPException(
- 400,
- f"Printer is not awaiting plate-clear acknowledgment (state={state.state if state else 'unknown'})",
- )
- printer_manager.set_awaiting_plate_clear(printer_id, False)
- return {"success": True, "message": "Plate cleared, next print will start shortly"}
- @router.post("/{printer_id}/print/pause")
- async def pause_print(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Pause the current print job."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.pause_print()
- if not success:
- raise HTTPException(500, "Failed to pause print")
- return {"success": True, "message": "Print pause command sent"}
- @router.post("/{printer_id}/print/resume")
- async def resume_print(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Resume a paused print job."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.resume_print()
- if not success:
- raise HTTPException(500, "Failed to resume print")
- return {"success": True, "message": "Print resume command sent"}
- @router.post("/{printer_id}/print-speed")
- async def set_print_speed(
- printer_id: int,
- mode: int = Query(..., description="Speed mode (1=silent, 2=standard, 3=sport, 4=ludicrous)"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Set the print speed mode."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.set_print_speed(mode)
- if not success:
- raise HTTPException(500, "Failed to set print speed")
- speed_names = {1: "Silent", 2: "Standard", 3: "Sport", 4: "Ludicrous"}
- return {"success": True, "message": f"Print speed set to {speed_names.get(mode, 'Unknown')}"}
- @router.post("/{printer_id}/airduct-mode")
- async def set_airduct_mode(
- printer_id: int,
- mode: str = Query(..., description="Airduct mode: 'cooling' or 'heating'"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Set the airduct mode (cooling/heating) on supported printers (P2S/H2*)."""
- if mode not in ("cooling", "heating"):
- raise HTTPException(400, "Mode must be 'cooling' or 'heating'")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.set_airduct_mode(mode)
- if not success:
- raise HTTPException(500, "Failed to set airduct mode")
- return {"success": True, "message": f"Airduct mode set to {mode}"}
- @router.post("/{printer_id}/chamber-light")
- async def set_chamber_light(
- printer_id: int,
- on: bool = Query(..., description="True to turn on, False to turn off"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Turn the chamber light on or off."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.set_chamber_light(on)
- if not success:
- raise HTTPException(500, "Failed to control chamber light")
- return {"success": True, "message": f"Chamber light {'on' if on else 'off'}"}
- @router.post("/{printer_id}/bed-jog")
- async def bed_jog(
- printer_id: int,
- distance: float = Query(
- ...,
- description=(
- "Signed nozzle-bed gap adjustment in mm. Negative = decrease gap "
- '("up" arrow in the UI: bed up on bed-on-Z models, toolhead down '
- "on A1 bed-slingers). Positive = increase gap. The backend "
- "translates this into the right G-code Z sign per printer model."
- ),
- ),
- force: bool = Query(False, description="If true, bypass soft endstops via M211 (for use when Z is not homed)"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Adjust the nozzle-bed gap by a relative distance.
- Emits a short G-code sequence via MQTT. When ``force`` is true the soft
- endstops are disabled for the duration of the move, matching the
- "ignore and move anyway" option Bambu Studio offers when the printer
- is not homed.
- Direction handling: on bed-on-Z printers (X1 / P1 / H2 family) the bed
- is the Z-axis, and Bambu's home convention puts Z=0 at the top with
- Z+ moving the bed down — so a frontend "Up" (decrease gap) maps
- naturally to ``G1 Z-``. On bed-slingers (A1 / A1 Mini) the Z-axis is
- the *toolhead*, and ``G1 Z-`` instead drives the nozzle DOWN into the
- bed (#1334 reported exactly that crash). For those models we invert
- the sign before emitting the G-code, so the UI semantics stay the
- same regardless of which part physically moves.
- """
- if distance == 0 or abs(distance) > 200:
- raise HTTPException(400, "Distance must be non-zero and ≤ 200 mm")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- from backend.app.services.printer_manager import is_bed_slinger
- gcode_distance = -distance if is_bed_slinger(printer.model) else distance
- lines = []
- if force:
- lines.append("M211 S0")
- lines += ["G91", f"G1 Z{gcode_distance:.2f} F600", "G90"]
- if force:
- lines.append("M211 S1")
- if not client.send_gcode("\n".join(lines)):
- raise HTTPException(500, "Failed to send bed-jog command")
- return {"success": True, "message": f"Bed jog {distance:+.1f} mm sent"}
- @router.post("/{printer_id}/home-axes")
- async def home_axes(
- printer_id: int,
- axes: str = Query(
- "all",
- description="Legacy; accepted values are 'z' | 'xy' | 'all'. Always runs the printer's full auto-home sequence — see below.",
- ),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Run the printer's full auto-home sequence via bare `G28`.
- Bambu printers (H2C / H2D / H2S / X1 family) home the Z axis by moving
- the BED UP toward an endstop at the top of travel. If the toolhead is
- not already parked out of the way, a bare `G28 Z` will crash the bed
- into the toolhead — #1052 reported exactly that on H2C: the bed rose
- without stopping at a safe height because `G28 Z` skipped the
- toolhead-park step that a full `G28` runs first.
- The endpoint therefore ignores the `axes` argument and always sends a
- bare `G28`, which the firmware expands into a safe multi-step sequence
- (park toolhead → home XY → home Z). The argument is kept only for
- backward-compat with existing clients; sending an invalid value still
- returns 400 so typos surface instead of silently proceeding.
- """
- axes = axes.lower()
- if axes not in ("z", "xy", "all"):
- raise HTTPException(400, "axes must be 'z', 'xy', or 'all'")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- if not client.send_gcode("G28"):
- raise HTTPException(500, "Failed to send home command")
- return {"success": True, "message": "Full auto-home sequence sent"}
- @router.post("/{printer_id}/hms/clear")
- async def clear_hms_errors(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Clear HMS/print errors on the printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.clear_hms_errors()
- if not success:
- raise HTTPException(500, "Failed to clear HMS errors")
- return {"success": True, "message": "HMS errors cleared"}
- @router.get("/{printer_id}/print/objects")
- async def get_printable_objects(
- printer_id: int,
- reload: bool = False,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get the list of printable objects for the current print.
- Returns a list of objects with id, name, position (if available), and skip status.
- Objects that have already been skipped are marked in the skipped_objects list.
- Args:
- reload: If True, reload objects from the archive file (useful after restart)
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- # Reload objects from 3MF if requested or no objects loaded
- if reload or not client.state.printable_objects:
- subtask_name = client.state.subtask_name
- if subtask_name:
- from backend.app.services.archive import extract_printable_objects_from_3mf
- from backend.app.services.bambu_ftp import download_file_try_paths_async
- # Build possible 3MF filenames (try both .gcode.3mf and .3mf)
- possible_filenames = []
- if subtask_name.endswith(".3mf"):
- possible_filenames.append(subtask_name)
- else:
- possible_filenames.append(f"{subtask_name}.gcode.3mf")
- possible_filenames.append(f"{subtask_name}.3mf")
- # Also try with spaces converted to underscores (Bambu Studio may normalize filenames)
- if " " in subtask_name:
- normalized = subtask_name.replace(" ", "_")
- if normalized.endswith(".3mf"):
- possible_filenames.append(normalized)
- else:
- possible_filenames.append(f"{normalized}.gcode.3mf")
- possible_filenames.append(f"{normalized}.3mf")
- # Download 3MF from printer
- temp_path = settings.archive_dir / "temp" / f"objects_{printer_id}_{possible_filenames[0]}"
- temp_path.parent.mkdir(parents=True, exist_ok=True)
- # Build list of all remote paths to try
- remote_paths = []
- for filename in possible_filenames:
- remote_paths.extend([f"/{filename}", f"/cache/{filename}", f"/model/{filename}"])
- try:
- downloaded = await download_file_try_paths_async(
- printer.ip_address,
- printer.access_code,
- remote_paths,
- temp_path,
- printer_model=printer.model,
- )
- if downloaded and temp_path.exists():
- with open(temp_path, "rb") as f:
- data = f.read()
- objects, bbox_all = extract_printable_objects_from_3mf(data, include_positions=True)
- if objects:
- client.state.printable_objects = objects
- client.state.printable_objects_bbox_all = bbox_all
- logger.info("Reloaded %s objects for printer %s", len(objects), printer_id)
- except Exception as e:
- logger.debug("Failed to reload objects from printer: %s", e)
- finally:
- if temp_path.exists():
- temp_path.unlink()
- # Return objects with their skip status and position data
- objects = []
- for obj_id, obj_data in client.state.printable_objects.items():
- # Handle both old format (string name) and new format (dict with name, x, y)
- if isinstance(obj_data, dict):
- obj_entry = {
- "id": obj_id,
- "name": obj_data.get("name", f"Object {obj_id}"),
- "x": obj_data.get("x"),
- "y": obj_data.get("y"),
- "skipped": obj_id in client.state.skipped_objects,
- }
- else:
- # Legacy format: obj_data is just the name string
- obj_entry = {
- "id": obj_id,
- "name": obj_data,
- "x": None,
- "y": None,
- "skipped": obj_id in client.state.skipped_objects,
- }
- objects.append(obj_entry)
- return {
- "objects": objects,
- "total": len(objects),
- "skipped_count": len(client.state.skipped_objects),
- "is_printing": client.state.state in ("RUNNING", "PAUSE"),
- "bbox_all": getattr(client.state, "printable_objects_bbox_all", None),
- }
- @router.post("/{printer_id}/print/skip-objects")
- async def skip_objects(
- printer_id: int,
- object_ids: list[int],
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Skip specific objects during the current print.
- Args:
- object_ids: List of object identify_id values to skip
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- if not object_ids:
- raise HTTPException(400, "No object IDs provided")
- # Validate object IDs exist in printable_objects
- invalid_ids = [oid for oid in object_ids if oid not in client.state.printable_objects]
- if invalid_ids:
- raise HTTPException(400, f"Invalid object IDs: {invalid_ids}")
- success = client.skip_objects(object_ids)
- if not success:
- raise HTTPException(500, "Failed to skip objects")
- # Get names of skipped objects for response (handle both old and new format)
- skipped_names = []
- for oid in object_ids:
- obj_data = client.state.printable_objects.get(oid, str(oid))
- if isinstance(obj_data, dict):
- skipped_names.append(obj_data.get("name", str(oid)))
- else:
- skipped_names.append(obj_data)
- return {
- "success": True,
- "message": f"Skipped {len(object_ids)} object(s): {', '.join(skipped_names)}",
- "skipped_objects": object_ids,
- }
- # =============================================================================
- # AMS Control Endpoints
- # =============================================================================
- @router.post("/{printer_id}/ams/{ams_id}/slot/{slot_id}/refresh")
- async def refresh_ams_slot(
- printer_id: int,
- ams_id: int,
- slot_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_AMS_RFID),
- db: AsyncSession = Depends(get_db),
- ):
- """Re-read RFID for an AMS slot (triggers filament info refresh)."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success, message = client.ams_refresh_tray(ams_id, slot_id)
- if not success:
- raise HTTPException(400, message)
- # Apply PA profile after delay (RFID re-read takes a few seconds)
- asyncio.create_task(_apply_pa_after_refresh(printer_id, ams_id, slot_id))
- return {"success": True, "message": message}
- async def _apply_pa_after_refresh(printer_id: int, ams_id: int, slot_id: int):
- """Apply PA profile after RFID re-read completes.
- Waits for the printer to finish processing the RFID data, then selects
- the K-profile via extrusion_cali_sel. Does NOT re-send ams_set_filament_setting
- because that would overwrite the RFID-provided filament data.
- """
- await asyncio.sleep(5)
- try:
- from backend.app.api.routes.inventory import _find_tray_in_ams_data
- from backend.app.core.database import async_session
- from backend.app.models.spool import Spool
- from backend.app.models.spool_assignment import SpoolAssignment as SA
- from backend.app.models.spoolman_k_profile import SpoolmanKProfile
- from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
- from backend.app.services.spool_tag_matcher import (
- ZERO_TAG_UID,
- ZERO_TRAY_UUID,
- is_bambu_tag,
- )
- from backend.app.utils.tag_normalization import (
- normalize_tag_uid,
- normalize_tray_uuid,
- )
- client = printer_manager.get_client(printer_id)
- if not client:
- return
- state = printer_manager.get_status(printer_id)
- if not state or not state.raw_data:
- return
- # Find current tray data (should have RFID data by now)
- ams_data = state.raw_data.get("ams", {})
- ams_list = (
- ams_data.get("ams", []) if isinstance(ams_data, dict) else ams_data if isinstance(ams_data, list) else []
- )
- tray = _find_tray_in_ams_data(ams_list, ams_id, slot_id)
- if not tray or not tray.get("tray_type"):
- logger.debug("PA re-apply: no tray data for AMS%d-T%d", ams_id, slot_id)
- return
- tag_uid = tray.get("tag_uid", "")
- tray_uuid = tray.get("tray_uuid", "")
- tray_info_idx = tray.get("tray_info_idx", "")
- if not is_bambu_tag(tag_uid, tray_uuid, tray_info_idx):
- return
- # Compute nozzle/extruder once — used by both local and Spoolman lookup.
- nozzle_diameter = "0.4"
- if state.nozzles:
- nd = state.nozzles[0].nozzle_diameter
- if nd:
- nozzle_diameter = nd
- slot_extruder = None
- if state.ams_extruder_map:
- if ams_id == 255:
- # External slots: ext-L (tray 0) → extruder 1, ext-R (tray 1) → extruder 0
- slot_extruder = 1 - slot_id
- else:
- slot_extruder = state.ams_extruder_map.get(str(ams_id))
- # 3-stage K-profile cascade: local SpoolKProfile → Spoolman SpoolmanKProfile
- # → live tray.cali_idx fallback. Pre-Phase-13 only handled the local path
- # and exited silently if no SpoolKProfile match; Spoolman-assigned slots
- # were ignored entirely and live cali_idx was never re-asserted.
- matching_cali_idx: int | None = None
- matching_filament_id: str = tray_info_idx
- async with async_session() as db:
- from sqlalchemy import or_, select as sa_select
- from sqlalchemy.orm import selectinload
- # Stage 1: local SpoolAssignment + SpoolKProfile match
- result = await db.execute(
- sa_select(SA)
- .options(selectinload(SA.spool).selectinload(Spool.k_profiles))
- .where(SA.printer_id == printer_id, SA.ams_id == ams_id, SA.tray_id == slot_id)
- )
- assignment = result.scalar_one_or_none()
- spool: Spool | None = assignment.spool if assignment else None
- # Stage 1b: tag-based fallback. The slot may have just been reset
- # (SpoolAssignment row deleted) before the user triggered a re-read.
- # The live tray already carries the spool's tray_uuid/tag_uid from
- # the RFID re-read, but the SA row hasn't been re-created yet.
- # Without this fallback we miss the stored SpoolKProfile and Stage 3
- # ends up re-asserting whatever cali_idx the firmware reset to
- # (typically the default profile).
- if spool is None:
- norm_uuid = normalize_tray_uuid(tray_uuid) if tray_uuid else ""
- norm_tag = normalize_tag_uid(tag_uid) if tag_uid else ""
- tag_filters = []
- if norm_uuid and norm_uuid != ZERO_TRAY_UUID:
- tag_filters.append(Spool.tray_uuid == norm_uuid)
- if norm_tag and norm_tag != ZERO_TAG_UID:
- tag_filters.append(Spool.tag_uid == norm_tag)
- if tag_filters:
- tag_lookup = await db.execute(
- sa_select(Spool).options(selectinload(Spool.k_profiles)).where(or_(*tag_filters)).limit(1)
- )
- spool = tag_lookup.scalar_one_or_none()
- if spool is not None:
- logger.info(
- "PA re-apply AMS%d-T%d: matched spool %d via tag fallback "
- "(SpoolAssignment row missing, likely after slot reset)",
- ams_id,
- slot_id,
- spool.id,
- )
- if spool is not None and spool.k_profiles:
- # Prefer exact extruder match, fall back to extruder-agnostic kp
- # for the same printer + nozzle. Hard-skipping on extruder
- # mismatch made the cascade refuse perfectly valid stored
- # profiles whenever the AMS-extruder mapping had shifted since
- # calibration time, falling all the way through to Stage 3 and
- # re-asserting the firmware default.
- exact_kp = None
- fallback_kp = None
- for kp in spool.k_profiles:
- if kp.printer_id != printer_id or kp.nozzle_diameter != nozzle_diameter or kp.cali_idx is None:
- continue
- if slot_extruder is not None and kp.extruder is not None and kp.extruder == slot_extruder:
- exact_kp = kp
- break
- if fallback_kp is None:
- fallback_kp = kp
- chosen_kp = exact_kp or fallback_kp
- if chosen_kp is not None:
- matching_cali_idx = chosen_kp.cali_idx
- # The filament_id in extrusion_cali_sel must match the preset
- # under which the K-profile was calibrated. Prefer the spool's
- # slicer_filament setting, falling back to the tray's RFID value.
- matching_filament_id = spool.slicer_filament or tray_info_idx
- # Stage 2: Spoolman SpoolmanSlotAssignment + SpoolmanKProfile match
- # (only when no local spool was matched — local takes priority,
- # including the tag-based fallback above)
- if matching_cali_idx is None and spool is None:
- sm_result = await db.execute(
- sa_select(SpoolmanSlotAssignment).where(
- SpoolmanSlotAssignment.printer_id == printer_id,
- SpoolmanSlotAssignment.ams_id == ams_id,
- SpoolmanSlotAssignment.tray_id == slot_id,
- )
- )
- sm_assignment = sm_result.scalar_one_or_none()
- if sm_assignment:
- kp_result = await db.execute(
- sa_select(SpoolmanKProfile).where(
- SpoolmanKProfile.spoolman_spool_id == sm_assignment.spoolman_spool_id,
- SpoolmanKProfile.printer_id == printer_id,
- )
- )
- for kp in kp_result.scalars().all():
- if kp.nozzle_diameter == nozzle_diameter:
- if slot_extruder is not None and kp.extruder is not None and kp.extruder != slot_extruder:
- continue
- if kp.cali_idx is not None:
- matching_cali_idx = kp.cali_idx
- # Spoolman has no slicer_filament — use the tray's RFID value
- matching_filament_id = tray_info_idx
- break
- # Stage 3: live tray.cali_idx fallback. Re-asserts the printer's current
- # selection so the value sticks across the RFID re-read (otherwise some
- # firmwares clear cali_idx back to -1 mid-cycle).
- if matching_cali_idx is None:
- live_cali_idx = tray.get("cali_idx")
- if live_cali_idx is not None and live_cali_idx >= 0:
- matching_cali_idx = live_cali_idx
- if matching_cali_idx is None:
- logger.debug(
- "PA re-apply AMS%d-T%d: no stored or live cali_idx — skipping MQTT",
- ams_id,
- slot_id,
- )
- return
- logger.info(
- "PA re-apply AMS%d-T%d: cali_idx=%d, filament_id=%s",
- ams_id,
- slot_id,
- matching_cali_idx,
- matching_filament_id,
- )
- # NOTE: Do NOT send ams_set_filament_setting here — it tells the firmware
- # "this is a manual config" which destroys the RFID-detected spool state
- # (changes eye icon to pen icon in slicer).
- client.extrusion_cali_sel(
- ams_id=ams_id,
- tray_id=slot_id,
- cali_idx=matching_cali_idx,
- filament_id=matching_filament_id,
- nozzle_diameter=nozzle_diameter,
- )
- # NOTE: Do NOT send extrusion_cali_set here. extrusion_cali_sel already
- # selected the correct profile by cali_idx. Sending extrusion_cali_set with
- # the same cali_idx would MODIFY the existing profile's metadata (extruder_id,
- # nozzle_id, name), corrupting it.
- logger.info(
- "Applied PA profile cali_idx=%d to printer %d AMS%d-T%d",
- matching_cali_idx,
- printer_id,
- ams_id,
- slot_id,
- )
- except Exception as e:
- logger.warning("Failed to apply PA profile after RFID re-read: %s", e)
- @router.post("/{printer_id}/ams/load")
- async def ams_load(
- printer_id: int,
- tray_id: int = Query(..., description="Tray ID: 0-15 for AMS slots (ams_id*4+slot_id), 254 for external spool"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Load filament from a specific AMS slot or external spool.
- Tray ID encoding (matches Bambu firmware convention):
- - 0..15: AMS slot, computed as ams_id * 4 + slot_id
- - 254: external spool (single-external printers, or Ext-L on dual-nozzle H2D)
- - 255: Ext-R on dual-nozzle H2D
- """
- if tray_id not in range(16) and tray_id not in (254, 255):
- raise HTTPException(400, "tray_id must be 0..15 (AMS slot), 254 (external / Ext-L), or 255 (Ext-R)")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.ams_load_filament(tray_id)
- if not success:
- raise HTTPException(500, "Failed to send load command")
- if tray_id == 254:
- target = "external spool"
- elif tray_id == 255:
- target = "Ext-R"
- else:
- target = f"AMS {tray_id // 4} slot {tray_id % 4 + 1}"
- return {"success": True, "message": f"Loading filament from {target}"}
- @router.post("/{printer_id}/ams/unload")
- async def ams_unload(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Unload the currently loaded filament."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.ams_unload_filament()
- if not success:
- raise HTTPException(500, "Failed to send unload command")
- return {"success": True, "message": "Unloading filament"}
- @router.get("/{printer_id}/runtime-debug")
- async def get_runtime_debug(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Debug endpoint: Get runtime tracking status for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- state = printer_manager.get_status(printer_id)
- return {
- "printer_name": printer.name,
- "runtime_seconds": printer.runtime_seconds,
- "runtime_hours": printer.runtime_seconds / 3600.0 if printer.runtime_seconds else 0,
- "print_hours_offset": printer.print_hours_offset,
- "total_hours": (printer.runtime_seconds / 3600.0 if printer.runtime_seconds else 0)
- + (printer.print_hours_offset or 0),
- "last_runtime_update": printer.last_runtime_update.isoformat() if printer.last_runtime_update else None,
- "mqtt_state": {
- "connected": state.connected if state else False,
- "state": state.state if state else None,
- "progress": state.progress if state else None,
- "gcode_file": state.gcode_file if state else None,
- }
- if state
- else None,
- "is_active": printer.is_active,
- }
|