| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322 |
- import asyncio
- import logging
- import re
- import zipfile
- from fastapi import APIRouter, Depends, HTTPException, Query
- from fastapi.responses import Response
- from sqlalchemy import func, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.core.auth import RequireCameraStreamTokenIfAuthEnabled, RequirePermissionIfAuthEnabled
- from backend.app.core.config import settings
- from backend.app.core.database import get_db
- from backend.app.core.permissions import Permission
- from backend.app.models.ams_label import AmsLabel
- from backend.app.models.printer import Printer
- from backend.app.models.slot_preset import SlotPresetMapping
- from backend.app.schemas.printer import (
- AmsLabelBody,
- AMSTray,
- AMSUnit,
- FilaSwitchResponse,
- HMSErrorResponse,
- NozzleInfoResponse,
- NozzleRackSlot,
- PrinterCreate,
- PrinterResponse,
- PrinterStatus,
- PrinterUpdate,
- PrintOptionsResponse,
- )
- from backend.app.services.bambu_ftp import (
- cache_3mf_download,
- delete_file_async,
- download_file_bytes_async,
- download_file_try_paths_async,
- get_cached_3mf,
- get_storage_info_async,
- list_files_async,
- )
- from backend.app.services.printer_manager import (
- get_derived_status_name,
- printer_manager,
- resolve_plate_id,
- supports_chamber_temp,
- supports_drying,
- )
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/printers", tags=["printers"])
- @router.get("/", response_model=list[PrinterResponse])
- async def list_printers(
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """List all configured printers."""
- result = await db.execute(select(Printer).order_by(Printer.name))
- return list(result.scalars().all())
- @router.post("/", response_model=PrinterResponse)
- async def create_printer(
- printer_data: PrinterCreate,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CREATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Add a new printer."""
- # Check if serial number already exists
- result = await db.execute(select(Printer).where(Printer.serial_number == printer_data.serial_number))
- if result.scalar_one_or_none():
- raise HTTPException(400, "Printer with this serial number already exists")
- printer = Printer(**printer_data.model_dump())
- db.add(printer)
- await db.commit()
- await db.refresh(printer)
- # Connect to the printer
- if printer.is_active:
- await printer_manager.connect_printer(printer)
- return printer
- @router.get("/usb-cameras")
- async def list_usb_cameras(
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- ):
- """List available USB cameras connected to the system.
- Returns a list of detected V4L2 video devices with their info.
- Only works on Linux systems with V4L2 support.
- Returns:
- List of dicts with {device: str, name: str, capabilities: list, formats?: list}
- """
- from backend.app.services.external_camera import list_usb_cameras
- cameras = list_usb_cameras()
- return {"cameras": cameras}
- @router.get("/available-filaments")
- async def get_available_filaments(
- model: str = Query(..., description="Target printer model"),
- location: str | None = Query(None, description="Optional location filter"),
- _=RequirePermissionIfAuthEnabled(Permission.QUEUE_CREATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Get deduplicated list of filaments loaded across all active printers of a given model.
- Used by the frontend to offer filament override options for model-based queue assignment.
- """
- from backend.app.utils.printer_models import normalize_printer_model, normalize_printer_model_id
- # Normalize model name
- normalized_model = normalize_printer_model(model) or normalize_printer_model_id(model) or model
- query = (
- select(Printer).where(func.lower(Printer.model) == normalized_model.lower()).where(Printer.is_active == True) # noqa: E712
- )
- if location:
- query = query.where(Printer.location == location)
- result = await db.execute(query)
- printers_list = list(result.scalars().all())
- if not printers_list:
- return []
- # Collect filaments from all matching printers
- # Dedup key includes extruder_id and tray_sub_brands so "PLA Basic" and "PLA Matte" appear separately
- seen: set[tuple[str, str, str, int | None]] = set() # (type_upper, color_normalized, sub_brands_upper, extruder_id)
- filaments = []
- for printer in printers_list:
- status = printer_manager.get_status(printer.id)
- if not status:
- continue
- # Get ams_extruder_map for dual-nozzle printers
- ams_extruder_map = status.raw_data.get("ams_extruder_map", {})
- # AMS trays
- for ams_unit in status.raw_data.get("ams", []):
- ams_id = str(ams_unit.get("id", 0))
- extruder_id = ams_extruder_map.get(ams_id)
- for tray in ams_unit.get("tray", []):
- tray_type = tray.get("tray_type")
- if not tray_type:
- continue
- tray_color = tray.get("tray_color", "")
- # Normalize color: remove alpha, add hash
- hex_color = tray_color.replace("#", "")[:6] if tray_color else "808080"
- color = f"#{hex_color}"
- tray_info_idx = tray.get("tray_info_idx", "")
- tray_sub_brands = tray.get("tray_sub_brands", "") or ""
- key = (tray_type.upper(), hex_color.lower(), tray_sub_brands.upper(), extruder_id)
- if key not in seen:
- seen.add(key)
- filaments.append(
- {
- "type": tray_type,
- "color": color,
- "tray_info_idx": tray_info_idx,
- "tray_sub_brands": tray_sub_brands,
- "extruder_id": extruder_id,
- }
- )
- # External spools (vt_tray)
- for vt in status.raw_data.get("vt_tray") or []:
- vt_type = vt.get("tray_type")
- if not vt_type:
- continue
- vt_color = vt.get("tray_color", "")
- hex_color = vt_color.replace("#", "")[:6] if vt_color else "808080"
- color = f"#{hex_color}"
- tray_info_idx = vt.get("tray_info_idx", "")
- tray_sub_brands = vt.get("tray_sub_brands", "") or ""
- vt_id = int(vt.get("id", 254))
- extruder_id = (255 - vt_id) if ams_extruder_map else None
- key = (vt_type.upper(), hex_color.lower(), tray_sub_brands.upper(), extruder_id)
- if key not in seen:
- seen.add(key)
- filaments.append(
- {
- "type": vt_type,
- "color": color,
- "tray_info_idx": tray_info_idx,
- "tray_sub_brands": tray_sub_brands,
- "extruder_id": extruder_id,
- }
- )
- return filaments
- @router.get("/developer-mode-warnings")
- async def get_developer_mode_warnings(
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Check if any connected printer lacks developer LAN mode."""
- result = await db.execute(select(Printer).where(Printer.is_active == True)) # noqa: E712
- printers = result.scalars().all()
- statuses = printer_manager.get_all_statuses()
- warnings = []
- for printer in printers:
- state = statuses.get(printer.id)
- if state and state.connected and state.developer_mode is False:
- warnings.append(
- {
- "printer_id": printer.id,
- "name": printer.name,
- }
- )
- return warnings
- @router.get("/{printer_id}", response_model=PrinterResponse)
- async def get_printer(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get a specific printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- return printer
- @router.patch("/{printer_id}", response_model=PrinterResponse)
- async def update_printer(
- printer_id: int,
- printer_data: PrinterUpdate,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Update a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- update_data = printer_data.model_dump(exclude_unset=True)
- # Handle nested ROI object - flatten to individual columns
- if "plate_detection_roi" in update_data:
- roi = update_data.pop("plate_detection_roi")
- if roi:
- update_data["plate_detection_roi_x"] = roi.get("x")
- update_data["plate_detection_roi_y"] = roi.get("y")
- update_data["plate_detection_roi_w"] = roi.get("w")
- update_data["plate_detection_roi_h"] = roi.get("h")
- else:
- # Clear ROI if set to null
- update_data["plate_detection_roi_x"] = None
- update_data["plate_detection_roi_y"] = None
- update_data["plate_detection_roi_w"] = None
- update_data["plate_detection_roi_h"] = None
- for field, value in update_data.items():
- setattr(printer, field, value)
- await db.commit()
- await db.refresh(printer)
- # Reconnect if connection settings changed
- if any(k in update_data for k in ["ip_address", "access_code", "is_active"]):
- printer_manager.disconnect_printer(printer_id)
- if printer.is_active:
- await printer_manager.connect_printer(printer)
- return printer
- @router.delete("/{printer_id}")
- async def delete_printer(
- printer_id: int,
- delete_archives: bool = True,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_DELETE),
- db: AsyncSession = Depends(get_db),
- ):
- """Delete a printer.
- Args:
- printer_id: ID of the printer to delete
- delete_archives: If True (default), delete all print archives for this printer.
- If False, keep archives but remove their printer association.
- """
- from sqlalchemy import delete as sql_delete
- from backend.app.models.archive import PrintArchive
- from backend.app.models.maintenance import MaintenanceHistory, PrinterMaintenance
- from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- printer_manager.disconnect_printer(printer_id)
- if delete_archives:
- # Delete all archives for this printer
- await db.execute(sql_delete(PrintArchive).where(PrintArchive.printer_id == printer_id))
- else:
- # Orphan the archives instead of deleting them
- from sqlalchemy import update
- await db.execute(update(PrintArchive).where(PrintArchive.printer_id == printer_id).values(printer_id=None))
- # Delete slot assignments for this printer (SQLite doesn't enforce FK cascades)
- await db.execute(sql_delete(SpoolmanSlotAssignment).where(SpoolmanSlotAssignment.printer_id == printer_id))
- # Delete maintenance history and items for this printer
- # (SQLite doesn't enforce FK cascades, so do it explicitly)
- maintenance_ids = (
- (await db.execute(select(PrinterMaintenance.id).where(PrinterMaintenance.printer_id == printer_id)))
- .scalars()
- .all()
- )
- if maintenance_ids:
- await db.execute(
- sql_delete(MaintenanceHistory).where(MaintenanceHistory.printer_maintenance_id.in_(maintenance_ids))
- )
- await db.execute(sql_delete(PrinterMaintenance).where(PrinterMaintenance.printer_id == printer_id))
- await db.delete(printer)
- await db.commit()
- return {"status": "deleted", "archives_deleted": delete_archives}
- @router.get("/{printer_id}/status", response_model=PrinterStatus)
- async def get_printer_status(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get real-time status of a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- state = printer_manager.get_status(printer_id)
- if not state:
- return PrinterStatus(
- id=printer_id,
- name=printer.name,
- connected=False,
- )
- # Determine cover URL if there's an active print (including paused)
- cover_url = None
- if state.state in ("RUNNING", "PAUSE") and state.gcode_file:
- cover_url = f"/api/v1/printers/{printer_id}/cover"
- # Convert HMS errors to response format
- hms_errors = [
- HMSErrorResponse(code=e.code, attr=e.attr, module=e.module, severity=e.severity)
- for e in (state.hms_errors or [])
- ]
- # Parse AMS data from raw_data
- ams_units = []
- vt_tray = []
- ams_exists = False
- raw_data = state.raw_data or {}
- # Build K-profile lookup map: cali_idx -> k_value
- # This allows looking up the calibrated K value for each AMS slot
- kprofile_map: dict[int, float] = {}
- for kp in state.kprofiles or []:
- if kp.slot_id is not None and kp.k_value:
- try:
- kprofile_map[kp.slot_id] = float(kp.k_value)
- except (ValueError, TypeError):
- pass # Skip K-profile entries with unparseable values
- if "ams" in raw_data and isinstance(raw_data["ams"], list):
- ams_exists = True
- for ams_data in raw_data["ams"]:
- # Skip if ams_data is not a dict (defensive check)
- if not isinstance(ams_data, dict):
- continue
- trays = []
- for tray_data in ams_data.get("tray", []):
- # Filter out empty/invalid tag values
- tag_uid = tray_data.get("tag_uid", "")
- if tag_uid in ("", "0000000000000000"):
- tag_uid = None
- tray_uuid = tray_data.get("tray_uuid", "")
- if tray_uuid in ("", "00000000000000000000000000000000"):
- tray_uuid = None
- # Get K value: first try tray's k field, then lookup from K-profiles
- k_value = tray_data.get("k")
- cali_idx = tray_data.get("cali_idx")
- if k_value is None and cali_idx is not None and cali_idx in kprofile_map:
- k_value = kprofile_map[cali_idx]
- trays.append(
- AMSTray(
- id=tray_data.get("id", 0),
- tray_color=tray_data.get("tray_color"),
- tray_type=tray_data.get("tray_type"),
- tray_sub_brands=tray_data.get("tray_sub_brands"),
- tray_id_name=tray_data.get("tray_id_name"),
- tray_info_idx=tray_data.get("tray_info_idx"),
- remain=tray_data.get("remain", 0),
- k=k_value,
- cali_idx=cali_idx,
- tag_uid=tag_uid,
- tray_uuid=tray_uuid,
- nozzle_temp_min=tray_data.get("nozzle_temp_min"),
- nozzle_temp_max=tray_data.get("nozzle_temp_max"),
- drying_temp=tray_data.get("drying_temp"),
- drying_time=tray_data.get("drying_time"),
- state=tray_data.get("state"),
- )
- )
- # Prefer humidity_raw (percentage) over humidity (index 1-5)
- # humidity_raw is the actual percentage value from the sensor
- humidity_raw = ams_data.get("humidity_raw")
- humidity_idx = ams_data.get("humidity")
- humidity_value = None
- if humidity_raw is not None:
- try:
- humidity_value = int(humidity_raw)
- except (ValueError, TypeError):
- pass # Skip unparseable humidity; will try index fallback
- if humidity_value is None and humidity_idx is not None:
- try:
- humidity_value = int(humidity_idx)
- except (ValueError, TypeError):
- pass # Skip unparseable humidity index; humidity remains None
- # AMS-HT has 1 tray, regular AMS has 4 trays
- is_ams_ht = len(trays) == 1
- ams_units.append(
- AMSUnit(
- id=ams_data.get("id", 0),
- humidity=humidity_value,
- temp=ams_data.get("temp"),
- is_ams_ht=is_ams_ht,
- tray=trays,
- # Serial number: Bambu MQTT uses "sn" key on AMS unit objects
- serial_number=str(ams_data.get("sn") or ams_data.get("serial_number") or ""),
- # Firmware version: populated by _handle_version_info from info.module ams/* entries
- sw_ver=str(ams_data.get("sw_ver") or ""),
- # Drying: dry_time > 0 means drying is active (minutes remaining)
- dry_time=int(ams_data.get("dry_time") or 0),
- module_type=str(ams_data.get("module_type") or ""),
- )
- )
- # Virtual tray (external spool holder) - comes from vt_tray in raw_data (list)
- if "vt_tray" in raw_data:
- for vt_data in raw_data["vt_tray"]:
- # Filter out empty/invalid tag values for vt_tray
- vt_tag_uid = vt_data.get("tag_uid", "")
- if vt_tag_uid in ("", "0000000000000000"):
- vt_tag_uid = None
- vt_tray_uuid = vt_data.get("tray_uuid", "")
- if vt_tray_uuid in ("", "00000000000000000000000000000000"):
- vt_tray_uuid = None
- # Get K value: first try tray's k field, then lookup from K-profiles
- vt_k_value = vt_data.get("k")
- vt_cali_idx = vt_data.get("cali_idx")
- if vt_k_value is None and vt_cali_idx is not None and vt_cali_idx in kprofile_map:
- vt_k_value = kprofile_map[vt_cali_idx]
- tray_id = int(vt_data.get("id", 254))
- vt_tray.append(
- AMSTray(
- id=tray_id,
- tray_color=vt_data.get("tray_color"),
- tray_type=vt_data.get("tray_type"),
- tray_sub_brands=vt_data.get("tray_sub_brands"),
- tray_id_name=vt_data.get("tray_id_name"),
- tray_info_idx=vt_data.get("tray_info_idx"),
- remain=vt_data.get("remain", 0),
- k=vt_k_value,
- cali_idx=vt_cali_idx,
- tag_uid=vt_tag_uid,
- tray_uuid=vt_tray_uuid,
- nozzle_temp_min=vt_data.get("nozzle_temp_min"),
- nozzle_temp_max=vt_data.get("nozzle_temp_max"),
- )
- )
- # Convert nozzle info to response format
- nozzles = [
- NozzleInfoResponse(
- nozzle_type=n.nozzle_type,
- nozzle_diameter=n.nozzle_diameter,
- )
- for n in (state.nozzles or [])
- ]
- # H2C nozzle rack (tool-changer dock positions)
- nozzle_rack = [
- NozzleRackSlot(
- id=n.get("id", 0),
- nozzle_type=n.get("type", ""),
- nozzle_diameter=n.get("diameter", ""),
- wear=n.get("wear"),
- stat=n.get("stat"),
- max_temp=n.get("max_temp", 0),
- serial_number=n.get("serial_number", ""),
- filament_color=n.get("filament_color", ""),
- filament_id=n.get("filament_id", ""),
- filament_type=n.get("filament_type", ""),
- )
- for n in (state.nozzle_rack or [])
- ]
- # Convert print options to response format
- print_options = PrintOptionsResponse(
- spaghetti_detector=state.print_options.spaghetti_detector,
- print_halt=state.print_options.print_halt,
- halt_print_sensitivity=state.print_options.halt_print_sensitivity,
- first_layer_inspector=state.print_options.first_layer_inspector,
- printing_monitor=state.print_options.printing_monitor,
- buildplate_marker_detector=state.print_options.buildplate_marker_detector,
- allow_skip_parts=state.print_options.allow_skip_parts,
- nozzle_clumping_detector=state.print_options.nozzle_clumping_detector,
- nozzle_clumping_sensitivity=state.print_options.nozzle_clumping_sensitivity,
- pileup_detector=state.print_options.pileup_detector,
- pileup_sensitivity=state.print_options.pileup_sensitivity,
- airprint_detector=state.print_options.airprint_detector,
- airprint_sensitivity=state.print_options.airprint_sensitivity,
- auto_recovery_step_loss=state.print_options.auto_recovery_step_loss,
- filament_tangle_detect=state.print_options.filament_tangle_detect,
- )
- # Get AMS mapping from raw_data (which AMS is connected to which nozzle)
- ams_mapping = raw_data.get("ams_mapping", [])
- # Get per-AMS extruder map from state attribute (not raw_data, to avoid race condition
- # where raw_data gets replaced during MQTT updates and ams_extruder_map is temporarily missing)
- ams_extruder_map = state.ams_extruder_map or {}
- logger.debug("API returning ams_mapping: %s, ams_extruder_map: %s", ams_mapping, ams_extruder_map)
- # tray_now from MQTT is already a global tray ID: (ams_id * 4) + slot_id
- # Per OpenBambuAPI docs: 254 = external spool, 255 = no filament, otherwise global tray ID
- # No conversion needed - just use the raw value directly
- tray_now = state.tray_now
- logger.debug("Using tray_now directly as global ID: %s", tray_now)
- # Filter out chamber temp for models that don't have a real sensor
- # P1P, P1S, A1, A1Mini report meaningless chamber_temper values
- temperatures = state.temperatures
- if not supports_chamber_temp(printer.model):
- temperatures = {
- k: v for k, v in temperatures.items() if k not in ("chamber", "chamber_target", "chamber_heating")
- }
- # Resolve the active print's archive + plate (#881 follow-up): lets the
- # printer card show the actual plate name for multi-plate 3MFs instead of
- # just the 3MF filename. Only attempted for active prints, since subtask_id
- # is only meaningful then.
- current_archive_id: int | None = None
- current_plate_id: int | None = None
- if state.state in ("RUNNING", "PAUSE"):
- current_plate_id = resolve_plate_id(state)
- if state.subtask_id:
- from backend.app.models.archive import PrintArchive
- archive_row = await db.execute(
- select(PrintArchive.id)
- .where(PrintArchive.subtask_id == state.subtask_id)
- .where(PrintArchive.printer_id == printer_id)
- .order_by(PrintArchive.created_at.desc())
- .limit(1)
- )
- current_archive_id = archive_row.scalar_one_or_none()
- return PrinterStatus(
- id=printer_id,
- name=printer.name,
- connected=state.connected,
- state=state.state,
- current_print=state.current_print,
- subtask_name=state.subtask_name,
- gcode_file=state.gcode_file,
- progress=state.progress,
- remaining_time=state.remaining_time,
- layer_num=state.layer_num,
- total_layers=state.total_layers,
- temperatures=temperatures,
- cover_url=cover_url,
- hms_errors=hms_errors,
- ams=ams_units,
- ams_exists=ams_exists,
- vt_tray=vt_tray,
- sdcard=state.sdcard,
- store_to_sdcard=state.store_to_sdcard,
- timelapse=state.timelapse,
- ipcam=state.ipcam,
- wifi_signal=state.wifi_signal,
- wired_network=state.wired_network,
- door_open=state.door_open,
- nozzles=nozzles,
- nozzle_rack=nozzle_rack,
- print_options=print_options,
- stg_cur=state.stg_cur,
- stg_cur_name=get_derived_status_name(state, printer.model),
- stg=state.stg,
- airduct_mode=state.airduct_mode,
- speed_level=state.speed_level,
- chamber_light=state.chamber_light,
- active_extruder=state.active_extruder,
- ams_mapping=ams_mapping,
- ams_extruder_map=ams_extruder_map,
- tray_now=tray_now,
- ams_status_main=state.ams_status_main,
- ams_status_sub=state.ams_status_sub,
- mc_print_sub_stage=state.mc_print_sub_stage,
- last_ams_update=state.last_ams_update,
- printable_objects_count=len(state.printable_objects),
- cooling_fan_speed=state.cooling_fan_speed,
- big_fan1_speed=state.big_fan1_speed,
- big_fan2_speed=state.big_fan2_speed,
- heatbreak_fan_speed=state.heatbreak_fan_speed,
- firmware_version=state.firmware_version,
- developer_mode=state.developer_mode if state else None,
- awaiting_plate_clear=printer_manager.is_awaiting_plate_clear(printer_id),
- supports_drying=supports_drying(printer.model, state.firmware_version),
- current_archive_id=current_archive_id,
- current_plate_id=current_plate_id,
- fila_switch=(
- FilaSwitchResponse(
- installed=state.fila_switch.installed,
- in_slots=list(state.fila_switch.in_slots),
- out_extruders=list(state.fila_switch.out_extruders),
- stat=state.fila_switch.stat,
- info=state.fila_switch.info,
- )
- if state.fila_switch and state.fila_switch.installed
- else None
- ),
- )
- @router.get("/{printer_id}/current-print-user")
- async def get_current_print_user(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get the user who started the current print (for reprint tracking).
- Returns user info if available, empty object otherwise.
- This tracks users for reprints (which bypass the queue).
- For queue-based prints, use the queue item's created_by field instead.
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- user_info = printer_manager.get_current_print_user(printer_id)
- return user_info or {}
- @router.post("/{printer_id}/refresh-status")
- async def refresh_printer_status(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Request a full status refresh from the printer (sends pushall command)."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = printer_manager.request_status_update(printer_id)
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"status": "refresh_requested"}
- @router.post("/{printer_id}/connect")
- async def connect_printer(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Manually connect to a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = await printer_manager.connect_printer(printer)
- return {"connected": success}
- @router.post("/{printer_id}/disconnect")
- async def disconnect_printer(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Manually disconnect from a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- printer_manager.disconnect_printer(printer_id)
- return {"connected": False}
- @router.post("/test")
- async def test_printer_connection(
- ip_address: str,
- serial_number: str,
- access_code: str,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CREATE),
- ):
- """Test connection to a printer without saving."""
- result = await printer_manager.test_connection(
- ip_address=ip_address,
- serial_number=serial_number,
- access_code=access_code,
- )
- return result
- # Cache for cover images (printer_id -> {(subtask_name, view_key) -> image_bytes}).
- # Cleared on every print start by main.py::on_print_start, so re-dispatches with
- # different plates always fetch a fresh thumbnail without needing plate in the key.
- _cover_cache: dict[int, dict[tuple[str, str], bytes]] = {}
- def clear_cover_cache(printer_id: int) -> None:
- """Clear cached cover images for a printer. Call on print start to avoid stale thumbnails."""
- _cover_cache.pop(printer_id, None)
- @router.get("/{printer_id}/cover")
- async def get_printer_cover(
- printer_id: int,
- view: str | None = None,
- db: AsyncSession = Depends(get_db),
- _: None = RequireCameraStreamTokenIfAuthEnabled,
- ):
- """Get the cover image for the current print job.
- Args:
- view: Optional view type. Use "top" for top-down build plate view (useful for skip objects).
- Default returns angled 3D perspective view.
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- state = printer_manager.get_status(printer_id)
- if not state:
- raise HTTPException(404, "Printer not connected")
- # Use subtask_name as the 3MF filename (gcode_file is the path inside the 3MF)
- subtask_name = state.subtask_name
- if not subtask_name:
- raise HTTPException(404, f"No subtask_name in printer state (state={state.state})")
- # Resolve the active plate. Precedence (#1166):
- # 1. The plate Bambuddy dispatched (authoritative when we sent the print)
- # 2. plate_(\d+)\.gcode regex on state.gcode_file (works on firmware that
- # reflects the full path, e.g. some X1C builds)
- # 3. Scan the downloaded 3MF for a unique Metadata/plate_*.gcode (covers
- # per-plate archives sliced separately in Bambu Studio, where the
- # printer's gcode_file echo is just the .3mf filename)
- # 4. Fall back to plate 1
- # The 3MF-scan fallback runs later — after the file is on disk.
- plate_num = resolve_plate_id(state)
- if plate_num is not None:
- logger.info("Cover: resolved plate %s before download (subtask=%s)", plate_num, subtask_name)
- # Normalize view parameter
- view_key = view or "default"
- # Check cache. Cache by (subtask_name, view_key) only — clear_cover_cache()
- # runs on every print start, so a re-dispatch with a different plate gets
- # a fresh image regardless. Pre-#1166 the key included plate_num, but with
- # late plate resolution the cache check would always miss.
- if printer_id in _cover_cache:
- cache_key = (subtask_name, view_key)
- if cache_key in _cover_cache[printer_id]:
- return Response(content=_cover_cache[printer_id][cache_key], media_type="image/png")
- # Build possible 3MF filenames from subtask_name
- # Bambu printers may store files as "name.gcode.3mf" (sliced via Bambu Studio)
- # or just "name.3mf" (uploaded directly)
- possible_filenames = []
- if subtask_name.endswith(".3mf"):
- possible_filenames.append(subtask_name)
- else:
- # Try both naming patterns
- possible_filenames.append(f"{subtask_name}.gcode.3mf")
- possible_filenames.append(f"{subtask_name}.3mf")
- # Also try with spaces converted to underscores (Bambu Studio may normalize filenames)
- if " " in subtask_name:
- normalized = subtask_name.replace(" ", "_")
- if normalized.endswith(".3mf"):
- possible_filenames.append(normalized)
- else:
- possible_filenames.append(f"{normalized}.gcode.3mf")
- possible_filenames.append(f"{normalized}.3mf")
- # Build list of all remote paths to try
- remote_paths = []
- for filename in possible_filenames:
- remote_paths.extend(
- [
- f"/{filename}", # Root directory (most common)
- f"/cache/{filename}",
- f"/model/{filename}",
- f"/data/{filename}",
- ]
- )
- # Use first filename for temp path (will be reused)
- temp_filename = possible_filenames[0]
- temp_path = settings.archive_dir / "temp" / f"cover_{printer_id}_{temp_filename}"
- temp_path.parent.mkdir(parents=True, exist_ok=True)
- # Cache check (#972): the archive-metadata flow in main.py may have already
- # downloaded this 3MF during the print-start handler. Reusing that file
- # avoids a second 36MB transfer competing with the printer's single FTP
- # socket (which produces the 425 errors that feed the retry storm).
- downloaded = False
- using_cached = False
- for candidate_name in possible_filenames:
- cached = get_cached_3mf(printer_id, candidate_name)
- if cached:
- logger.info("Cover using cached 3MF from %s (avoided duplicate FTP)", cached)
- temp_path = cached
- downloaded = True
- using_cached = True
- break
- if not downloaded:
- logger.info(
- f"Trying to download cover for '{subtask_name}' from {printer.ip_address} (trying {len(remote_paths)} paths)"
- )
- # Retry logic for transient FTP failures
- max_retries = 2
- last_error = None
- for attempt in range(max_retries + 1):
- try:
- downloaded = await download_file_try_paths_async(
- printer.ip_address,
- printer.access_code,
- remote_paths,
- temp_path,
- printer_model=printer.model,
- )
- if downloaded:
- break
- except Exception as e:
- last_error = e
- if attempt < max_retries:
- logger.warning("FTP download attempt %s failed: %s, retrying...", attempt + 1, e)
- await asyncio.sleep(0.5 * (attempt + 1)) # Brief backoff
- else:
- logger.error("FTP download failed after %s attempts: %s", max_retries + 1, e)
- if last_error and not downloaded:
- raise HTTPException(503, f"FTP download temporarily unavailable: {last_error}")
- if not downloaded:
- raise HTTPException(
- 404,
- f"Could not download 3MF file for '{subtask_name}' from printer {printer.ip_address}. Tried: {possible_filenames}",
- )
- # Share the fresh download with the archive flow.
- cache_3mf_download(printer_id, temp_filename, temp_path)
- # Verify file actually exists and has content
- if not temp_path.exists():
- raise HTTPException(500, f"Download reported success but file not found: {temp_path}")
- file_size = temp_path.stat().st_size
- logger.info("Downloaded file size: %s bytes", file_size)
- if file_size == 0:
- if not using_cached:
- temp_path.unlink()
- raise HTTPException(500, f"Downloaded file is empty for '{subtask_name}'")
- try:
- # Extract thumbnail from 3MF (which is a ZIP file)
- try:
- zf = zipfile.ZipFile(temp_path, "r")
- except zipfile.BadZipFile:
- raise HTTPException(500, "Downloaded file is not a valid 3MF/ZIP archive")
- except OSError as e:
- logger.error("Failed to open 3MF file: %s", e, exc_info=True)
- raise HTTPException(500, "Failed to open 3MF file. Check server logs for details.")
- try:
- # 3MF-scan fallback for plate detection (#1166). Per-plate archives
- # sliced separately in Bambu Studio contain a single
- # Metadata/plate_N.gcode for the active plate, even though
- # thumbnails for all plates are bundled. Using that gcode's plate
- # number prevents falling back to plate_1.png.
- if plate_num is None:
- plate_gcodes = [name for name in zf.namelist() if re.match(r"^Metadata/plate_\d+\.gcode$", name)]
- if len(plate_gcodes) == 1:
- match = re.search(r"plate_(\d+)\.gcode", plate_gcodes[0])
- if match:
- plate_num = int(match.group(1))
- logger.info("Cover: detected plate %s from 3MF contents", plate_num)
- if plate_num is None:
- plate_num = 1
- # Try common thumbnail paths in 3MF files
- # Use plate_num to get the correct plate's thumbnail for multi-plate projects
- # Use top-down view if requested (better for skip objects modal)
- if view == "top":
- thumbnail_paths = [
- f"Metadata/top_{plate_num}.png",
- # Fall back to plate 1 if specific plate not found
- "Metadata/top_1.png",
- f"Metadata/plate_{plate_num}.png",
- "Metadata/plate_1.png",
- "Metadata/thumbnail.png",
- ]
- else:
- thumbnail_paths = [
- f"Metadata/plate_{plate_num}.png",
- # Fall back to plate 1 if specific plate not found
- "Metadata/plate_1.png",
- "Metadata/thumbnail.png",
- f"Metadata/plate_{plate_num}_small.png",
- "Metadata/plate_1_small.png",
- "Thumbnails/thumbnail.png",
- "thumbnail.png",
- ]
- for thumb_path in thumbnail_paths:
- try:
- image_data = zf.read(thumb_path)
- if printer_id not in _cover_cache:
- _cover_cache[printer_id] = {}
- _cover_cache[printer_id][(subtask_name, view_key)] = image_data
- return Response(content=image_data, media_type="image/png")
- except KeyError:
- continue
- # If no specific thumbnail found, try any PNG in Metadata
- for name in zf.namelist():
- if name.startswith("Metadata/") and name.endswith(".png"):
- image_data = zf.read(name)
- if printer_id not in _cover_cache:
- _cover_cache[printer_id] = {}
- _cover_cache[printer_id][(subtask_name, view_key)] = image_data
- return Response(content=image_data, media_type="image/png")
- raise HTTPException(404, "No thumbnail found in 3MF file")
- finally:
- zf.close()
- finally:
- # Only delete when this invocation owns the file. A cached path is
- # shared with the archive flow — removing it would force a refetch
- # the next time either flow needs the 3MF.
- if not using_cached and temp_path.exists():
- temp_path.unlink()
- # ============================================
- # File Manager Endpoints
- # ============================================
- @router.get("/{printer_id}/files")
- async def list_printer_files(
- printer_id: int,
- path: str = "/",
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """List files on the printer at the specified path."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- files = await list_files_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- # Add full path to each file
- for f in files:
- f["path"] = f"{path.rstrip('/')}/{f['name']}" if path != "/" else f"/{f['name']}"
- return {
- "path": path,
- "files": files,
- }
- @router.get("/{printer_id}/files/download")
- async def download_printer_file(
- printer_id: int,
- path: str,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Download a file from the printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- data = await download_file_bytes_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if data is None:
- raise HTTPException(404, f"File not found: {path}")
- # Determine content type based on extension
- filename = path.split("/")[-1]
- ext = filename.lower().split(".")[-1] if "." in filename else ""
- content_types = {
- "3mf": "application/vnd.ms-package.3dmanufacturing-3dmodel+xml",
- "gcode": "text/plain",
- "mp4": "video/mp4",
- "avi": "video/x-msvideo",
- "png": "image/png",
- "jpg": "image/jpeg",
- "jpeg": "image/jpeg",
- "json": "application/json",
- "txt": "text/plain",
- }
- content_type = content_types.get(ext, "application/octet-stream")
- return Response(
- content=data,
- media_type=content_type,
- headers={"Content-Disposition": f'attachment; filename="{filename}"'},
- )
- @router.get("/{printer_id}/files/gcode")
- async def get_printer_file_gcode(
- printer_id: int,
- path: str,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Get gcode for a file stored on a printer (for preview)."""
- import io
- # Validate printer
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- data = await download_file_bytes_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if data is None:
- raise HTTPException(404, f"File not found: {path}")
- filename = path.split("/")[-1]
- lower = filename.lower()
- if lower.endswith(".gcode"):
- return Response(content=data, media_type="text/plain")
- if lower.endswith(".3mf"):
- try:
- with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
- gcode_files = [n for n in zf.namelist() if n.endswith(".gcode")]
- if not gcode_files:
- raise HTTPException(status_code=404, detail="No gcode found in 3MF file")
- gcode_content = zf.read(gcode_files[0])
- return Response(content=gcode_content, media_type="text/plain")
- except zipfile.BadZipFile:
- raise HTTPException(status_code=400, detail="Invalid 3MF file")
- raise HTTPException(status_code=400, detail="Unsupported file type")
- @router.get("/{printer_id}/files/plates")
- async def get_printer_file_plates(
- printer_id: int,
- path: str = Query(..., description="Full path to the 3MF file on the printer"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Get available plates from a multi-plate 3MF file stored on a printer."""
- import io
- import json
- import defusedxml.ElementTree as ET
- # Validate printer
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- filename = path.split("/")[-1]
- if not filename.lower().endswith(".3mf"):
- return {
- "printer_id": printer_id,
- "path": path,
- "filename": filename,
- "plates": [],
- "is_multi_plate": False,
- }
- data = await download_file_bytes_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if data is None:
- raise HTTPException(404, f"File not found: {path}")
- plates = []
- try:
- with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
- namelist = zf.namelist()
- # Find all plate gcode files to determine available plates
- gcode_files = [n for n in namelist if n.startswith("Metadata/plate_") and n.endswith(".gcode")]
- # If no gcode is present (source-only or unsliced), fall back to plate JSON/PNG
- plate_indices: list[int] = []
- if gcode_files:
- for gf in gcode_files:
- try:
- plate_str = gf[15:-6] # Remove "Metadata/plate_" and ".gcode"
- plate_indices.append(int(plate_str))
- except ValueError:
- pass # Skip gcode files with non-numeric plate indices
- else:
- plate_json_files = [n for n in namelist if n.startswith("Metadata/plate_") and n.endswith(".json")]
- plate_png_files = [
- n
- for n in namelist
- if n.startswith("Metadata/plate_")
- and n.endswith(".png")
- and "_small" not in n
- and "no_light" not in n
- ]
- plate_name_candidates = plate_json_files + plate_png_files
- plate_re = re.compile(r"^Metadata/plate_(\d+)\.(json|png)$")
- seen_indices: set[int] = set()
- for name in plate_name_candidates:
- match = plate_re.match(name)
- if match:
- try:
- index = int(match.group(1))
- except ValueError:
- continue
- if index in seen_indices:
- continue
- seen_indices.add(index)
- plate_indices.append(index)
- if not plate_indices:
- return {
- "printer_id": printer_id,
- "path": path,
- "filename": filename,
- "plates": [],
- "is_multi_plate": False,
- }
- plate_indices.sort()
- # Parse model_settings.config for plate names
- plate_names = {}
- if "Metadata/model_settings.config" in namelist:
- try:
- model_content = zf.read("Metadata/model_settings.config").decode()
- model_root = ET.fromstring(model_content)
- for plate_elem in model_root.findall(".//plate"):
- plater_id = None
- plater_name = None
- for meta in plate_elem.findall("metadata"):
- key = meta.get("key")
- value = meta.get("value")
- if key == "plater_id" and value:
- try:
- plater_id = int(value)
- except ValueError:
- pass # Skip plate with unparseable ID
- elif key == "plater_name" and value:
- plater_name = value.strip()
- if plater_id is not None and plater_name:
- plate_names[plater_id] = plater_name
- except Exception:
- pass # Plate names are optional; continue without them
- # Parse slice_info.config for plate metadata
- plate_metadata = {}
- if "Metadata/slice_info.config" in namelist:
- content = zf.read("Metadata/slice_info.config").decode()
- root = ET.fromstring(content)
- for plate_elem in root.findall(".//plate"):
- plate_info = {"filaments": [], "prediction": None, "weight": None, "name": None, "objects": []}
- plate_index = None
- for meta in plate_elem.findall("metadata"):
- key = meta.get("key")
- value = meta.get("value")
- if key == "index" and value:
- try:
- plate_index = int(value)
- except ValueError:
- pass # Skip plate with unparseable index
- elif key == "prediction" and value:
- try:
- plate_info["prediction"] = int(value)
- except ValueError:
- pass # Skip unparseable prediction; leave as None
- elif key == "weight" and value:
- try:
- plate_info["weight"] = float(value)
- except ValueError:
- pass # Skip unparseable weight; leave as None
- # Get filaments used in this plate
- for filament_elem in plate_elem.findall("filament"):
- filament_id = filament_elem.get("id")
- filament_type = filament_elem.get("type", "")
- filament_color = filament_elem.get("color", "")
- used_g = filament_elem.get("used_g", "0")
- used_m = filament_elem.get("used_m", "0")
- try:
- used_grams = float(used_g)
- except (ValueError, TypeError):
- used_grams = 0
- if used_grams > 0 and filament_id:
- plate_info["filaments"].append(
- {
- "slot_id": int(filament_id),
- "type": filament_type,
- "color": filament_color,
- "used_grams": round(used_grams, 1),
- "used_meters": float(used_m) if used_m else 0,
- }
- )
- plate_info["filaments"].sort(key=lambda x: x["slot_id"])
- # Collect object names
- for obj_elem in plate_elem.findall("object"):
- obj_name = obj_elem.get("name")
- if obj_name and obj_name not in plate_info["objects"]:
- plate_info["objects"].append(obj_name)
- # Set plate name
- if plate_index is not None:
- custom_name = plate_names.get(plate_index)
- if custom_name:
- plate_info["name"] = custom_name
- elif plate_info["objects"]:
- plate_info["name"] = plate_info["objects"][0]
- plate_metadata[plate_index] = plate_info
- # Parse plate_*.json for object lists when slice_info is missing
- plate_json_objects: dict[int, list[str]] = {}
- for name in namelist:
- match = re.match(r"^Metadata/plate_(\d+)\.json$", name)
- if not match:
- continue
- try:
- plate_index = int(match.group(1))
- except ValueError:
- continue
- try:
- payload = json.loads(zf.read(name).decode())
- bbox_objects = payload.get("bbox_objects", [])
- names: list[str] = []
- for obj in bbox_objects:
- obj_name = obj.get("name") if isinstance(obj, dict) else None
- if obj_name and obj_name not in names:
- names.append(obj_name)
- if names:
- plate_json_objects[plate_index] = names
- except Exception:
- continue
- # Build plate list
- for idx in plate_indices:
- meta = plate_metadata.get(idx, {})
- has_thumbnail = f"Metadata/plate_{idx}.png" in namelist
- objects = meta.get("objects", [])
- if not objects:
- objects = plate_json_objects.get(idx, [])
- plate_name = meta.get("name")
- if not plate_name:
- plate_name = plate_names.get(idx)
- if not plate_name and objects:
- plate_name = objects[0]
- plates.append(
- {
- "index": idx,
- "name": plate_name,
- "objects": objects,
- "object_count": len(objects),
- "has_thumbnail": has_thumbnail,
- "thumbnail_url": f"/api/v1/printers/{printer_id}/files/plate-thumbnail/{idx}?path={path}",
- "print_time_seconds": meta.get("prediction"),
- "filament_used_grams": meta.get("weight"),
- "filaments": meta.get("filaments", []),
- }
- )
- except Exception as e:
- logger.warning("Failed to parse plates from printer file %s: %s", path, e)
- return {
- "printer_id": printer_id,
- "path": path,
- "filename": filename,
- "plates": plates,
- "is_multi_plate": len(plates) > 1,
- }
- @router.get("/{printer_id}/files/plate-thumbnail/{plate_index}")
- async def get_printer_file_plate_thumbnail(
- printer_id: int,
- plate_index: int,
- path: str = Query(..., description="Full path to the 3MF file on the printer"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Get a plate thumbnail image from a printer-stored 3MF file."""
- import io
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- data = await download_file_bytes_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if data is None:
- raise HTTPException(404, f"File not found: {path}")
- try:
- with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
- thumb_path = f"Metadata/plate_{plate_index}.png"
- if thumb_path in zf.namelist():
- image_data = zf.read(thumb_path)
- return Response(content=image_data, media_type="image/png")
- except Exception:
- pass # Corrupt or unreadable 3MF; fall through to 404
- raise HTTPException(status_code=404, detail=f"Thumbnail for plate {plate_index} not found")
- @router.post("/{printer_id}/files/download-zip")
- async def download_printer_files_as_zip(
- printer_id: int,
- request: dict,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Download multiple files from the printer as a ZIP archive."""
- import io
- paths = request.get("paths", [])
- if not paths:
- raise HTTPException(400, "No files specified")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- # Create ZIP in memory
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
- for path in paths:
- try:
- data = await download_file_bytes_async(
- printer.ip_address, printer.access_code, path, printer_model=printer.model
- )
- if data:
- filename = path.split("/")[-1]
- zf.writestr(filename, data)
- except Exception as e:
- logging.warning("Failed to add %s to ZIP: %s", path, e)
- continue
- zip_buffer.seek(0)
- zip_data = zip_buffer.read()
- if len(zip_data) == 0:
- raise HTTPException(404, "No files could be downloaded")
- return Response(
- content=zip_data,
- media_type="application/zip",
- headers={"Content-Disposition": 'attachment; filename="printer-files.zip"'},
- )
- @router.delete("/{printer_id}/files")
- async def delete_printer_file(
- printer_id: int,
- path: str,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_FILES),
- db: AsyncSession = Depends(get_db),
- ):
- """Delete a file from the printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = await delete_file_async(printer.ip_address, printer.access_code, path, printer_model=printer.model)
- if not success:
- raise HTTPException(500, f"Failed to delete file: {path}")
- return {"status": "deleted", "path": path}
- @router.get("/{printer_id}/storage")
- async def get_printer_storage(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get storage information from the printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- storage_info = await get_storage_info_async(printer.ip_address, printer.access_code, printer_model=printer.model)
- return storage_info or {"used_bytes": None, "free_bytes": None}
- # ============================================
- # MQTT Debug Logging Endpoints
- # ============================================
- @router.post("/{printer_id}/logging/enable")
- async def enable_mqtt_logging(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Enable MQTT message logging for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = printer_manager.enable_logging(printer_id, True)
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"logging_enabled": True}
- @router.post("/{printer_id}/logging/disable")
- async def disable_mqtt_logging(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Disable MQTT message logging for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = printer_manager.enable_logging(printer_id, False)
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"logging_enabled": False}
- @router.get("/{printer_id}/logging")
- async def get_mqtt_logs(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get MQTT message logs for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- logs = printer_manager.get_logs(printer_id)
- return {
- "logging_enabled": printer_manager.is_logging_enabled(printer_id),
- "logs": [
- {
- "timestamp": log.timestamp,
- "topic": log.topic,
- "direction": log.direction,
- "payload": log.payload,
- }
- for log in logs
- ],
- }
- @router.delete("/{printer_id}/logging")
- async def clear_mqtt_logs(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Clear MQTT message logs for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- printer_manager.clear_logs(printer_id)
- return {"status": "cleared"}
- # ============================================
- # AMS Drying Endpoints
- # ============================================
- @router.post("/{printer_id}/drying/start")
- async def start_drying(
- printer_id: int,
- ams_id: int,
- temp: int = 45,
- duration: int = 4,
- filament: str = "",
- rotate_tray: bool = False,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Send AMS drying start command. temp=45-85, duration=hours."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- # Server-side guard: reject if this model/firmware doesn't support drying
- live_state = printer_manager.get_status(printer_id)
- firmware = live_state.firmware_version if live_state else None
- if not supports_drying(printer.model, firmware):
- raise HTTPException(400, "Drying not supported for this printer model or firmware version")
- if temp < 45 or temp > 85:
- raise HTTPException(400, "Temperature must be 45-85°C")
- if duration < 1 or duration > 24:
- raise HTTPException(400, "Duration must be 1-24 hours")
- # Inspect the live AMS unit: surface blocking dry_sf_reasons (otherwise the
- # firmware silently ignores the command — #971) and backfill an empty
- # filament field from the first loaded tray so the printer doesn't reject
- # the payload.
- target_ams: dict | None = None
- for unit in (live_state.raw_data.get("ams") if live_state else None) or []:
- try:
- if int(unit.get("id", -1)) == ams_id:
- target_ams = unit
- break
- except (TypeError, ValueError):
- continue
- if target_ams is not None:
- reason_messages = {
- 0: "Printer is busy",
- 1: "Insufficient power — too many AMS drying or external PSU required",
- 2: "AMS is busy",
- 3: "Filament is at the AMS outlet — retract it first",
- 4: "AMS is already starting a drying cycle",
- 5: "Not supported in 2D mode",
- 6: "AMS is already drying",
- 7: "AMS firmware is upgrading",
- 8: "Plug in the external AMS power adapter to start drying",
- }
- for code in target_ams.get("dry_sf_reason") or []:
- try:
- code_int = int(code)
- except (TypeError, ValueError):
- continue
- if code_int in reason_messages:
- raise HTTPException(409, reason_messages[code_int])
- if not filament:
- for tray in target_ams.get("tray") or []:
- tray_type = tray.get("tray_type")
- if tray_type:
- filament = str(tray_type)
- break
- if not filament:
- filament = "PLA"
- success = printer_manager.send_drying_command(
- printer_id, ams_id, temp, duration, mode=1, filament=filament, rotate_tray=rotate_tray
- )
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"status": "drying_started", "ams_id": ams_id, "temp": temp, "duration": duration}
- @router.post("/{printer_id}/drying/stop")
- async def stop_drying(
- printer_id: int,
- ams_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Send AMS drying stop command."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- success = printer_manager.send_drying_command(printer_id, ams_id, temp=0, duration=0, mode=0)
- if not success:
- raise HTTPException(400, "Printer not connected")
- return {"status": "drying_stopped", "ams_id": ams_id}
- # ============================================
- # Print Options (AI Detection) Endpoints
- # ============================================
- @router.post("/{printer_id}/print-options")
- async def set_print_option(
- printer_id: int,
- module_name: str,
- enabled: bool,
- print_halt: bool = True,
- sensitivity: str = "medium",
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Set an AI detection / print option on the printer.
- Valid module_name values:
- - spaghetti_detector: Spaghetti detection
- - first_layer_inspector: First layer inspection
- - printing_monitor: AI print quality monitoring
- - buildplate_marker_detector: Build plate marker detection
- - allow_skip_parts: Allow skipping failed parts
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client or not client.state.connected:
- raise HTTPException(400, "Printer not connected")
- # Validate module_name
- valid_modules = [
- "spaghetti_detector",
- "first_layer_inspector",
- "printing_monitor",
- "buildplate_marker_detector",
- "allow_skip_parts",
- "pileup_detector",
- "clump_detector",
- "airprint_detector",
- "auto_recovery_step_loss",
- ]
- if module_name not in valid_modules:
- raise HTTPException(400, f"Invalid module_name. Must be one of: {valid_modules}")
- # Validate sensitivity
- valid_sensitivities = ["low", "medium", "high", "never_halt"]
- if sensitivity not in valid_sensitivities:
- raise HTTPException(400, f"Invalid sensitivity. Must be one of: {valid_sensitivities}")
- success = client.set_xcam_option(
- module_name=module_name,
- enabled=enabled,
- print_halt=print_halt,
- sensitivity=sensitivity,
- )
- if not success:
- raise HTTPException(500, "Failed to send command to printer")
- return {
- "success": True,
- "module_name": module_name,
- "enabled": enabled,
- "print_halt": print_halt,
- "sensitivity": sensitivity,
- }
- # ============================================
- # Calibration
- # ============================================
- @router.post("/{printer_id}/calibration")
- async def start_calibration(
- printer_id: int,
- bed_leveling: bool = False,
- vibration: bool = False,
- motor_noise: bool = False,
- nozzle_offset: bool = False,
- high_temp_heatbed: bool = False,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Start printer calibration with selected options.
- At least one option must be selected.
- Options:
- - bed_leveling: Run bed leveling calibration
- - vibration: Run vibration compensation calibration
- - motor_noise: Run motor noise cancellation calibration
- - nozzle_offset: Run nozzle offset calibration (dual nozzle printers)
- - high_temp_heatbed: Run high-temperature heatbed calibration
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client or not client.state.connected:
- raise HTTPException(400, "Printer not connected")
- # Check that at least one option is selected
- if not any([bed_leveling, vibration, motor_noise, nozzle_offset, high_temp_heatbed]):
- raise HTTPException(400, "At least one calibration option must be selected")
- success = client.start_calibration(
- bed_leveling=bed_leveling,
- vibration=vibration,
- motor_noise=motor_noise,
- nozzle_offset=nozzle_offset,
- high_temp_heatbed=high_temp_heatbed,
- )
- if not success:
- raise HTTPException(500, "Failed to send calibration command to printer")
- return {
- "success": True,
- "bed_leveling": bed_leveling,
- "vibration": vibration,
- "motor_noise": motor_noise,
- "nozzle_offset": nozzle_offset,
- "high_temp_heatbed": high_temp_heatbed,
- }
- # ============================================================================
- # Slot Preset Mapping Endpoints
- # ============================================================================
- def _slot_preset_key(ams_id: int, tray_id: int) -> int:
- # Mirrors frontend getGlobalTrayId (amsHelpers.ts): AMS-HT (128-135) is keyed
- # by ams_id since each unit has a single slot and shares its global ID with
- # the unit itself. Regular AMS and external (255) use ams_id*4+tray_id.
- if 128 <= ams_id <= 135:
- return ams_id
- return ams_id * 4 + tray_id
- @router.get("/{printer_id}/slot-presets")
- async def get_slot_presets(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get all saved slot-to-preset mappings for a printer."""
- result = await db.execute(select(SlotPresetMapping).where(SlotPresetMapping.printer_id == printer_id))
- mappings = result.scalars().all()
- return {
- _slot_preset_key(mapping.ams_id, mapping.tray_id): {
- "ams_id": mapping.ams_id,
- "tray_id": mapping.tray_id,
- "preset_id": mapping.preset_id,
- "preset_name": mapping.preset_name,
- }
- for mapping in mappings
- }
- @router.get("/{printer_id}/slot-presets/{ams_id}/{tray_id}")
- async def get_slot_preset(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get the saved preset for a specific slot."""
- result = await db.execute(
- select(SlotPresetMapping).where(
- SlotPresetMapping.printer_id == printer_id,
- SlotPresetMapping.ams_id == ams_id,
- SlotPresetMapping.tray_id == tray_id,
- )
- )
- mapping = result.scalar_one_or_none()
- if not mapping:
- return None
- return {
- "ams_id": mapping.ams_id,
- "tray_id": mapping.tray_id,
- "preset_id": mapping.preset_id,
- "preset_name": mapping.preset_name,
- }
- @router.put("/{printer_id}/slot-presets/{ams_id}/{tray_id}")
- async def save_slot_preset(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- preset_id: str,
- preset_name: str,
- preset_source: str = "cloud",
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Save a preset mapping for a specific slot."""
- # Check printer exists
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- if not result.scalar_one_or_none():
- raise HTTPException(404, "Printer not found")
- # Check for existing mapping
- result = await db.execute(
- select(SlotPresetMapping).where(
- SlotPresetMapping.printer_id == printer_id,
- SlotPresetMapping.ams_id == ams_id,
- SlotPresetMapping.tray_id == tray_id,
- )
- )
- mapping = result.scalar_one_or_none()
- if mapping:
- # Update existing
- mapping.preset_id = preset_id
- mapping.preset_name = preset_name
- mapping.preset_source = preset_source
- else:
- # Create new
- mapping = SlotPresetMapping(
- printer_id=printer_id,
- ams_id=ams_id,
- tray_id=tray_id,
- preset_id=preset_id,
- preset_name=preset_name,
- preset_source=preset_source,
- )
- db.add(mapping)
- await db.commit()
- await db.refresh(mapping)
- return {
- "ams_id": mapping.ams_id,
- "tray_id": mapping.tray_id,
- "preset_id": mapping.preset_id,
- "preset_name": mapping.preset_name,
- "preset_source": mapping.preset_source,
- }
- @router.delete("/{printer_id}/slot-presets/{ams_id}/{tray_id}")
- async def delete_slot_preset(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Delete a saved preset mapping for a slot."""
- result = await db.execute(
- select(SlotPresetMapping).where(
- SlotPresetMapping.printer_id == printer_id,
- SlotPresetMapping.ams_id == ams_id,
- SlotPresetMapping.tray_id == tray_id,
- )
- )
- mapping = result.scalar_one_or_none()
- if mapping:
- await db.delete(mapping)
- await db.commit()
- return {"success": True}
- @router.post("/{printer_id}/slots/{ams_id}/{tray_id}/configure")
- async def configure_ams_slot(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- tray_info_idx: str = Query(...),
- tray_type: str = Query(...),
- tray_sub_brands: str = Query(...),
- tray_color: str = Query(...),
- nozzle_temp_min: int = Query(...),
- nozzle_temp_max: int = Query(...),
- cali_idx: int = Query(-1),
- nozzle_diameter: str = Query("0.4"),
- setting_id: str = Query(""),
- kprofile_filament_id: str = Query(""),
- kprofile_setting_id: str = Query(""),
- k_value: float = Query(0.0),
- db: AsyncSession = Depends(get_db),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- ):
- """Configure an AMS slot with a specific filament setting and K profile.
- This sends two commands to the printer:
- 1. ams_filament_setting - sets filament type, color, temperature
- 2. extrusion_cali_sel - sets the K profile (pressure advance value)
- Args:
- printer_id: Database ID of the printer
- ams_id: AMS unit ID (0-3 for regular AMS, 128-135 for HT AMS)
- tray_id: Tray ID within the AMS (0-3)
- tray_info_idx: Filament ID short format (e.g., "GFL05") or user preset ID
- tray_type: Filament type (e.g., "PLA", "PETG")
- tray_sub_brands: Sub-brand/profile name (e.g., "PLA Basic", "PETG HF")
- tray_color: Color in RRGGBBAA hex format (e.g., "FFFF00FF")
- nozzle_temp_min: Minimum nozzle temperature
- nozzle_temp_max: Maximum nozzle temperature
- cali_idx: K profile calibration index (-1 for default 0.020)
- nozzle_diameter: Nozzle diameter string (e.g., "0.4")
- setting_id: Full setting ID with version (e.g., "GFSL05_07") - optional
- kprofile_filament_id: K profile's filament_id for proper K profile linking
- k_value: Direct K value to set (0.0 to skip direct K value setting)
- """
- logger = logging.getLogger(__name__)
- logger.info("[configure_ams_slot] printer_id=%s, ams_id=%s, tray_id=%s", printer_id, ams_id, tray_id)
- logger.info(
- f"[configure_ams_slot] tray_info_idx={tray_info_idx!r}, tray_type={tray_type!r}, tray_sub_brands={tray_sub_brands!r}"
- )
- logger.info(
- f"[configure_ams_slot] setting_id={setting_id!r}, kprofile_filament_id={kprofile_filament_id!r}, kprofile_setting_id={kprofile_setting_id!r}"
- )
- # Get MQTT client for this printer
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(status_code=400, detail="Printer not connected")
- # Resolve tray_info_idx for the MQTT command.
- # Priority:
- # 1. Use the provided tray_info_idx if set (including cloud-synced
- # custom presets like PFUS* / P*).
- # 2. Reuse the slot's existing tray_info_idx if it's a specific
- # (non-generic) preset for the same material.
- # 3. Fall back to a generic Bambu filament ID.
- _GENERIC_FILAMENT_IDS = {
- "PLA": "GFL99",
- "PETG": "GFG99",
- "ABS": "GFB99",
- "ASA": "GFB98",
- "PC": "GFC99",
- "PA": "GFN99",
- "NYLON": "GFN99",
- "TPU": "GFU99",
- "PVA": "GFS99",
- "HIPS": "GFS98",
- "PLA-CF": "GFL98",
- "PETG-CF": "GFG98",
- "PA-CF": "GFN98",
- "PETG HF": "GFG96",
- }
- _GENERIC_ID_VALUES = set(_GENERIC_FILAMENT_IDS.values())
- effective_tray_info_idx = tray_info_idx
- if not tray_info_idx:
- # No preset provided — try slot reuse or generic fallback
- current_tray_info_idx = ""
- current_tray_type = ""
- state = printer_manager.get_status(printer_id)
- if state and state.raw_data:
- from backend.app.api.routes.inventory import _find_tray_in_ams_data
- if ams_id == 255:
- vt_tray = state.raw_data.get("vt_tray") or []
- ext_id = tray_id + 254
- for vt in vt_tray:
- if isinstance(vt, dict) and int(vt.get("id", 254)) == ext_id:
- current_tray_info_idx = vt.get("tray_info_idx", "")
- current_tray_type = vt.get("tray_type", "")
- break
- else:
- ams_data = state.raw_data.get("ams", {})
- ams_list = (
- ams_data.get("ams", [])
- if isinstance(ams_data, dict)
- else ams_data
- if isinstance(ams_data, list)
- else []
- )
- cur_tray = _find_tray_in_ams_data(ams_list, ams_id, tray_id)
- if cur_tray:
- current_tray_info_idx = cur_tray.get("tray_info_idx", "")
- current_tray_type = cur_tray.get("tray_type", "")
- if (
- current_tray_info_idx
- and current_tray_info_idx not in _GENERIC_ID_VALUES
- and current_tray_type
- and current_tray_type.upper() == tray_type.upper()
- ):
- logger.info(
- "[configure_ams_slot] Reusing slot's existing tray_info_idx=%r (same material %r)",
- current_tray_info_idx,
- tray_type,
- )
- effective_tray_info_idx = current_tray_info_idx
- elif tray_type:
- material = tray_type.upper().strip()
- generic = (
- _GENERIC_FILAMENT_IDS.get(material)
- or _GENERIC_FILAMENT_IDS.get(material.split("-")[0].split(" ")[0])
- or ""
- )
- if generic:
- logger.info("[configure_ams_slot] Falling back to generic %r for material %r", generic, tray_type)
- effective_tray_info_idx = generic
- # Send filament setting + K-profile commands
- filament_id_for_kprofile = kprofile_filament_id if kprofile_filament_id else effective_tray_info_idx
- # Realign the slot's filament context to the K-profile's calibration
- # context. The printer's calibration table is keyed by (filament_id,
- # cali_idx) — so for the cali_idx selected via extrusion_cali_sel to
- # actually stick to the slot, ams_filament_setting must declare the
- # slot under the SAME filament_id.
- #
- # Without this, configure_ams_slot would send:
- # ams_filament_setting → tray_info_idx=GFL99 (generic from material)
- # extrusion_cali_sel → filament_id=P4d64437 (kp's preset)
- # ...and the cali_idx would silently be dropped to default because the
- # slot's filament context (GFL99) doesn't match the kp's (P4d64437).
- #
- # This realignment fires only when the kp is targeted at a different
- # preset than the user's filament selection AND the kp's preset is a
- # valid tray_info_idx (GF* official, P* local — not PFUS* cloud-user
- # which the slicer rejects in tray_info_idx).
- effective_setting_id = setting_id
- if (
- kprofile_filament_id
- and kprofile_filament_id != effective_tray_info_idx
- and not kprofile_filament_id.startswith("PFUS")
- ):
- logger.info(
- "[configure_ams_slot] realigning slot filament context to kp: tray_info_idx %r → %r, setting_id %r → %r",
- effective_tray_info_idx,
- kprofile_filament_id,
- setting_id,
- kprofile_setting_id or setting_id,
- )
- effective_tray_info_idx = kprofile_filament_id
- if kprofile_setting_id:
- effective_setting_id = kprofile_setting_id
- # Always send ams_set_filament_setting — the user explicitly clicked
- # "Configure Slot", so honor that. Previous versions skipped this for
- # RFID-tagged slots to preserve the slicer eye icon, but printers cache
- # stale tag_uid/tray_uuid after a BL spool is removed, causing the check
- # to false-positive on non-RFID slots and silently drop the command.
- success = client.ams_set_filament_setting(
- ams_id=ams_id,
- tray_id=tray_id,
- tray_info_idx=effective_tray_info_idx,
- tray_type=tray_type,
- tray_sub_brands=tray_sub_brands,
- tray_color=tray_color,
- nozzle_temp_min=nozzle_temp_min,
- nozzle_temp_max=nozzle_temp_max,
- setting_id=effective_setting_id,
- )
- if not success:
- raise HTTPException(status_code=500, detail="Failed to send filament configuration command")
- # Method 1: Select existing calibration profile by cali_idx
- # Do NOT include setting_id — BambuStudio never sends it in extrusion_cali_sel,
- # and including it causes the firmware to mislink the profile on X1C/P1S.
- client.extrusion_cali_sel(
- ams_id=ams_id,
- tray_id=tray_id,
- cali_idx=cali_idx,
- filament_id=filament_id_for_kprofile,
- nozzle_diameter=nozzle_diameter,
- )
- # Method 2: Only send extrusion_cali_set when NO existing profile was selected
- # (cali_idx == -1). When cali_idx >= 0, extrusion_cali_sel already selected the
- # correct profile. Sending extrusion_cali_set with the same cali_idx would MODIFY
- # the existing profile's metadata (extruder_id, nozzle_id, name, setting_id),
- # corrupting it — e.g., overwriting a High Flow extruder 1 profile with
- # hardcoded extruder_id=0 and nozzle_id=HS00.
- if k_value > 0 and cali_idx < 0:
- # Calculate global tray ID for extrusion_cali_set
- if ams_id <= 3:
- global_tray_id = ams_id * 4 + tray_id
- elif ams_id >= 128 and ams_id <= 135:
- global_tray_id = (ams_id - 128) * 4 + tray_id
- else:
- global_tray_id = tray_id
- client.extrusion_cali_set(
- tray_id=global_tray_id,
- k_value=k_value,
- nozzle_diameter=nozzle_diameter,
- nozzle_temp=nozzle_temp_max,
- filament_id=filament_id_for_kprofile,
- setting_id=kprofile_setting_id or "",
- name=tray_sub_brands or "",
- cali_idx=cali_idx,
- )
- # Persist the user's K-profile choice so it survives RFID re-reads and
- # session restarts. Pre-Phase-13 this was ephemeral — the MQTT command
- # took effect on the printer but bambuddy never recorded it, so the next
- # `_apply_pa_after_refresh` cycle had no stored profile to re-assert.
- if cali_idx >= 0:
- try:
- from sqlalchemy.orm import selectinload
- from backend.app.models.spool_assignment import SpoolAssignment
- from backend.app.models.spool_k_profile import SpoolKProfile
- from backend.app.models.spoolman_k_profile import SpoolmanKProfile
- from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
- # Resolve slot's extruder index for the K-profile match key. Same
- # logic as _apply_pa_after_refresh: external slots invert tray→extruder,
- # AMS slots come from ams_extruder_map. Falls back to 0 (single-nozzle).
- slot_state = printer_manager.get_status(printer_id)
- slot_extruder: int | None = None
- if slot_state and slot_state.ams_extruder_map:
- if ams_id == 255:
- slot_extruder = 1 - tray_id
- else:
- slot_extruder = slot_state.ams_extruder_map.get(str(ams_id))
- kp_extruder = slot_extruder if slot_extruder is not None else 0
- # Spoolman SlotAssignment first — has UniqueConstraint, idempotent.
- sm_result = await db.execute(
- select(SpoolmanSlotAssignment).where(
- SpoolmanSlotAssignment.printer_id == printer_id,
- SpoolmanSlotAssignment.ams_id == ams_id,
- SpoolmanSlotAssignment.tray_id == tray_id,
- )
- )
- sm_assignment = sm_result.scalar_one_or_none()
- if sm_assignment:
- existing = await db.execute(
- select(SpoolmanKProfile).where(
- SpoolmanKProfile.spoolman_spool_id == sm_assignment.spoolman_spool_id,
- SpoolmanKProfile.printer_id == printer_id,
- SpoolmanKProfile.extruder == kp_extruder,
- SpoolmanKProfile.nozzle_diameter == nozzle_diameter,
- )
- )
- kp = existing.scalar_one_or_none()
- if kp:
- kp.cali_idx = cali_idx
- kp.k_value = k_value or 0.0
- kp.setting_id = kprofile_setting_id or None
- kp.name = tray_sub_brands or None
- else:
- db.add(
- SpoolmanKProfile(
- spoolman_spool_id=sm_assignment.spoolman_spool_id,
- printer_id=printer_id,
- extruder=kp_extruder,
- nozzle_diameter=nozzle_diameter,
- k_value=k_value or 0.0,
- name=tray_sub_brands or None,
- cali_idx=cali_idx,
- setting_id=kprofile_setting_id or None,
- )
- )
- await db.commit()
- logger.info(
- "[configure_ams_slot] Persisted Spoolman K-profile spool=%d printer=%d ams=%d tray=%d cali_idx=%d",
- sm_assignment.spoolman_spool_id,
- printer_id,
- ams_id,
- tray_id,
- cali_idx,
- )
- else:
- # Local SpoolAssignment + SpoolKProfile (no UNIQUE — use .first())
- local_result = await db.execute(
- select(SpoolAssignment)
- .options(selectinload(SpoolAssignment.spool))
- .where(
- SpoolAssignment.printer_id == printer_id,
- SpoolAssignment.ams_id == ams_id,
- SpoolAssignment.tray_id == tray_id,
- )
- )
- local_assignment = local_result.scalar_one_or_none()
- if local_assignment and local_assignment.spool:
- existing = await db.execute(
- select(SpoolKProfile).where(
- SpoolKProfile.spool_id == local_assignment.spool.id,
- SpoolKProfile.printer_id == printer_id,
- SpoolKProfile.extruder == kp_extruder,
- SpoolKProfile.nozzle_diameter == nozzle_diameter,
- )
- )
- # SpoolKProfile has no unique constraint on this tuple, so
- # multiple rows could theoretically exist (shouldn't, but
- # don't crash if they do). Update the first match, leave
- # any duplicates alone.
- kp = existing.scalars().first()
- if kp:
- kp.cali_idx = cali_idx
- kp.k_value = k_value or 0.0
- kp.setting_id = kprofile_setting_id or None
- kp.name = tray_sub_brands or None
- else:
- db.add(
- SpoolKProfile(
- spool_id=local_assignment.spool.id,
- printer_id=printer_id,
- extruder=kp_extruder,
- nozzle_diameter=nozzle_diameter,
- k_value=k_value or 0.0,
- name=tray_sub_brands or None,
- cali_idx=cali_idx,
- setting_id=kprofile_setting_id or None,
- )
- )
- await db.commit()
- logger.info(
- "[configure_ams_slot] Persisted local K-profile spool=%d printer=%d ams=%d tray=%d cali_idx=%d",
- local_assignment.spool.id,
- printer_id,
- ams_id,
- tray_id,
- cali_idx,
- )
- except Exception:
- # MQTT command was already sent successfully — DB persist is best-effort.
- logger.exception(
- "[configure_ams_slot] Failed to persist K-profile (printer=%d ams=%d tray=%d cali_idx=%d)",
- printer_id,
- ams_id,
- tray_id,
- cali_idx,
- )
- try:
- await db.rollback()
- except Exception:
- pass
- # Request fresh status push from printer so frontend gets updated data via WebSocket
- logger.info("[configure_ams_slot] Requesting status update from printer")
- update_result = client.request_status_update()
- logger.info("[configure_ams_slot] Status update request result: %s", update_result)
- return {
- "success": True,
- "message": f"Configured AMS {ams_id} tray {tray_id} with {tray_sub_brands}",
- }
- @router.post("/{printer_id}/ams/{ams_id}/tray/{tray_id}/reset")
- async def reset_ams_slot(
- printer_id: int,
- ams_id: int,
- tray_id: int,
- db: AsyncSession = Depends(get_db),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- ):
- """Reset an AMS slot to empty/unconfigured state.
- This clears the filament configuration from the slot.
- """
- # Get MQTT client for this printer
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(status_code=400, detail="Printer not connected")
- # Reset the slot
- success = client.reset_ams_slot(ams_id=ams_id, tray_id=tray_id)
- if not success:
- raise HTTPException(status_code=500, detail="Failed to send reset command")
- # Also delete any saved slot preset mapping
- result = await db.execute(
- select(SlotPresetMapping).where(
- SlotPresetMapping.printer_id == printer_id,
- SlotPresetMapping.ams_id == ams_id,
- SlotPresetMapping.tray_id == tray_id,
- )
- )
- mapping = result.scalar_one_or_none()
- if mapping:
- await db.delete(mapping)
- await db.commit()
- # Request fresh status push from printer so frontend gets updated data via WebSocket
- client.request_status_update()
- return {
- "success": True,
- "message": f"Reset AMS {ams_id} tray {tray_id}",
- }
- @router.get("/{printer_id}/ams-labels")
- async def get_ams_labels(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get all user-defined AMS labels for a printer, keyed by AMS unit ID.
- Labels are stored by AMS serial number. This endpoint resolves the current
- serial-to-ams_id mapping from the live printer state so the response is still
- keyed by ams_id for UI compatibility.
- """
- # Build serial -> ams_id map from live printer state
- serial_to_ams_id: dict[str, int] = {}
- state = printer_manager.get_status(printer_id)
- if state and state.raw_data:
- for ams_unit in state.raw_data.get("ams", []):
- sn = str(ams_unit.get("sn") or ams_unit.get("serial_number") or "")
- if sn:
- serial_to_ams_id[sn] = int(ams_unit.get("id", 0))
- # Collect all known serials for this printer (live + synthetic fallback keys)
- serials_to_query = set(serial_to_ams_id.keys())
- # Fetch labels for all known serials
- labels: dict[int, str] = {}
- if serials_to_query:
- result = await db.execute(select(AmsLabel).where(AmsLabel.ams_serial_number.in_(serials_to_query)))
- for lbl in result.scalars().all():
- aid = serial_to_ams_id.get(lbl.ams_serial_number)
- if aid is not None:
- labels[aid] = lbl.label
- # Also fetch labels stored under synthetic keys for this printer (backward compat)
- # Collect all synthetic keys first, then query with a single IN clause.
- if state and state.raw_data:
- synthetic_key_to_aid: dict[str, int] = {
- f"p{printer_id}a{int(ams_unit.get('id', 0))}": int(ams_unit.get("id", 0))
- for ams_unit in state.raw_data.get("ams", [])
- if int(ams_unit.get("id", 0)) not in labels
- }
- if synthetic_key_to_aid:
- result = await db.execute(
- select(AmsLabel).where(AmsLabel.ams_serial_number.in_(synthetic_key_to_aid.keys()))
- )
- for lbl in result.scalars().all():
- aid = synthetic_key_to_aid.get(lbl.ams_serial_number)
- if aid is not None:
- labels[aid] = lbl.label
- return labels
- @router.put("/{printer_id}/ams-labels/{ams_id}")
- async def save_ams_label(
- printer_id: int,
- ams_id: int,
- body: AmsLabelBody,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Create or update the friendly name for a specific AMS unit.
- When ``ams_serial`` is provided the label is stored under that serial number so
- it survives the AMS being moved to a different printer. When it is absent (e.g.
- older firmware that does not report a serial) a synthetic key based on the
- printer_id and ams_id is used as a fallback.
- """
- # Verify printer exists
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- if not result.scalar_one_or_none():
- raise HTTPException(404, "Printer not found")
- # Determine the serial key to store under
- stripped = body.ams_serial.strip() if body.ams_serial else ""
- serial_key = stripped if stripped else f"p{printer_id}a{ams_id}"
- result = await db.execute(select(AmsLabel).where(AmsLabel.ams_serial_number == serial_key))
- existing = result.scalar_one_or_none()
- if existing:
- existing.label = body.label
- existing.ams_id = ams_id
- else:
- db.add(AmsLabel(ams_serial_number=serial_key, ams_id=ams_id, label=body.label))
- await db.commit()
- return {"ams_id": ams_id, "label": body.label}
- @router.delete("/{printer_id}/ams-labels/{ams_id}")
- async def delete_ams_label(
- printer_id: int,
- ams_id: int,
- ams_serial: str = Query(default="", max_length=50),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_UPDATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Delete the friendly name for a specific AMS unit, reverting to the auto label."""
- stripped = ams_serial.strip() if ams_serial else ""
- serial_key = stripped if stripped else f"p{printer_id}a{ams_id}"
- result = await db.execute(select(AmsLabel).where(AmsLabel.ams_serial_number == serial_key))
- existing = result.scalar_one_or_none()
- if existing:
- await db.delete(existing)
- await db.commit()
- return {"success": True}
- @router.post("/{printer_id}/debug/simulate-print-complete")
- async def debug_simulate_print_complete(
- printer_id: int,
- db: AsyncSession = Depends(get_db),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- ):
- """DEBUG: Simulate print completion to test freeze behavior.
- This triggers the same code path as a real print completion,
- without needing to wait for an actual print to finish.
- """
- from backend.app.main import _active_prints, on_print_complete
- from backend.app.models.archive import PrintArchive
- # Get the most recent archive for this printer
- result = await db.execute(
- select(PrintArchive)
- .where(PrintArchive.printer_id == printer_id)
- .order_by(PrintArchive.created_at.desc())
- .limit(1)
- )
- archive = result.scalar_one_or_none()
- if not archive:
- raise HTTPException(status_code=404, detail="No archives found for this printer")
- # Register this archive as "active" so on_print_complete can find it
- filename = archive.file_path.split("/")[-1] if archive.file_path else "test.3mf"
- subtask_name = archive.print_name or "Test Print"
- _active_prints[(printer_id, filename)] = archive.id
- _active_prints[(printer_id, subtask_name)] = archive.id
- # Simulate print completion data
- data = {
- "status": "completed",
- "filename": filename,
- "subtask_name": subtask_name,
- "timelapse_was_active": False,
- }
- logger.info("Simulating print complete for printer %s, archive %s", printer_id, archive.id)
- # Call the actual on_print_complete handler
- await on_print_complete(printer_id, data)
- return {"success": True, "archive_id": archive.id, "message": "Print completion simulated"}
- # =============================================================================
- # Print Control Endpoints
- # =============================================================================
- @router.post("/{printer_id}/print/stop")
- async def stop_print(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Stop/cancel the current print job."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.stop_print()
- if not success:
- raise HTTPException(500, "Failed to stop print")
- # Mark this printer as user-stopped so on_print_complete reclassifies
- # the resulting "failed"/"aborted" MQTT status as "cancelled" — otherwise
- # the HMS heuristic in _dispatch_archive_update mislabels user-cancels
- # (e.g. the H2D's cancel-sequence module-0x0C HMS) as "Layer shift".
- try:
- from backend.app.main import mark_printer_stopped_by_user
- mark_printer_stopped_by_user(printer_id)
- except Exception as _mark_err:
- logger.warning("Failed to mark printer %s as user-stopped: %s", printer_id, _mark_err)
- return {"success": True, "message": "Print stop command sent"}
- @router.post("/{printer_id}/clear-plate")
- async def clear_plate(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CLEAR_PLATE),
- db: AsyncSession = Depends(get_db),
- ):
- """Acknowledge that the build plate has been cleared after a finished/failed print.
- Sets a plate-cleared flag so the scheduler can start the next queued print.
- No MQTT command is sent to the printer — the scheduler's start_print command
- will override the FINISH/FAILED state when it sends the next job.
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- if not printer_manager.is_connected(printer_id):
- raise HTTPException(400, "Printer not connected")
- # Accept the acknowledgment whenever the printer is awaiting it — not only when the
- # reported state is FINISH/FAILED. After a power cycle the printer boots into IDLE
- # but the awaiting flag persists, and the user still needs a way to ack it (#961).
- state = printer_manager.get_status(printer_id)
- awaiting = printer_manager.is_awaiting_plate_clear(printer_id)
- if not awaiting and (not state or state.state not in ("FINISH", "FAILED")):
- raise HTTPException(
- 400,
- f"Printer is not awaiting plate-clear acknowledgment (state={state.state if state else 'unknown'})",
- )
- printer_manager.set_awaiting_plate_clear(printer_id, False)
- return {"success": True, "message": "Plate cleared, next print will start shortly"}
- @router.post("/{printer_id}/print/pause")
- async def pause_print(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Pause the current print job."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.pause_print()
- if not success:
- raise HTTPException(500, "Failed to pause print")
- return {"success": True, "message": "Print pause command sent"}
- @router.post("/{printer_id}/print/resume")
- async def resume_print(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Resume a paused print job."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.resume_print()
- if not success:
- raise HTTPException(500, "Failed to resume print")
- return {"success": True, "message": "Print resume command sent"}
- @router.post("/{printer_id}/print-speed")
- async def set_print_speed(
- printer_id: int,
- mode: int = Query(..., description="Speed mode (1=silent, 2=standard, 3=sport, 4=ludicrous)"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Set the print speed mode."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.set_print_speed(mode)
- if not success:
- raise HTTPException(500, "Failed to set print speed")
- speed_names = {1: "Silent", 2: "Standard", 3: "Sport", 4: "Ludicrous"}
- return {"success": True, "message": f"Print speed set to {speed_names.get(mode, 'Unknown')}"}
- @router.post("/{printer_id}/airduct-mode")
- async def set_airduct_mode(
- printer_id: int,
- mode: str = Query(..., description="Airduct mode: 'cooling' or 'heating'"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Set the airduct mode (cooling/heating) on supported printers (P2S/H2*)."""
- if mode not in ("cooling", "heating"):
- raise HTTPException(400, "Mode must be 'cooling' or 'heating'")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.set_airduct_mode(mode)
- if not success:
- raise HTTPException(500, "Failed to set airduct mode")
- return {"success": True, "message": f"Airduct mode set to {mode}"}
- @router.post("/{printer_id}/chamber-light")
- async def set_chamber_light(
- printer_id: int,
- on: bool = Query(..., description="True to turn on, False to turn off"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Turn the chamber light on or off."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.set_chamber_light(on)
- if not success:
- raise HTTPException(500, "Failed to control chamber light")
- return {"success": True, "message": f"Chamber light {'on' if on else 'off'}"}
- @router.post("/{printer_id}/bed-jog")
- async def bed_jog(
- printer_id: int,
- distance: float = Query(
- ..., description="Relative Z distance in mm (positive = bed down / nozzle further away, negative = bed up)"
- ),
- force: bool = Query(False, description="If true, bypass soft endstops via M211 (for use when Z is not homed)"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Move the build plate along the Z axis by a relative distance.
- Emits a short G-code sequence via MQTT. When ``force`` is true the soft
- endstops are disabled for the duration of the move, matching the
- "ignore and move anyway" option Bambu Studio offers when the printer
- is not homed.
- """
- if distance == 0 or abs(distance) > 200:
- raise HTTPException(400, "Distance must be non-zero and ≤ 200 mm")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- lines = []
- if force:
- lines.append("M211 S0")
- lines += ["G91", f"G1 Z{distance:.2f} F600", "G90"]
- if force:
- lines.append("M211 S1")
- if not client.send_gcode("\n".join(lines)):
- raise HTTPException(500, "Failed to send bed-jog command")
- return {"success": True, "message": f"Bed jog {distance:+.1f} mm sent"}
- @router.post("/{printer_id}/home-axes")
- async def home_axes(
- printer_id: int,
- axes: str = Query(
- "all",
- description="Legacy; accepted values are 'z' | 'xy' | 'all'. Always runs the printer's full auto-home sequence — see below.",
- ),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Run the printer's full auto-home sequence via bare `G28`.
- Bambu printers (H2C / H2D / H2S / X1 family) home the Z axis by moving
- the BED UP toward an endstop at the top of travel. If the toolhead is
- not already parked out of the way, a bare `G28 Z` will crash the bed
- into the toolhead — #1052 reported exactly that on H2C: the bed rose
- without stopping at a safe height because `G28 Z` skipped the
- toolhead-park step that a full `G28` runs first.
- The endpoint therefore ignores the `axes` argument and always sends a
- bare `G28`, which the firmware expands into a safe multi-step sequence
- (park toolhead → home XY → home Z). The argument is kept only for
- backward-compat with existing clients; sending an invalid value still
- returns 400 so typos surface instead of silently proceeding.
- """
- axes = axes.lower()
- if axes not in ("z", "xy", "all"):
- raise HTTPException(400, "axes must be 'z', 'xy', or 'all'")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- if not client.send_gcode("G28"):
- raise HTTPException(500, "Failed to send home command")
- return {"success": True, "message": "Full auto-home sequence sent"}
- @router.post("/{printer_id}/hms/clear")
- async def clear_hms_errors(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Clear HMS/print errors on the printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.clear_hms_errors()
- if not success:
- raise HTTPException(500, "Failed to clear HMS errors")
- return {"success": True, "message": "HMS errors cleared"}
- @router.get("/{printer_id}/print/objects")
- async def get_printable_objects(
- printer_id: int,
- reload: bool = False,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Get the list of printable objects for the current print.
- Returns a list of objects with id, name, position (if available), and skip status.
- Objects that have already been skipped are marked in the skipped_objects list.
- Args:
- reload: If True, reload objects from the archive file (useful after restart)
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- # Reload objects from 3MF if requested or no objects loaded
- if reload or not client.state.printable_objects:
- subtask_name = client.state.subtask_name
- if subtask_name:
- from backend.app.services.archive import extract_printable_objects_from_3mf
- from backend.app.services.bambu_ftp import download_file_try_paths_async
- # Build possible 3MF filenames (try both .gcode.3mf and .3mf)
- possible_filenames = []
- if subtask_name.endswith(".3mf"):
- possible_filenames.append(subtask_name)
- else:
- possible_filenames.append(f"{subtask_name}.gcode.3mf")
- possible_filenames.append(f"{subtask_name}.3mf")
- # Also try with spaces converted to underscores (Bambu Studio may normalize filenames)
- if " " in subtask_name:
- normalized = subtask_name.replace(" ", "_")
- if normalized.endswith(".3mf"):
- possible_filenames.append(normalized)
- else:
- possible_filenames.append(f"{normalized}.gcode.3mf")
- possible_filenames.append(f"{normalized}.3mf")
- # Download 3MF from printer
- temp_path = settings.archive_dir / "temp" / f"objects_{printer_id}_{possible_filenames[0]}"
- temp_path.parent.mkdir(parents=True, exist_ok=True)
- # Build list of all remote paths to try
- remote_paths = []
- for filename in possible_filenames:
- remote_paths.extend([f"/{filename}", f"/cache/{filename}", f"/model/{filename}"])
- try:
- downloaded = await download_file_try_paths_async(
- printer.ip_address,
- printer.access_code,
- remote_paths,
- temp_path,
- printer_model=printer.model,
- )
- if downloaded and temp_path.exists():
- with open(temp_path, "rb") as f:
- data = f.read()
- objects, bbox_all = extract_printable_objects_from_3mf(data, include_positions=True)
- if objects:
- client.state.printable_objects = objects
- client.state.printable_objects_bbox_all = bbox_all
- logger.info("Reloaded %s objects for printer %s", len(objects), printer_id)
- except Exception as e:
- logger.debug("Failed to reload objects from printer: %s", e)
- finally:
- if temp_path.exists():
- temp_path.unlink()
- # Return objects with their skip status and position data
- objects = []
- for obj_id, obj_data in client.state.printable_objects.items():
- # Handle both old format (string name) and new format (dict with name, x, y)
- if isinstance(obj_data, dict):
- obj_entry = {
- "id": obj_id,
- "name": obj_data.get("name", f"Object {obj_id}"),
- "x": obj_data.get("x"),
- "y": obj_data.get("y"),
- "skipped": obj_id in client.state.skipped_objects,
- }
- else:
- # Legacy format: obj_data is just the name string
- obj_entry = {
- "id": obj_id,
- "name": obj_data,
- "x": None,
- "y": None,
- "skipped": obj_id in client.state.skipped_objects,
- }
- objects.append(obj_entry)
- return {
- "objects": objects,
- "total": len(objects),
- "skipped_count": len(client.state.skipped_objects),
- "is_printing": client.state.state in ("RUNNING", "PAUSE"),
- "bbox_all": getattr(client.state, "printable_objects_bbox_all", None),
- }
- @router.post("/{printer_id}/print/skip-objects")
- async def skip_objects(
- printer_id: int,
- object_ids: list[int],
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Skip specific objects during the current print.
- Args:
- object_ids: List of object identify_id values to skip
- """
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- if not object_ids:
- raise HTTPException(400, "No object IDs provided")
- # Validate object IDs exist in printable_objects
- invalid_ids = [oid for oid in object_ids if oid not in client.state.printable_objects]
- if invalid_ids:
- raise HTTPException(400, f"Invalid object IDs: {invalid_ids}")
- success = client.skip_objects(object_ids)
- if not success:
- raise HTTPException(500, "Failed to skip objects")
- # Get names of skipped objects for response (handle both old and new format)
- skipped_names = []
- for oid in object_ids:
- obj_data = client.state.printable_objects.get(oid, str(oid))
- if isinstance(obj_data, dict):
- skipped_names.append(obj_data.get("name", str(oid)))
- else:
- skipped_names.append(obj_data)
- return {
- "success": True,
- "message": f"Skipped {len(object_ids)} object(s): {', '.join(skipped_names)}",
- "skipped_objects": object_ids,
- }
- # =============================================================================
- # AMS Control Endpoints
- # =============================================================================
- @router.post("/{printer_id}/ams/{ams_id}/slot/{slot_id}/refresh")
- async def refresh_ams_slot(
- printer_id: int,
- ams_id: int,
- slot_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_AMS_RFID),
- db: AsyncSession = Depends(get_db),
- ):
- """Re-read RFID for an AMS slot (triggers filament info refresh)."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success, message = client.ams_refresh_tray(ams_id, slot_id)
- if not success:
- raise HTTPException(400, message)
- # Apply PA profile after delay (RFID re-read takes a few seconds)
- asyncio.create_task(_apply_pa_after_refresh(printer_id, ams_id, slot_id))
- return {"success": True, "message": message}
- async def _apply_pa_after_refresh(printer_id: int, ams_id: int, slot_id: int):
- """Apply PA profile after RFID re-read completes.
- Waits for the printer to finish processing the RFID data, then selects
- the K-profile via extrusion_cali_sel. Does NOT re-send ams_set_filament_setting
- because that would overwrite the RFID-provided filament data.
- """
- await asyncio.sleep(5)
- try:
- from backend.app.api.routes.inventory import _find_tray_in_ams_data
- from backend.app.core.database import async_session
- from backend.app.models.spool import Spool
- from backend.app.models.spool_assignment import SpoolAssignment as SA
- from backend.app.models.spoolman_k_profile import SpoolmanKProfile
- from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment
- from backend.app.services.spool_tag_matcher import (
- ZERO_TAG_UID,
- ZERO_TRAY_UUID,
- is_bambu_tag,
- )
- from backend.app.utils.tag_normalization import (
- normalize_tag_uid,
- normalize_tray_uuid,
- )
- client = printer_manager.get_client(printer_id)
- if not client:
- return
- state = printer_manager.get_status(printer_id)
- if not state or not state.raw_data:
- return
- # Find current tray data (should have RFID data by now)
- ams_data = state.raw_data.get("ams", {})
- ams_list = (
- ams_data.get("ams", []) if isinstance(ams_data, dict) else ams_data if isinstance(ams_data, list) else []
- )
- tray = _find_tray_in_ams_data(ams_list, ams_id, slot_id)
- if not tray or not tray.get("tray_type"):
- logger.debug("PA re-apply: no tray data for AMS%d-T%d", ams_id, slot_id)
- return
- tag_uid = tray.get("tag_uid", "")
- tray_uuid = tray.get("tray_uuid", "")
- tray_info_idx = tray.get("tray_info_idx", "")
- if not is_bambu_tag(tag_uid, tray_uuid, tray_info_idx):
- return
- # Compute nozzle/extruder once — used by both local and Spoolman lookup.
- nozzle_diameter = "0.4"
- if state.nozzles:
- nd = state.nozzles[0].nozzle_diameter
- if nd:
- nozzle_diameter = nd
- slot_extruder = None
- if state.ams_extruder_map:
- if ams_id == 255:
- # External slots: ext-L (tray 0) → extruder 1, ext-R (tray 1) → extruder 0
- slot_extruder = 1 - slot_id
- else:
- slot_extruder = state.ams_extruder_map.get(str(ams_id))
- # 3-stage K-profile cascade: local SpoolKProfile → Spoolman SpoolmanKProfile
- # → live tray.cali_idx fallback. Pre-Phase-13 only handled the local path
- # and exited silently if no SpoolKProfile match; Spoolman-assigned slots
- # were ignored entirely and live cali_idx was never re-asserted.
- matching_cali_idx: int | None = None
- matching_filament_id: str = tray_info_idx
- async with async_session() as db:
- from sqlalchemy import or_, select as sa_select
- from sqlalchemy.orm import selectinload
- # Stage 1: local SpoolAssignment + SpoolKProfile match
- result = await db.execute(
- sa_select(SA)
- .options(selectinload(SA.spool).selectinload(Spool.k_profiles))
- .where(SA.printer_id == printer_id, SA.ams_id == ams_id, SA.tray_id == slot_id)
- )
- assignment = result.scalar_one_or_none()
- spool: Spool | None = assignment.spool if assignment else None
- # Stage 1b: tag-based fallback. The slot may have just been reset
- # (SpoolAssignment row deleted) before the user triggered a re-read.
- # The live tray already carries the spool's tray_uuid/tag_uid from
- # the RFID re-read, but the SA row hasn't been re-created yet.
- # Without this fallback we miss the stored SpoolKProfile and Stage 3
- # ends up re-asserting whatever cali_idx the firmware reset to
- # (typically the default profile).
- if spool is None:
- norm_uuid = normalize_tray_uuid(tray_uuid) if tray_uuid else ""
- norm_tag = normalize_tag_uid(tag_uid) if tag_uid else ""
- tag_filters = []
- if norm_uuid and norm_uuid != ZERO_TRAY_UUID:
- tag_filters.append(Spool.tray_uuid == norm_uuid)
- if norm_tag and norm_tag != ZERO_TAG_UID:
- tag_filters.append(Spool.tag_uid == norm_tag)
- if tag_filters:
- tag_lookup = await db.execute(
- sa_select(Spool).options(selectinload(Spool.k_profiles)).where(or_(*tag_filters)).limit(1)
- )
- spool = tag_lookup.scalar_one_or_none()
- if spool is not None:
- logger.info(
- "PA re-apply AMS%d-T%d: matched spool %d via tag fallback "
- "(SpoolAssignment row missing, likely after slot reset)",
- ams_id,
- slot_id,
- spool.id,
- )
- if spool is not None and spool.k_profiles:
- # Prefer exact extruder match, fall back to extruder-agnostic kp
- # for the same printer + nozzle. Hard-skipping on extruder
- # mismatch made the cascade refuse perfectly valid stored
- # profiles whenever the AMS-extruder mapping had shifted since
- # calibration time, falling all the way through to Stage 3 and
- # re-asserting the firmware default.
- exact_kp = None
- fallback_kp = None
- for kp in spool.k_profiles:
- if kp.printer_id != printer_id or kp.nozzle_diameter != nozzle_diameter or kp.cali_idx is None:
- continue
- if slot_extruder is not None and kp.extruder is not None and kp.extruder == slot_extruder:
- exact_kp = kp
- break
- if fallback_kp is None:
- fallback_kp = kp
- chosen_kp = exact_kp or fallback_kp
- if chosen_kp is not None:
- matching_cali_idx = chosen_kp.cali_idx
- # The filament_id in extrusion_cali_sel must match the preset
- # under which the K-profile was calibrated. Prefer the spool's
- # slicer_filament setting, falling back to the tray's RFID value.
- matching_filament_id = spool.slicer_filament or tray_info_idx
- # Stage 2: Spoolman SpoolmanSlotAssignment + SpoolmanKProfile match
- # (only when no local spool was matched — local takes priority,
- # including the tag-based fallback above)
- if matching_cali_idx is None and spool is None:
- sm_result = await db.execute(
- sa_select(SpoolmanSlotAssignment).where(
- SpoolmanSlotAssignment.printer_id == printer_id,
- SpoolmanSlotAssignment.ams_id == ams_id,
- SpoolmanSlotAssignment.tray_id == slot_id,
- )
- )
- sm_assignment = sm_result.scalar_one_or_none()
- if sm_assignment:
- kp_result = await db.execute(
- sa_select(SpoolmanKProfile).where(
- SpoolmanKProfile.spoolman_spool_id == sm_assignment.spoolman_spool_id,
- SpoolmanKProfile.printer_id == printer_id,
- )
- )
- for kp in kp_result.scalars().all():
- if kp.nozzle_diameter == nozzle_diameter:
- if slot_extruder is not None and kp.extruder is not None and kp.extruder != slot_extruder:
- continue
- if kp.cali_idx is not None:
- matching_cali_idx = kp.cali_idx
- # Spoolman has no slicer_filament — use the tray's RFID value
- matching_filament_id = tray_info_idx
- break
- # Stage 3: live tray.cali_idx fallback. Re-asserts the printer's current
- # selection so the value sticks across the RFID re-read (otherwise some
- # firmwares clear cali_idx back to -1 mid-cycle).
- if matching_cali_idx is None:
- live_cali_idx = tray.get("cali_idx")
- if live_cali_idx is not None and live_cali_idx >= 0:
- matching_cali_idx = live_cali_idx
- if matching_cali_idx is None:
- logger.debug(
- "PA re-apply AMS%d-T%d: no stored or live cali_idx — skipping MQTT",
- ams_id,
- slot_id,
- )
- return
- logger.info(
- "PA re-apply AMS%d-T%d: cali_idx=%d, filament_id=%s",
- ams_id,
- slot_id,
- matching_cali_idx,
- matching_filament_id,
- )
- # NOTE: Do NOT send ams_set_filament_setting here — it tells the firmware
- # "this is a manual config" which destroys the RFID-detected spool state
- # (changes eye icon to pen icon in slicer).
- client.extrusion_cali_sel(
- ams_id=ams_id,
- tray_id=slot_id,
- cali_idx=matching_cali_idx,
- filament_id=matching_filament_id,
- nozzle_diameter=nozzle_diameter,
- )
- # NOTE: Do NOT send extrusion_cali_set here. extrusion_cali_sel already
- # selected the correct profile by cali_idx. Sending extrusion_cali_set with
- # the same cali_idx would MODIFY the existing profile's metadata (extruder_id,
- # nozzle_id, name), corrupting it.
- logger.info(
- "Applied PA profile cali_idx=%d to printer %d AMS%d-T%d",
- matching_cali_idx,
- printer_id,
- ams_id,
- slot_id,
- )
- except Exception as e:
- logger.warning("Failed to apply PA profile after RFID re-read: %s", e)
- @router.post("/{printer_id}/ams/load")
- async def ams_load(
- printer_id: int,
- tray_id: int = Query(..., description="Tray ID: 0-15 for AMS slots (ams_id*4+slot_id), 254 for external spool"),
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Load filament from a specific AMS slot or external spool.
- Tray ID encoding (matches Bambu firmware convention):
- - 0..15: AMS slot, computed as ams_id * 4 + slot_id
- - 254: external spool (single-external printers, or Ext-L on dual-nozzle H2D)
- - 255: Ext-R on dual-nozzle H2D
- """
- if tray_id not in range(16) and tray_id not in (254, 255):
- raise HTTPException(400, "tray_id must be 0..15 (AMS slot), 254 (external / Ext-L), or 255 (Ext-R)")
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.ams_load_filament(tray_id)
- if not success:
- raise HTTPException(500, "Failed to send load command")
- if tray_id == 254:
- target = "external spool"
- elif tray_id == 255:
- target = "Ext-R"
- else:
- target = f"AMS {tray_id // 4} slot {tray_id % 4 + 1}"
- return {"success": True, "message": f"Loading filament from {target}"}
- @router.post("/{printer_id}/ams/unload")
- async def ams_unload(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_CONTROL),
- db: AsyncSession = Depends(get_db),
- ):
- """Unload the currently loaded filament."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- client = printer_manager.get_client(printer_id)
- if not client:
- raise HTTPException(400, "Printer not connected")
- success = client.ams_unload_filament()
- if not success:
- raise HTTPException(500, "Failed to send unload command")
- return {"success": True, "message": "Unloading filament"}
- @router.get("/{printer_id}/runtime-debug")
- async def get_runtime_debug(
- printer_id: int,
- _=RequirePermissionIfAuthEnabled(Permission.PRINTERS_READ),
- db: AsyncSession = Depends(get_db),
- ):
- """Debug endpoint: Get runtime tracking status for a printer."""
- result = await db.execute(select(Printer).where(Printer.id == printer_id))
- printer = result.scalar_one_or_none()
- if not printer:
- raise HTTPException(404, "Printer not found")
- state = printer_manager.get_status(printer_id)
- return {
- "printer_name": printer.name,
- "runtime_seconds": printer.runtime_seconds,
- "runtime_hours": printer.runtime_seconds / 3600.0 if printer.runtime_seconds else 0,
- "print_hours_offset": printer.print_hours_offset,
- "total_hours": (printer.runtime_seconds / 3600.0 if printer.runtime_seconds else 0)
- + (printer.print_hours_offset or 0),
- "last_runtime_update": printer.last_runtime_update.isoformat() if printer.last_runtime_update else None,
- "mqtt_state": {
- "connected": state.connected if state else False,
- "state": state.state if state else None,
- "progress": state.progress if state else None,
- "gcode_file": state.gcode_file if state else None,
- }
- if state
- else None,
- "is_active": printer.is_active,
- }
|