| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
77737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764
27742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627462846294630463146324633463446354636463746384639464046414642464346444645464646474648464946504651465246534654465546564657465846594660466146624663466446654666466746684669467046714672467346744675467646774678467946804681468246834684468546864687468846894690469146924693469446954696469746984699470047014702470347044705470647074708470947104711471247134714471547164717471847194720472147224723472447254726472747284729473047314732473347344735473647374738473947404741474247434744474547464747474847494750475147524753475447554756475747584759476047614762476347644765476647674768476947704771477247734774477547764
7774778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078 |
- """Bambu Lab MQTT communication service.
- IMPORTANT: Always use qos=1 for all MQTT publish calls!
- The printer ignores qos=0 messages when busy broadcasting status updates.
- Using qos=1 ensures the printer acknowledges and processes our commands immediately.
- This was discovered when K-profile requests with qos=0 took 20-30 seconds,
- but with qos=1 they respond instantly.
- """
- import asyncio
- import json
- import logging
- import os
- import ssl
- import threading
- import time
- from collections import deque
- from collections.abc import Callable
- from dataclasses import dataclass, field
- from datetime import datetime, timezone
- import paho.mqtt.client as mqtt
logger = logging.getLogger(__name__)

# Module-name prefixes that identify AMS units in get_version responses.
# Each prefix is followed by the AMS unit ID as reported in push_status.
_AMS_MODULE_PREFIXES = (
    "ams/",  # "ams/<id>" – original AMS (X1C, X1E, P1S, …)
    "n3f/",  # "n3f/<id>" – AMS 2 Pro (H2D Pro and similar)
    "n3s/",  # "n3s/<id>" – AMS HT (H2D Pro and similar; IDs typically start at 128)
)
@dataclass
class MQTTLogEntry:
    """A single MQTT message recorded for debugging purposes."""

    timestamp: str
    topic: str
    # Message direction: "in" or "out".
    direction: str
    payload: dict
@dataclass
class HMSError:
    """A Health Management System (HMS) error reported by the printer."""

    code: str
    # Attribute value, used when constructing the wiki URL.
    attr: int
    module: int
    # Severity level: 1=fatal, 2=serious, 3=common, 4=info.
    severity: int
    message: str = ""
# HMS short codes that the firmware emits as part of an ordinary user-cancel
# sequence. They are status echoes confirming the cancel, not faults.
# Dropping them at parse time keeps them out of state.hms_errors altogether,
# so they never feed the printer card's "X problem" badge, the red pip, or
# any other consumer that reads hms_errors as the list of active faults.
_HMS_USER_ACTION_CODES: frozenset[str] = frozenset(
    (
        "0300_400C",  # "The task was canceled."
        "0500_400E",  # "Printing was cancelled."
    )
)
- @dataclass
- class KProfile:
- """Pressure advance (K) calibration profile from printer."""
- slot_id: int
- extruder_id: int
- nozzle_id: str
- nozzle_diameter: str
- filament_id: str
- name: str
- k_value: str
- n_coef: str = "0.000000"
- ams_id: int = 0
- tray_id: int = -1
- setting_id: str | None = None
@dataclass
class NozzleInfo:
    """Hardware configuration of a nozzle."""

    # Nozzle material: "stainless_steel" or "hardened_steel".
    nozzle_type: str = ""
    # Nozzle diameter as a string, e.g. "0.4".
    nozzle_diameter: str = ""
@dataclass
class FilaSwitchState:
    """State of the Filament Track Switch (FTS) accessory.

    The FTS is an external accessory that sits between an AMS and the
    printer's extruders and mediates filament routing. With it installed the
    AMS has no fixed extruder assignment any more: the track switch can route
    any slot to any extruder. Detected from print.device.fila_switch in MQTT.
    """

    installed: bool = False
    # in[track]: slot currently loaded on that track (-1 = empty). The value
    # is stored as observed in MQTT and treated as a global tray ID.
    in_slots: list[int] = field(default_factory=list)
    # out[track]: extruder that the track ends at (0 = right/main, 1 = left).
    out_extruders: list[int] = field(default_factory=list)
    # Status flags (0 = idle).
    stat: int = 0
    # Info flags.
    info: int = 0
@dataclass
class PrintOptions:
    """AI detection settings and print options taken from xcam data."""

    # --- Core AI detectors ---
    spaghetti_detector: bool = False
    print_halt: bool = False
    # Spaghetti detection sensitivity.
    halt_print_sensitivity: str = "medium"
    first_layer_inspector: bool = False
    # AI print quality monitoring.
    printing_monitor: bool = False
    buildplate_marker_detector: bool = False
    allow_skip_parts: bool = False

    # --- Additional AI detectors, decoded from the cfg bitmask ---
    nozzle_clumping_detector: bool = True
    nozzle_clumping_sensitivity: str = "medium"
    pileup_detector: bool = True
    pileup_sensitivity: str = "medium"
    airprint_detector: bool = True
    airprint_sensitivity: str = "medium"
    # Uses the print.print_option command.
    auto_recovery_step_loss: bool = True
    filament_tangle_detect: bool = False
@dataclass
class PrinterState:
    """Last known status of a printer.

    Field comments name the MQTT field or bit a value is derived from where
    that is known.
    """

    connected: bool = False
    state: str = "unknown"
    current_print: str | None = None
    subtask_name: str | None = None
    progress: float = 0.0
    remaining_time: int = 0
    layer_num: int = 0
    total_layers: int = 0
    temperatures: dict = field(default_factory=dict)
    raw_data: dict = field(default_factory=dict)
    gcode_file: str | None = None
    subtask_id: str | None = None
    # List of HMSError.
    hms_errors: list = field(default_factory=list)
    # List of KProfile.
    kprofiles: list = field(default_factory=list)
    # SD card inserted.
    sdcard: bool = False
    # Store sent files on SD card (home_flag bit 11).
    store_to_sdcard: bool = False
    # Timelapse recording active.
    timelapse: bool = False
    # Live view / camera streaming enabled.
    ipcam: bool = False
    # WiFi signal strength in dBm.
    wifi_signal: int | None = None
    # Ethernet connection detected (home_flag bit 18).
    wired_network: bool = False
    # Enclosure door open (home_flag bit 23, X1/P1S/P2S/H2*).
    door_open: bool = False

    # Nozzle hardware info (for dual nozzle printers, index 0 = left, 1 = right).
    # NOTE(review): this index order (0 = left) is the opposite of
    # active_extruder below (0 = right) — confirm which one is intended.
    nozzles: list = field(default_factory=lambda: [NozzleInfo(), NozzleInfo()])

    # AI detection and print options.
    print_options: PrintOptions = field(default_factory=PrintOptions)

    # Calibration stage tracking (from the stg_cur and stg fields).
    # Current stage index (-1 = not calibrating).
    stg_cur: int = -1
    # List of stages to execute.
    stg: list = field(default_factory=list)

    # Air conditioning mode (0=cooling, 1=heating).
    airduct_mode: int = 0
    # Print speed level (1=silent, 2=standard, 3=sport, 4=ludicrous).
    speed_level: int = 2
    # Chamber light on/off.
    chamber_light: bool = False
    # Active extruder for dual nozzle (0=right, 1=left), from
    # device.extruder.info[X].hnow.
    active_extruder: int = 0

    # Currently loaded tray (global ID): 254/255 = external spools,
    # 255 = no filament on legacy printers.
    tray_now: int = 255
    # Last valid tray_now (0-253) — survives unload (255) so usage can still
    # be tracked after the print completes.
    last_loaded_tray: int = -1
    # Pending load target: the tray being loaded, used for H2D disambiguation.
    pending_tray_target: int | None = None

    # AMS status for filament change tracking (from print.ams.ams_status).
    # The value is combined: lower 8 bits = sub status, bits 8-15 = main status.
    # Main status: 0=idle, 1=filament_change, 2=rfid_identifying, 3=assist,
    # 4=calibration, etc.
    ams_status: int = 0
    # (ams_status >> 8) & 0xFF
    ams_status_main: int = 0
    # ams_status & 0xFF
    ams_status_sub: int = 0

    # Filament change step indicator from print.mc_print_sub_stage. Used by
    # OrcaSlicer/BambuStudio to track progress during filament load/unload.
    mc_print_sub_stage: int = 0

    # AMS mapping for dual nozzle: which slot is active
    # (from ams.ams_exist_bits/tray_exist_bits).
    ams_mapping: list = field(default_factory=list)
    # Per-AMS extruder map: {ams_id: extruder_id} where 0=right/main,
    # 1=left/deputy.
    ams_extruder_map: dict = field(default_factory=dict)

    # Filament Track Switch (FTS) accessory — when installed, AMS info reports
    # bits 8-11 = 0xE (uninitialized) because routing is dynamic. See #1162.
    fila_switch: "FilaSwitchState" = field(default_factory=FilaSwitchState)

    # Plate that Bambuddy dispatched for the current print. Some firmware
    # versions (P1S 01.10.00.00) put only the .3mf filename in
    # print.gcode_file, so the regex that derives the plate number from the
    # path always falls back to plate 1 and the printer card shows the wrong
    # thumbnail (#1166). When Bambuddy dispatches the print itself the plate
    # is known authoritatively; it is recorded here and preferred over the
    # gcode_file regex. The subtask field guards against staleness: if the
    # printer is currently running a different subtask (e.g. a Studio-direct
    # dispatch), these values are ignored. Cleared on disconnect.
    dispatched_plate_id: int | None = None
    dispatched_subtask: str | None = None

    # H2D per-extruder tray_now from the snow field:
    # {extruder_id: normalized_global_tray_id}. snow encodes the AMS ID in the
    # high byte: ams_id = snow >> 8, slot = snow & 0xFF.
    h2d_extruder_snow: dict = field(default_factory=dict)
    # H2C nozzle rack: full device.nozzle.info array for tool-changer printers
    # (>2 nozzles).
    nozzle_rack: list = field(default_factory=list)

    # Timestamp of the last AMS data update (for RFID refresh detection).
    last_ams_update: float = 0.0

    # Printable objects for skip-object functionality:
    # {identify_id: object_name}.
    printable_objects: dict = field(default_factory=dict)
    # Objects that have been skipped during the current print.
    skipped_objects: list = field(default_factory=list)

    # Fan speeds (0-100 percentage, None if not available for this model).
    # Part cooling fan.
    cooling_fan_speed: int | None = None
    # Auxiliary fan.
    big_fan1_speed: int | None = None
    # Chamber/exhaust fan.
    big_fan2_speed: int | None = None
    # Hotend heatbreak fan.
    heatbreak_fan_speed: int | None = None

    # Tray change history during the current print:
    # [(global_tray_id, layer_num), ...]. Used by the usage tracker to split
    # filament weight on a mid-print tray switch.
    tray_change_log: list = field(default_factory=list)

    # Firmware version info (from info.module[name="ota"].sw_ver).
    firmware_version: str | None = None

    # Developer LAN mode, parsed from MQTT "fun" field bit 0x20000000.
    # True = dev mode ON (no encryption), False = dev mode OFF (encryption
    # required), None = unknown.
    developer_mode: bool | None = None
# Human-readable labels for printer stage codes, taken from BambuStudio's
# DeviceManager.cpp. Codes are not contiguous past 66.
STAGE_NAMES: dict[int, str] = {
    0: "Printing",
    1: "Auto bed leveling",
    2: "Heatbed preheating",
    3: "Vibration compensation",
    4: "Changing filament",
    5: "M400 pause",
    6: "Paused (filament ran out)",
    7: "Heating nozzle",
    8: "Calibrating dynamic flow",
    9: "Scanning bed surface",
    10: "Inspecting first layer",
    11: "Identifying build plate type",
    12: "Calibrating Micro Lidar",
    13: "Homing toolhead",
    14: "Cleaning nozzle tip",
    15: "Checking extruder temperature",
    16: "Paused by the user",
    17: "Pause (front cover fall off)",
    18: "Calibrating the micro lidar",
    19: "Calibrating flow ratio",
    20: "Pause (nozzle temperature malfunction)",
    21: "Pause (heatbed temperature malfunction)",
    22: "Filament unloading",
    23: "Pause (step loss)",
    24: "Filament loading",
    25: "Motor noise cancellation",
    26: "Pause (AMS offline)",
    27: "Pause (low speed of the heatbreak fan)",
    28: "Pause (chamber temperature control problem)",
    29: "Cooling chamber",
    30: "Pause (Gcode inserted by user)",
    31: "Motor noise showoff",
    32: "Pause (nozzle clumping)",
    33: "Pause (cutter error)",
    34: "Pause (first layer error)",
    35: "Pause (nozzle clog)",
    36: "Measuring motion precision",
    37: "Enhancing motion precision",
    38: "Measure motion accuracy",
    39: "Nozzle offset calibration",
    40: "High temperature auto bed leveling",
    41: "Auto Check: Quick Release Lever",
    42: "Auto Check: Door and Upper Cover",
    43: "Laser Calibration",
    44: "Auto Check: Platform",
    45: "Confirming BirdsEye Camera location",
    46: "Calibrating BirdsEye Camera",
    47: "Auto bed leveling - phase 1",
    48: "Auto bed leveling - phase 2",
    49: "Heating chamber",
    50: "Cooling heatbed",
    51: "Printing calibration lines",
    52: "Auto Check: Material",
    53: "Live View Camera Calibration",
    54: "Waiting for heatbed temperature",
    55: "Auto Check: Material Position",
    56: "Cutting Module Offset Calibration",
    57: "Measuring Surface",
    58: "Thermal Preconditioning",
    59: "Homing Blade Holder",
    60: "Calibrating Camera Offset",
    61: "Calibrating Blade Holder Position",
    62: "Hotend Pick and Place Test",
    63: "Waiting for Chamber temperature",
    64: "Preparing Hotend",
    65: "Calibrating nozzle clumping detection",
    66: "Purifying the chamber air",
    74: "Preparing",  # Seen on H2D during print preparation
    77: "Preparing AMS",
}
def get_stage_name(stage: int) -> str:
    """Translate a numeric stage code into its display label.

    Args:
        stage: Stage number to look up in ``STAGE_NAMES``.

    Returns:
        The label stored for ``stage``, or ``"Unknown stage (<stage>)"`` when
        the code has no entry.
    """
    try:
        return STAGE_NAMES[stage]
    except KeyError:
        return f"Unknown stage ({stage})"
- class BambuMQTTClient:
- """MQTT client for Bambu Lab printer communication."""
- MQTT_PORT = 8883
- # Class-level cache: serial_number -> False when request topic is known unsupported.
- # Persists across client instances so reconnects don't re-trigger failed subscriptions.
- _request_topic_cache: dict[str, bool] = {}
- # Counter for generating unique MQTT client IDs across instances.
- _client_instance_counter: int = 0
def __init__(
    self,
    ip_address: str,
    serial_number: str,
    access_code: str,
    model: str | None = None,
    on_state_change: Callable[[PrinterState], None] | None = None,
    on_print_start: Callable[[dict], None] | None = None,
    on_print_complete: Callable[[dict], None] | None = None,
    on_ams_change: Callable[[list], None] | None = None,
    on_layer_change: Callable[[int], None] | None = None,
    on_bed_temp_update: Callable[[float], None] | None = None,
    on_drying_complete: Callable[[int], None] | None = None,
    on_print_running_observed: Callable[[dict], None] | None = None,
) -> None:
    """Store connection settings and callbacks, and reset all tracking state.

    This constructor only assigns attributes; ``self._client`` stays ``None``
    and no connection is attempted here.

    Args:
        ip_address: Printer address (stored as given).
        serial_number: Printer serial number. It is embedded in the
            ``device/<serial>/report`` and ``device/<serial>/request`` topic
            names and is the key into the class-level request-topic cache.
        access_code: Printer access code (stored as given).
        model: Optional printer model identifier.
        on_state_change: Invoked with the current ``PrinterState`` whenever
            this client broadcasts a state update.
        on_print_start: Optional callback taking a dict.
        on_print_complete: Optional callback taking a dict.
        on_ams_change: Optional callback taking a list.
        on_layer_change: Optional callback taking an int.
        on_bed_temp_update: Optional callback taking a float.
        on_drying_complete: Optional callback taking the id of the AMS unit
            whose drying cycle finished.
        on_print_running_observed: Optional callback taking the same dict
            shape as ``on_print_start``.
    """
    self.ip_address = ip_address
    self.serial_number = serial_number
    self.access_code = access_code
    self.model = model
    self.on_state_change = on_state_change
    self.on_print_start = on_print_start
    self.on_print_complete = on_print_complete
    self.on_ams_change = on_ams_change
    self.on_layer_change = on_layer_change
    self.on_bed_temp_update = on_bed_temp_update
    # #1349: fired when an AMS unit's dry_time falls from >0 to 0 — i.e.
    # the drying cycle just finished (auto- or manually-triggered).
    # Receives the AMS id of the unit that finished drying.
    self.on_drying_complete = on_drying_complete
    # #1485 follow-up: fired the first time we see RUNNING state in a
    # session WHEN on_print_start was suppressed (Bambuddy started mid-
    # print, the #1304 first-push guard skipped the start event). Lets
    # main.py capture a fresh timelapse baseline at restart-recovery
    # time so the completion-time snapshot-diff still works. Receives
    # the same shape as on_print_start (filename / subtask_name /
    # remaining_time / raw_data / ams_mapping).
    self.on_print_running_observed = on_print_running_observed
    # Per-AMS previous dry_time, used to detect the falling edge above.
    # Seeded lazily as we observe each AMS unit.
    self._previous_dry_times: dict[int, int] = {}
    self.state = PrinterState()
    self._client: mqtt.Client | None = None
    self._loop: asyncio.AbstractEventLoop | None = None
    self._previous_gcode_state: str | None = None
    self._previous_gcode_file: str | None = None
    self._was_running: bool = False  # Track if we've seen RUNNING state for current print
    self._completion_triggered: bool = False  # Prevent duplicate completion triggers
    self._timelapse_during_print: bool = False  # Track if timelapse was active during this print
    self._last_valid_progress: float = 0.0  # Last non-zero progress (firmware resets on cancel)
    self._last_valid_layer_num: int = 0  # Last non-zero layer (firmware resets on cancel)
    # The subtask_id minted for the most recent start_print() command. The
    # printer echoes it back in status, but often not within the first few
    # seconds — so on_print_start uses this as the id source when the
    # printer hasn't reported it yet, letting queue/scheduled archives
    # persist a restart-stable id from the moment they dispatch (#1485).
    self.last_dispatch_subtask_id: str | None = None
    self._is_dual_nozzle: bool = False  # Set when device.extruder.info has >= 2 entries
    self._message_log: deque[MQTTLogEntry] = deque(maxlen=100)
    self._logging_enabled: bool = False
    self._last_message_time: float = 0.0  # Track when we last received a message
    # Count of report-topic messages received since the last (re)connect.
    # Lets check_staleness() distinguish "printer never sent a status
    # report" (typically a wrong / mis-cased serial) from a normal quiet
    # gap mid-session. _zero_report_hint_logged keeps the actionable hint
    # to once per client lifetime so the stale loop doesn't spam it (#1465).
    self._report_messages_since_connect: int = 0
    self._zero_report_hint_logged: bool = False
    # Raw-message fan-out for VP MQTT bridge (non-proxy modes republish the
    # printer's pushes verbatim to slicers connected to a virtual printer).
    # Handlers receive (topic, payload_bytes) before JSON parsing.
    self._raw_message_handlers: list[Callable[[str, bytes], None]] = []
    self._disconnection_event: threading.Event | None = None
    self._previous_ams_hash: str | None = None  # Track AMS changes
    # Cache AMS firmware/SN from get_version in case it arrives before AMS status
    # Key: ams_id (int). Value: {'sw_ver': str, 'sn': str}
    self._ams_version_cache: dict[int, dict[str, str]] = {}
    # Track which (ams_id, field) warnings have already been emitted this connection
    # so that missing-serial / missing-firmware warnings fire only once per connection.
    self._ams_version_warned: set[tuple[int | str, str]] = set()
    # K-profile command tracking
    self._sequence_id: int = 0
    self._pending_kprofile_response: asyncio.Event | None = None
    self._kprofile_response_data: list | None = None
    # Xcam hold timers - OrcaSlicer pattern: ignore incoming data for 3 seconds after command
    # Key: module_name, Value: timestamp when command was sent
    self._xcam_hold_start: dict[str, float] = {}
    self._xcam_hold_time: float = 3.0  # Ignore incoming data for 3 seconds after command
    # Track last requested tray ID for H2D dual-nozzle printers
    # H2D only reports slot number (0-3) in tray_now, not global tray ID
    # We use our tracked value to resolve the correct global ID
    self._last_load_tray_id: int | None = None
    # Captured ams_mapping from print commands on the request topic
    # Intercepts slicer/Bambuddy print commands to get the slot-to-tray mapping
    self._captured_ams_mapping: list[int] | None = None
    # Request topic subscription tracking
    # Some printer MQTT brokers (e.g. P1S, A1) reject subscriptions to the request
    # topic by killing the TCP connection. We detect this and gracefully degrade.
    # Check class-level cache first so new client instances don't retry known-bad subscriptions.
    self._request_topic_supported: bool = BambuMQTTClient._request_topic_cache.get(self.serial_number, True)
    self._request_topic_sub_mid: int | None = None
    self._request_topic_sub_time: float = 0.0
    self._request_topic_confirmed: bool = False
    # Developer mode probe: when the "fun" field is absent (A1/P1 printers),
    # we probe by sending an ams_filament_setting and checking the response.
    # "mqtt message verify failed" → dev mode OFF, success → dev mode ON.
    self._dev_mode_probed: bool = False
    self._dev_mode_needs_probe: bool = False  # True after seeing a pushall without "fun"
    self._dev_mode_probe_seq: str | None = None
    self._dev_mode_probe_time: float = 0.0  # monotonic timestamp when probe was sent
    self._dev_mode_probe_failures: int = 0  # consecutive unanswered probes
    self._connect_time: float = 0.0  # monotonic timestamp of last _on_connect
    # Set when check_staleness() force-closes the socket to trigger reconnect.
    # Prevents _on_disconnect from redundantly broadcasting state (already done).
    self._stale_reconnecting: bool = False
    # Timestamp of last stale reconnect — prevents rapid-fire socket closes
    # when the frontend polls status faster than paho can reconnect.
    self._last_stale_reconnect: float = 0.0
    # Zombie session detection via ams_filament_setting response tracking (#887).
    # The dev-mode probe only runs on first connect; this catches zombie sessions
    # that develop later (telemetry flows but publishes silently fail).
    self._last_ams_cmd_time: float = 0.0  # monotonic time of last published command
    self._ams_cmd_unanswered: int = 0  # consecutive commands with no response
@property
def topic_subscribe(self) -> str:
    """Report topic for this printer: ``device/<serial_number>/report``.

    ``_on_connect`` subscribes to this topic, and ``_on_message`` counts
    messages arriving on it as status reports.
    """
    return f"device/{self.serial_number}/report"
@property
def topic_publish(self) -> str:
    """Request topic for this printer: ``device/<serial_number>/request``.

    Besides being the command topic, ``_on_connect`` also subscribes to it
    (when the broker allows) so ``_on_message`` can intercept print commands.
    """
    return f"device/{self.serial_number}/request"
# Maximum time (seconds) without a message before considering connection stale.
# is_stale() compares this against the time elapsed since _last_message_time.
STALE_TIMEOUT = 60.0
def is_stale(self) -> bool:
    """Report whether the last received message is older than ``STALE_TIMEOUT``.

    A client that has never received a message (timestamp still 0) is
    never considered stale.
    """
    last_seen = self._last_message_time
    if last_seen == 0:
        # Nothing received yet, so there is no silence gap to measure.
        return False
    silence = time.time() - last_seen
    return silence > self.STALE_TIMEOUT
# Minimum seconds between stale reconnect attempts. Frontend polls
# status every few seconds — without a cooldown, each poll would
# force-close the socket before paho has time to reconnect.
# check_staleness() compares this against the time since _last_stale_reconnect.
STALE_RECONNECT_COOLDOWN = 30.0
def check_staleness(self) -> bool:
    """Detect a stale connection and, if found, force a reconnect.

    Returns the value of ``self.state.connected`` after the check: unchanged
    when the connection is healthy, disconnected, or still inside the
    reconnect cooldown; ``False`` once a stale reconnect has been triggered.
    """
    if not (self.state.connected and self.is_stale()):
        return self.state.connected

    # Don't force-close again if we already did recently — give paho time
    # to reconnect and the printer time to send its first message.
    now = time.time()
    if now - self._last_stale_reconnect < self.STALE_RECONNECT_COOLDOWN:
        return self.state.connected

    logger.warning(
        f"[{self.serial_number}] Connection stale - no message for {now - self._last_message_time:.1f}s, forcing reconnect"
    )

    # Going stale without ever having received a status report almost always
    # means a wrong or mis-cased serial number: the broker accepts the
    # connection and the subscription anyway, but the printer publishes on
    # device/<real-serial>/report, which is case-sensitive. Say so once, so
    # the user gets something actionable instead of an endless reconnect loop.
    never_reported = self._report_messages_since_connect == 0
    if never_reported and not self._zero_report_hint_logged:
        self._zero_report_hint_logged = True
        logger.warning(
            "[%s] Connected and subscribed, but the printer has sent zero "
            "status reports. The most common cause is a wrong or mis-cased "
            "serial number — the device/<serial>/report MQTT topic is "
            "case-sensitive. Verify the serial number configured in Bambuddy "
            "exactly matches the printer.",
            self.serial_number,
        )

    self._last_stale_reconnect = now
    self.state.connected = False
    notify = self.on_state_change
    if notify:
        notify(self.state)

    # Flag the upcoming disconnect as expected, then let the dispatcher
    # pick hard-reset or socket-close depending on the calling thread.
    self._stale_reconnecting = True
    self._reset_client_for_reconnect()
    return self.state.connected
def force_reconnect_stale_session(self, reason: str) -> None:
    """Force a reconnect to heal a half-broken session (#887/#936/#1136).

    In that state telemetry keeps arriving but our publishes don't reach
    the printer. The method logs ``reason``, marks the client disconnected,
    notifies ``on_state_change`` and hands over to
    ``_reset_client_for_reconnect``, which picks one of two routes:

    * Async-context callers (the dispatch-deadline path) get a full client
      teardown with a fresh client_id. This wipes paho's client-side QoS 1
      queue, which is the #1136 reproducer: an unacked ``project_file``
      from the broken session would otherwise replay on reconnect, mixing
      stale commands into the next dispatch and triggering 0500_4003 SD R/W
      on the printer.
    * Paho-network-thread callers (dev-mode probe and ams_filament_setting
      zombie detection inside ``_update_state``) get the socket-close
      fallback. Calling ``loop_stop()`` from inside the network thread
      would self-join and deadlock, so the socket is closed and paho's own
      loop detects the broken connection and auto-reconnects (same
      instance, same client_id). Queue replay is theoretically possible on
      this route, but these paths have always used socket-close and #1136
      was specifically triggered from the dispatch path.
    """
    logger.warning("[%s] Forcing MQTT reconnect: %s", self.serial_number, reason)
    # Mark the coming disconnect as intentional before touching the state,
    # so _on_disconnect does not broadcast a second time.
    self._stale_reconnecting = True
    self.state.connected = False
    notify = self.on_state_change
    if notify:
        notify(self.state)
    self._reset_client_for_reconnect()
- def _reset_client_for_reconnect(self) -> None:
- """Route between hard-reset and socket-close based on caller thread.
- Hard-reset (preferred) requires we're not running on paho's network
- thread, since `loop_stop()` on the same thread deadlocks. Detect via
- ``asyncio.get_running_loop()`` — paho's callback thread has no loop;
- every legitimate hard-reset caller (FastAPI handlers, background
- async tasks) does."""
- try:
- loop = asyncio.get_running_loop()
- except RuntimeError:
- loop = None
- if loop is not None:
- self._loop = loop
- self._hard_reset_client()
- else:
- self._socket_close_for_reconnect()
- def _hard_reset_client(self) -> None:
- """Tear down the paho client entirely and rebuild it with a fresh
- client_id, so the broker drops the old session and paho's local
- QoS 1 queue is gone. Must NOT be called from paho's network thread.
- Caller is responsible for setting ``_stale_reconnecting`` and
- broadcasting the disconnected state."""
- old_client = self._client
- self._client = None
- if old_client is not None:
- try:
- old_client.disconnect() # MQTT DISCONNECT — broker drops session
- except Exception:
- pass
- try:
- old_client.loop_stop() # blocks briefly until the network thread exits
- except Exception:
- pass
- # Skip reconnect if no asyncio loop is available (test environment or
- # pre-init). The next initial connect() call from PrinterManager will
- # set up the client fresh.
- if self._loop is None:
- return
- try:
- self.connect(loop=self._loop)
- except Exception as e:
- logger.error("[%s] Hard reset reconnect failed: %s", self.serial_number, e)
- def _socket_close_for_reconnect(self) -> None:
- """Close the underlying socket so paho's loop thread detects the
- broken connection and triggers auto-reconnect on the SAME client
- instance. Safe to call from paho's own network thread (the loop
- polls the socket on every iteration and handles a closed socket
- gracefully). Used as a fallback when hard-reset isn't safe; queue
- replay remains theoretically possible here but #1136 specifically
- traced through the dispatch-deadline path which now hard-resets."""
- if self._client:
- try:
- sock = self._client.socket()
- if sock:
- sock.close()
- except Exception:
- pass
- def _on_connect(self, client, userdata, flags, rc, properties=None):
- if rc == 0:
- self.state.connected = True
- self._stale_reconnecting = False # Clear stale-reconnect flag on successful connect
- # Reset per-connection warning state so warnings fire once per (re)connection
- self._ams_version_warned = set()
- # Preserve cached developer_mode across auto-reconnects to avoid
- # re-probing on every reconnect. The probe (ams_filament_setting to
- # ext slot) can destabilize some firmware MQTT brokers, causing a
- # reconnect → probe → disconnect feedback loop (#887). Only probe
- # once when developer_mode is truly unknown (first connect).
- # Reset probe tracking so stale timeout state doesn't carry over.
- self._dev_mode_probed = False
- self._dev_mode_needs_probe = False
- self._dev_mode_probe_seq = None
- self._dev_mode_probe_time = 0.0
- self._dev_mode_probe_failures = 0
- self._connect_time = time.monotonic()
- self._report_messages_since_connect = 0
- self._last_ams_cmd_time = 0.0
- self._ams_cmd_unanswered = 0
- client.subscribe(self.topic_subscribe)
- # Subscribe to request topic for ams_mapping capture (if supported by broker)
- if self._request_topic_supported:
- result, mid = client.subscribe(self.topic_publish)
- if result == mqtt.MQTT_ERR_SUCCESS:
- self._request_topic_sub_mid = mid
- self._request_topic_sub_time = time.time()
- self._request_topic_confirmed = False
- else:
- logger.warning(
- "[%s] Failed to send request topic subscription",
- self.serial_number,
- )
- self._request_topic_supported = False
- BambuMQTTClient._request_topic_cache[self.serial_number] = False
- # Request full status update (includes nozzle info in push_status response)
- self._request_push_all()
- # Request firmware version info
- self._request_version()
- # Note: get_accessories returns stale nozzle data on H2D, so we don't use it.
- # The correct nozzle data comes from push_status.
- # Prime K-profile request (Bambu printers often ignore first request)
- self._prime_kprofile_request()
- # Immediately broadcast connection state change
- if self.on_state_change:
- self.on_state_change(self.state)
- else:
- self.state.connected = False
- def _on_subscribe(self, client, userdata, mid, reason_code_list, properties=None):
- """Handle SUBACK responses to detect request topic subscription rejection."""
- if mid == self._request_topic_sub_mid:
- for rc in reason_code_list:
- if rc.is_failure:
- logger.warning(
- "[%s] Request topic subscription rejected (code=%d: %s). "
- "ams_mapping capture from slicer-initiated prints unavailable.",
- self.serial_number,
- rc.value,
- rc.getName(),
- )
- self._request_topic_supported = False
- BambuMQTTClient._request_topic_cache[self.serial_number] = False
- else:
- logger.info(
- "[%s] Request topic subscription accepted. "
- "ams_mapping capture enabled for slicer-initiated prints.",
- self.serial_number,
- )
- self._request_topic_confirmed = True
- BambuMQTTClient._request_topic_cache[self.serial_number] = True
- self._request_topic_sub_mid = None
- self._request_topic_sub_time = 0.0
def _on_disconnect(self, client, userdata, disconnect_flags=None, rc=None, properties=None):
    """paho disconnect callback: decide whether to report the disconnect.

    Steps, in order:
      1. Release anyone waiting on ``_disconnection_event``.
      2. Return quietly if the disconnect was caused by our own stale
         reconnect (state was already broadcast).
      3. Return quietly for a non-error disconnect that arrives within 10
         seconds of the last received message (treated as spurious).
      4. Otherwise log it, disable the request topic if the disconnect came
         shortly after an unconfirmed request-topic subscription, mark the
         state disconnected and notify ``on_state_change``.

    Args:
        client: Unused.
        userdata: Unused.
        disconnect_flags: Only logged.
        rc: Disconnect reason; treated as an error when it exposes a truthy
            ``is_failure`` attribute.
        properties: Unused.
    """
    # Always unblock disconnect() callers, regardless of whether we suppress
    # the state broadcast below. disconnect() sets _disconnection_event and
    # waits on it — every callback path must fire it.
    if self._disconnection_event:
        self._disconnection_event.set()
    # If we intentionally closed the socket for stale reconnect, don't broadcast
    # another state change — check_staleness() already set connected=False and
    # notified the UI. Just log and let paho auto-reconnect.
    if self._stale_reconnecting:
        logger.info(
            "[%s] Disconnect callback after stale reconnect (expected), rc=%s",
            self.serial_number,
            rc,
        )
        return
    # Ignore spurious disconnect callbacks if we've received a message recently
    # Paho-mqtt sometimes fires disconnect callbacks while the connection is still active.
    # BUT: never suppress error disconnects (keepalive timeout, connection lost, etc.)
    # — only suppress when rc indicates a clean/normal disconnect.
    is_error_disconnect = rc is not None and hasattr(rc, "is_failure") and rc.is_failure
    time_since_last_message = time.time() - self._last_message_time
    if not is_error_disconnect and time_since_last_message < 10.0 and self._last_message_time > 0:
        logger.debug(
            f"[{self.serial_number}] Ignoring spurious disconnect (last message {time_since_last_message:.1f}s ago)"
        )
        return
    logger.warning("[%s] MQTT disconnected: rc=%s, flags=%s", self.serial_number, rc, disconnect_flags)
    # Detect if request topic subscription caused the disconnect.
    # If we just subscribed and got disconnected before any SUBACK confirmation,
    # the broker likely killed the connection due to the unauthorized subscription.
    if (
        self._request_topic_sub_time > 0
        and not self._request_topic_confirmed
        and time.time() - self._request_topic_sub_time < 10.0
    ):
        logger.warning(
            "[%s] Disconnected shortly after request topic subscription. Disabling request topic for this printer.",
            self.serial_number,
        )
        self._request_topic_supported = False
        BambuMQTTClient._request_topic_cache[self.serial_number] = False
    # NOTE(review): indentation was reconstructed from a de-indented source;
    # these two resets are assumed to run on every reported disconnect, not
    # only inside the branch above — confirm against the original file.
    self._request_topic_sub_mid = None
    self._request_topic_sub_time = 0.0
    self.state.connected = False
    if self.on_state_change:
        self.on_state_change(self.state)
- def _on_message(self, client, userdata, msg):
- for handler in self._raw_message_handlers:
- try:
- handler(msg.topic, msg.payload)
- except Exception:
- logger.exception(
- "[%s] raw-message handler crashed for topic=%s",
- self.serial_number,
- msg.topic,
- )
- try:
- try:
- raw = msg.payload.decode()
- except UnicodeDecodeError:
- # Some firmware versions (e.g. A1 Mini 01.07.02.00) send payloads
- # with non-UTF-8 bytes. Replace invalid bytes to keep JSON parseable.
- raw = msg.payload.decode(errors="replace")
- logger.warning(
- "[%s] MQTT payload contained non-UTF-8 bytes (topic=%s, len=%d)",
- self.serial_number,
- msg.topic,
- len(msg.payload),
- )
- payload = json.loads(raw)
- # Track last message time - receiving a message proves we're connected
- self._last_message_time = time.time()
- self.state.connected = True
- # Intercept request-topic messages (print commands from slicer/Bambuddy)
- if msg.topic == self.topic_publish:
- self._handle_request_message(payload)
- return
- # Count status reports per connection so check_staleness() can tell
- # "printer never sent a report" apart from a mid-session quiet gap.
- if msg.topic == self.topic_subscribe:
- self._report_messages_since_connect += 1
- # Log message if logging is enabled
- if self._logging_enabled:
- self._message_log.append(
- MQTTLogEntry(
- timestamp=datetime.now(timezone.utc).isoformat(),
- topic=msg.topic,
- direction="in",
- payload=payload,
- )
- )
- self._process_message(payload)
- except json.JSONDecodeError:
- pass # Ignore non-JSON MQTT messages (e.g. binary or malformed payloads)
- def _handle_request_message(self, data: dict) -> None:
- """Intercept print commands on the request topic to capture ams_mapping."""
- print_data = data.get("print", {})
- if not isinstance(print_data, dict):
- return
- command = print_data.get("command", "")
- if command == "project_file":
- if "ams_mapping" in print_data:
- self._captured_ams_mapping = print_data["ams_mapping"]
- logger.info(
- "[%s] Captured ams_mapping from print command: %s",
- self.serial_number,
- self._captured_ams_mapping,
- )
- # Diagnostic for #1162 follow-up (X2D + FTS routing): when a
- # slicer-launched project_file passes through the request topic,
- # log the full payload so we can diff Studio's field set against
- # ours. We pin our own sequence_id to "20000" (line ~3195), so
- # any other value means the command came from Studio/Orca, not
- # from us.
- if print_data.get("sequence_id") != "20000":
- logger.info(
- "[%s] External project_file payload: %s",
- self.serial_number,
- json.dumps(print_data),
- )
def _process_message(self, payload: dict):
    """Process incoming MQTT message from printer.

    Top-level keys are handled first, each independently: ``ams``, ``xcam``,
    ``system``, ``info``, ``wifi_signal`` and ``fun``. If the payload has a
    ``print`` section, its nested xcam / device / AMS / external-spool /
    ams_status / command-response fields are handled next, and the section
    is finally passed to ``_update_state``.

    Args:
        payload: Parsed JSON object received on the report topic.
    """
    # Handle top-level AMS data (comes outside of "print" key)
    # Wrap in try/except to prevent breaking the MQTT connection
    if "ams" in payload:
        try:
            self._handle_ams_data(payload["ams"])
        except Exception as e:
            logger.error("[%s] Error handling AMS data: %s", self.serial_number, e)
    # Handle xcam data (camera settings and AI detection) at top level
    if "xcam" in payload:
        xcam_data = payload["xcam"]
        logger.debug("[%s] Received xcam data at top level: %s", self.serial_number, xcam_data)
        self._parse_xcam_data(xcam_data)
        # Fire state change callback for top-level xcam (not nested in "print")
        if "print" not in payload and self.on_state_change:
            self.on_state_change(self.state)
    # Handle system responses (accessories info, etc.)
    if "system" in payload:
        system_data = payload["system"]
        logger.debug("[%s] Received system data: %s", self.serial_number, system_data)
        self._handle_system_response(system_data)
    # Handle info responses (firmware version info from get_version command)
    if "info" in payload:
        info_data = payload["info"]
        if isinstance(info_data, dict) and info_data.get("command") == "get_version":
            self._handle_version_info(info_data)
    # Parse WiFi signal at top level (some printers send it here)
    if "wifi_signal" in payload:
        wifi_signal = payload["wifi_signal"]
        if isinstance(wifi_signal, (int, float)):
            self.state.wifi_signal = int(wifi_signal)
        elif isinstance(wifi_signal, str):
            try:
                self.state.wifi_signal = int(wifi_signal.replace("dBm", "").strip())
            except ValueError:
                pass  # Ignore unparseable wifi_signal strings; field is non-critical
        # Detect ethernet: wifi_signal == -90 is a sentinel for "WiFi disabled/ethernet"
        # NOTE(review): nesting reconstructed from a de-indented source; this
        # check is assumed to sit inside the wifi_signal branch — confirm.
        from backend.app.utils.printer_models import has_ethernet

        if has_ethernet(self.model):
            self.state.wired_network = self.state.wifi_signal == -90
    # Parse developer LAN mode from top-level "fun" field
    # Some firmware versions send "fun" at the top level, others inside "print"
    if "fun" in payload:
        try:
            fun_val = payload["fun"]
            # Non-int values are parsed as hexadecimal text.
            fun_int = fun_val if isinstance(fun_val, int) else int(fun_val, 16)
            # Developer mode is reported as ON when bit 0x20000000 is clear.
            self.state.developer_mode = (fun_int & 0x20000000) == 0
        except (ValueError, TypeError):
            pass
    if "print" in payload:
        print_data = payload["print"]
        # Check if xcam is nested inside print data
        if "xcam" in print_data:
            logger.debug("[%s] Found xcam inside print data: %s", self.serial_number, print_data["xcam"])
            self._parse_xcam_data(print_data["xcam"])
        # Log when we see gcode_state changes
        if "gcode_state" in print_data:
            logger.debug(
                f"[{self.serial_number}] Received gcode_state: {print_data.get('gcode_state')}, "
                f"gcode_file: {print_data.get('gcode_file')}, subtask_name: {print_data.get('subtask_name')}"
            )
        # Detect dual-nozzle BEFORE processing AMS data (tray_now disambiguation needs it)
        # device.extruder.info with >= 2 entries only exists on dual-nozzle printers (H2D, H2D Pro)
        if not self._is_dual_nozzle and "device" in print_data:
            dev = print_data.get("device")
            if isinstance(dev, dict):
                ext_info = dev.get("extruder", {}).get("info", [])
                if isinstance(ext_info, list) and len(ext_info) >= 2:
                    self._is_dual_nozzle = True
                    logger.info("[%s] Detected dual-nozzle printer from device.extruder.info", self.serial_number)
        # Handle AMS data that comes inside print key
        if "ams" in print_data:
            try:
                self._handle_ams_data(print_data["ams"])
            except Exception as e:
                logger.error("[%s] Error handling AMS data from print: %s", self.serial_number, e)
        # Handle vir_slot (H2-series external spool data) — list of external trays
        # Process vir_slot FIRST so it takes priority over vt_tray
        if "vir_slot" in print_data:
            vir_slot = print_data["vir_slot"]
            if isinstance(vir_slot, list) and vir_slot:
                # Fix: single-nozzle printers (X1C, P1S, A1) report their single
                # external slot with id=255 in vir_slot, but tray_now=254 when active.
                # Remap id=255→254 for single-slot printers so active detection works.
                # Dual-nozzle (H2D) has 2 slots: id=254 (Ext-L) and id=255 (Ext-R).
                if len(vir_slot) == 1 and str(vir_slot[0].get("id", "")) == "255":
                    vir_slot[0]["id"] = "254"
                # Stored under the "vt_tray" key, the same key the vt_tray branch writes.
                self.state.raw_data["vt_tray"] = vir_slot
        # Handle vt_tray (virtual tray / external spool) data
        # Only use vt_tray if vir_slot is NOT in this message AND we don't already
        # have vir_slot data (H2-series sends vt_tray as a single active spool dict
        # which would overwrite the correct multi-slot vir_slot data)
        if "vt_tray" in print_data and "vir_slot" not in print_data:
            vt_tray = print_data["vt_tray"]
            existing = self.state.raw_data.get("vt_tray")
            # Don't let a single-spool vt_tray dict overwrite multi-slot vir_slot data
            if isinstance(vt_tray, dict) and isinstance(existing, list) and len(existing) > 1:
                pass  # Keep the vir_slot data
            else:
                # Normalize a single dict to a one-element list before storing.
                if isinstance(vt_tray, dict):
                    vt_tray = [vt_tray]
                self.state.raw_data["vt_tray"] = vt_tray
        # Parse ams_status directly from print data (NOT from print.ams)
        # ams_status is a combined value: lower 8 bits = sub status, bits 8-15 = main status
        # Main status: 0=idle, 1=filament_change, 2=rfid_identifying, 3=assist, 4=calibration
        # Sub status (when main=1): 2=heating, 3=AMS feeding, 4=retract, 6=push, 7=purge
        if "ams_status" in print_data:
            raw_ams_status = print_data["ams_status"]
            if isinstance(raw_ams_status, str):
                try:
                    self.state.ams_status = int(raw_ams_status)
                except ValueError:
                    self.state.ams_status = 0
            else:
                self.state.ams_status = raw_ams_status if raw_ams_status is not None else 0
            # Compute main and sub status
            self.state.ams_status_sub = self.state.ams_status & 0xFF
            self.state.ams_status_main = (self.state.ams_status >> 8) & 0xFF
            # Log when ams_status changes (for filament change tracking debug)
            logger.debug(
                f"[{self.serial_number}] ams_status: {self.state.ams_status} "
                f"(main={self.state.ams_status_main}, sub={self.state.ams_status_sub})"
            )
        # Check for command responses
        if "command" in print_data:
            cmd = print_data.get("command")
            logger.debug("[%s] Received command response: %s", self.serial_number, cmd)
            if cmd in ("extrusion_cali_sel", "extrusion_cali_set", "extrusion_cali_del", "ams_filament_setting"):
                logger.debug("[%s] %s response: %s", self.serial_number, cmd, print_data)
            # AMS drying responses are rare (user-initiated only) and the
            # full payload — including `result` and any `reason` code —
            # is the only way to diagnose silent rejections like #1447.
            # INFO level so the body lands in support bundles by default.
            elif cmd == "ams_filament_drying":
                logger.info("[%s] ams_filament_drying response: %s", self.serial_number, print_data)
            # Check for developer mode probe response
            if (
                cmd == "ams_filament_setting"
                and self._dev_mode_probe_seq is not None
                and print_data.get("sequence_id") == self._dev_mode_probe_seq
            ):
                self._handle_dev_mode_probe_response(print_data)
            # Track user-initiated ams_filament_setting responses (#887
            # zombie detection). Reset both the timer AND the unanswered
            # counter on ANY response — the response proves the channel is
            # alive, so the counter must not stay armed even when the
            # watchdog already zeroed `_last_ams_cmd_time` on a previous
            # tick. The original `and self._last_ams_cmd_time > 0` guard
            # caused #1164: one sluggish response (>10s) would set the
            # counter to 1 and zero the timer; the late response arrived
            # but was ignored by this branch (timer is 0); the counter
            # stayed at 1 indefinitely; the very next slow response —
            # possibly hours later, on a totally unrelated command — would
            # take it to 2 and force-reconnect, surfacing as "filament
            # config doesn't reach the printer ~6 changes in".
            elif cmd == "ams_filament_setting":
                self._last_ams_cmd_time = 0.0
                self._ams_cmd_unanswered = 0
        # K-profile query responses are routed to their own handler.
        if "command" in print_data and print_data.get("command") == "extrusion_cali_get":
            self._handle_kprofile_response(print_data)
        # Every "print" section, command response or not, ends up in _update_state.
        self._update_state(print_data)
- def _handle_system_response(self, data: dict):
- """Handle system responses including accessories info.
- Note: get_accessories returns stale/incorrect nozzle_type data on H2D.
- The correct nozzle data comes from push_status, so we don't update
- nozzle type/diameter from get_accessories. We just log the response
- for debugging purposes.
- """
- command = data.get("command")
- if command == "get_accessories":
- # Log response for debugging - but DON'T use it to update nozzle data
- # because it returns stale values (e.g., 'stainless_steel' when the
- # actual nozzle is 'HH01' hardened steel high-flow)
- logger.debug("[%s] Accessories response (not used for nozzle data): %s", self.serial_number, data)
def _handle_version_info(self, data: dict):
    """Handle version info response from get_version command.

    Parses firmware version from the 'ota' module in the module list.
    Also extracts AMS unit firmware versions from AMS modules and stores
    them on the corresponding AMS unit in raw_data so the status route can
    expose them to the frontend.

    AMS module naming conventions (numeric suffix is the AMS unit ID):
      - ``ams/<id>`` – original AMS
      - ``n3f/<id>`` – AMS 2 Pro (H2D Pro and similar)
      - ``n3s/<id>`` – AMS HT (H2D Pro and similar)

    Message format:
    {
        "command": "get_version",
        "module": [
            {"name": "ota", "sw_ver": "01.08.05.00"},
            {"name": "rv1126", "sw_ver": "00.00.14.74"},
            {"name": "ams/0", "sw_ver": "00.00.06.96", "sn": "ABC123"},
            {"name": "n3f/0", "sw_ver": "03.00.21.29", "sn": "19C06A552504488"},
            {"name": "n3s/128", "sw_ver": "03.00.21.29", "sn": "19F06A561801096"},
            ...
        ]
    }

    Args:
        data: Decoded get_version payload. A missing or non-list ``module``
            entry makes this a no-op; non-dict list items and modules whose
            ``name`` is not a string are skipped.

    Side effects:
        Updates ``self.state.firmware_version``, ``self._ams_version_cache``
        and any matching unit in ``self.state.raw_data["ams"]``; invokes
        ``self.on_state_change`` at most once; records emitted warnings in
        ``self._ams_version_warned``.
    """
    modules = data.get("module", [])
    if not isinstance(modules, list):
        return
    # Malformed (non-dict) entries are ignored by both passes below.
    modules = [module for module in modules if isinstance(module, dict)]

    state_changed = False

    # Printer firmware: only the first "ota" entry is considered.
    for module in modules:
        if module.get("name") != "ota":
            continue
        version = module.get("sw_ver")
        if version:
            old_version = self.state.firmware_version
            self.state.firmware_version = version
            if old_version != version:
                logger.info("[%s] Firmware version: %s", self.serial_number, version)
                state_changed = True
        break

    # Index AMS units already present in raw_data by their integer ID once,
    # instead of rescanning the list for every module. IDs may arrive as
    # strings; the first unit with a given ID wins, as a linear scan would.
    ams_raw = self.state.raw_data.get("ams")
    units_by_id = {}
    if ams_raw and isinstance(ams_raw, list):
        for ams_unit in ams_raw:
            if not isinstance(ams_unit, dict):
                continue
            raw_id = ams_unit.get("id")
            try:
                unit_id = int(raw_id) if raw_id is not None else None
            except (ValueError, TypeError):
                unit_id = None
            if unit_id is not None:
                units_by_id.setdefault(unit_id, ams_unit)

    # Extract AMS unit firmware versions from AMS modules.
    # See module-level _AMS_MODULE_PREFIXES for supported naming conventions.
    # Always cache regardless of whether AMS data has arrived yet — get_version
    # often arrives before the first push_status, so caching must be unconditional.
    ams_prefixes = tuple(_AMS_MODULE_PREFIXES)
    for module in modules:
        name = module.get("name", "")
        # A null/non-string name must not abort the whole handler.
        if not isinstance(name, str) or not name.startswith(ams_prefixes):
            continue
        # "ams/0" → module_type "ams", id "0"; a missing or non-numeric
        # suffix makes int() raise and the module is skipped.
        module_type, _, id_part = name.partition("/")
        try:
            ams_id = int(id_part)
        except ValueError:
            continue
        sw_ver = module.get("sw_ver", "")
        sn = module.get("sn", "")

        # Always cache so _apply_ams_version_cache can apply it when AMS data arrives
        if sw_ver or sn or module_type:
            self._ams_version_cache[ams_id] = {"sw_ver": sw_ver, "sn": sn, "module_type": module_type}
            state_changed = True

        # Also directly update any AMS unit already present in raw_data
        ams_unit = units_by_id.get(ams_id)
        if ams_unit is None:
            continue
        if sw_ver:
            ams_unit["sw_ver"] = sw_ver
            logger.debug("[%s] AMS %s firmware: %s", self.serial_number, ams_id, sw_ver)
        # Only set sn from version info if not already present in AMS data
        if sn and not ams_unit.get("sn"):
            ams_unit["sn"] = sn
        if module_type:
            ams_unit["module_type"] = module_type

    # Trigger state change callback AFTER both loops so AMS sn/sw_ver are
    # included in the broadcast (not just the printer firmware version).
    if state_changed and self.on_state_change:
        self.on_state_change(self.state)

    # Warn if any AMS unit is still missing serial number or firmware version
    # after processing the version info response. Each (unit, field) pair is
    # remembered in _ams_version_warned so the warning is not repeated on
    # older firmware that doesn't report these fields.
    if ams_raw and isinstance(ams_raw, list):
        for ams_unit in ams_raw:
            if not isinstance(ams_unit, dict):
                continue
            ams_id = ams_unit.get("id", "?")
            if not ams_unit.get("sn") and not ams_unit.get("serial_number"):
                key = (ams_id, "sn")
                if key not in self._ams_version_warned:
                    self._ams_version_warned.add(key)
                    logger.warning(
                        "[%s] AMS unit %s: serial number not available in version info",
                        self.serial_number,
                        ams_id,
                    )
            if not ams_unit.get("sw_ver"):
                key = (ams_id, "sw_ver")
                if key not in self._ams_version_warned:
                    self._ams_version_warned.add(key)
                    logger.warning(
                        "[%s] AMS unit %s: firmware version not available in version info",
                        self.serial_number,
                        ams_id,
                    )
- def _apply_ams_version_cache(self, ams_list: list) -> None:
- """Apply cached AMS firmware/SN (from get_version) onto an AMS list in-place.
- get_version may arrive before pushall/AMS status, and AMS unit IDs may be
- strings in MQTT payloads. This helper normalizes IDs and fills missing
- sw_ver/sn fields without overwriting values already present.
- """
- if not ams_list or not isinstance(ams_list, list):
- return
- cache = self._ams_version_cache
- if not cache:
- return
- for unit in ams_list:
- if not isinstance(unit, dict):
- continue
- raw_id = unit.get("id")
- try:
- unit_id = int(raw_id) if raw_id is not None else None
- except (ValueError, TypeError):
- unit_id = None
- if unit_id is None:
- continue
- cached = cache.get(unit_id)
- if not cached:
- continue
- sw_ver = cached.get("sw_ver") or ""
- sn = cached.get("sn") or ""
- if sw_ver and not unit.get("sw_ver"):
- unit["sw_ver"] = sw_ver
- # Only set sn if not already present in AMS data
- if sn and not unit.get("sn") and not unit.get("serial_number"):
- unit["sn"] = sn
- module_type = cached.get("module_type") or ""
- if module_type and not unit.get("module_type"):
- unit["module_type"] = module_type
def _parse_xcam_data(self, xcam_data):
    """Parse xcam data for camera settings and AI detection options.

    Detector enable flags and sensitivities are decoded from the ``cfg``
    bitmask; the remaining options are read from individual fields of
    *xcam_data*. Values for which a hold timer is registered in
    ``self._xcam_hold_start`` are ignored until ``self._xcam_hold_time``
    seconds have elapsed since the hold started; the first value seen after
    that is accepted and the hold entry is removed.

    Args:
        xcam_data: The xcam section of a printer message. Anything that is
            not a dict is ignored. A ``cfg`` value that is not an integer is
            skipped (with a debug log) so the other fields are still applied.
    """
    if not isinstance(xcam_data, dict):
        return

    current_time = time.time()
    holds = self._xcam_hold_start
    hold_time = self._xcam_hold_time
    serial = self.serial_number
    options = self.state.print_options

    # Helper to check if we should accept incoming value for a module
    # OrcaSlicer pattern: simple hold timer, ignore ALL data for the hold
    # period after a command, then accept whatever the printer sends.
    def should_accept_value(module_name: str, incoming_value: bool) -> bool:
        """Return True if *incoming_value* for *module_name* may be applied."""
        if module_name not in holds:
            return True  # No hold timer, accept incoming
        elapsed = current_time - holds[module_name]
        if elapsed > hold_time:
            # Hold timer expired - accept incoming and clear hold
            del holds[module_name]
            logger.debug("[%s] Hold expired for %s, accepting %s", serial, module_name, incoming_value)
            return True
        # Within hold period - ignore incoming data
        logger.debug(
            "[%s] Ignoring %s=%s (hold active, %.1fs < %ss)",
            serial,
            module_name,
            incoming_value,
            elapsed,
            hold_time,
        )
        return False

    def sync_sensitivity(attr: str, label: str, incoming: str, log_ignored: bool) -> None:
        """Apply a cfg-decoded sensitivity to ``print_options.<attr>``.

        The hold key for a sensitivity is the attribute name itself. A
        change is logged only when the value actually differs.
        """
        hold_expired = False
        if attr in holds:
            elapsed = current_time - holds[attr]
            if elapsed <= hold_time:
                if log_ignored:
                    logger.debug(
                        "[%s] Ignoring cfg sensitivity=%s (hold active, %.1fs < %ss)",
                        serial,
                        incoming,
                        elapsed,
                        hold_time,
                    )
                return
            hold_expired = True

        previous = getattr(options, attr)
        if incoming != previous:
            if hold_expired:
                logger.debug("[%s] %s synced (from cfg after hold): %s -> %s", serial, label, previous, incoming)
            else:
                logger.debug("[%s] %s changed (from cfg): %s -> %s", serial, label, previous, incoming)
            setattr(options, attr, incoming)
        if hold_expired:
            del holds[attr]

    # Log all xcam fields for debugging
    logger.debug("[%s] Parsing xcam data - all fields: %s", serial, list(xcam_data.keys()))

    # The cfg bitmask contains the ACTUAL detector states - the individual boolean
    # fields (spaghetti_detector, etc.) are often stale/cached.
    # CFG bitmask structure (each detector uses 3 bits: [sens_low, sens_high, enabled]):
    # - Bits 5-7: spaghetti_detector (sens in 5-6, enabled in 7)
    # - Bits 8-10: pileup_detector (sens in 8-9, enabled in 10)
    # - Bits 11-13: clump_detector/nozzle_clumping (sens in 11-12, enabled in 13)
    # - Bits 14-16: airprint_detector (sens in 14-15, enabled in 16)
    # Sensitivity values: 0=low, 1=medium, 2=high (anything else → "medium")
    if "cfg" in xcam_data:
        cfg = xcam_data["cfg"]
        if isinstance(cfg, int):
            logger.debug("[%s] xcam cfg bitmask: %s (binary: %s)", serial, cfg, bin(cfg))
            sensitivity_names = {0: "low", 1: "medium", 2: "high"}
            # (start bit, detector hold key, detector attribute,
            #  sensitivity attribute / hold key, sensitivity log label,
            #  whether a held sensitivity value is logged as ignored)
            detectors = (
                (5, "spaghetti_detector", "spaghetti_detector", "halt_print_sensitivity", "Sensitivity", True),
                (8, "pileup_detector", "pileup_detector", "pileup_sensitivity", "pileup_sensitivity", False),
                (
                    11,
                    "clump_detector",
                    "nozzle_clumping_detector",
                    "nozzle_clumping_sensitivity",
                    "nozzle_clumping_sensitivity",
                    False,
                ),
                (14, "airprint_detector", "airprint_detector", "airprint_sensitivity", "airprint_sensitivity", False),
            )
            for start_bit, hold_key, detector_attr, sens_attr, sens_label, log_ignored in detectors:
                enabled = bool((cfg >> (start_bit + 2)) & 1)
                sensitivity = sensitivity_names.get((cfg >> start_bit) & 0x3, "medium")

                # Detector on/off and its sensitivity have independent hold timers.
                if should_accept_value(hold_key, enabled):
                    old_value = getattr(options, detector_attr)
                    if enabled != old_value:
                        logger.debug(
                            "[%s] %s changed (from cfg): %s -> %s",
                            serial,
                            detector_attr,
                            old_value,
                            enabled,
                        )
                        setattr(options, detector_attr, enabled)
                sync_sensitivity(sens_attr, sens_label, sensitivity, log_ignored)
        else:
            # bin()/shift would raise on a non-integer; skip only the bitmask
            # so the remaining fields of this message are still processed.
            logger.debug("[%s] Ignoring xcam cfg with non-integer value: %r", serial, cfg)

    # Camera settings
    if "ipcam_record" in xcam_data:
        self.state.ipcam = xcam_data.get("ipcam_record") == "enable"
    if "timelapse" in xcam_data:
        self.state.timelapse = xcam_data.get("timelapse") == "enable"
        # Track if timelapse was ever active during this print
        if self.state.timelapse and self._was_running:
            self._timelapse_during_print = True

    # Skip spaghetti_detector boolean field - we read from cfg bitmask above
    if "print_halt" in xcam_data:
        options.print_halt = bool(xcam_data.get("print_halt"))
    # Skip halt_print_sensitivity field - it's always stale ("medium")
    # We read the actual sensitivity from cfg bits 5-6 above

    # Boolean options guarded by a hold timer keyed on the field name.
    for field in (
        "first_layer_inspector",
        "printing_monitor",
        "buildplate_marker_detector",
        "allow_skip_parts",
    ):
        if field in xcam_data:
            new_value = bool(xcam_data.get(field))
            if should_accept_value(field, new_value):
                setattr(options, field, new_value)

    # Additional AI detectors - these are decoded from cfg bitmask above, not from
    # individual boolean fields (which are not sent by the printer)
    # pileup_detector, nozzle_clumping_detector, airprint_detector - from cfg
    # auto_recovery_step_loss and filament_tangle_detect - tracked locally only
    if "auto_recovery_step_loss" in xcam_data:
        options.auto_recovery_step_loss = bool(xcam_data.get("auto_recovery_step_loss"))
    if "filament_tangle_detect" in xcam_data:
        options.filament_tangle_detect = bool(xcam_data.get("filament_tangle_detect"))
- @staticmethod
- def _resolve_local_slot_from_mapping(local_slot: int, mapping_raw: list | None) -> int | None:
- """Resolve a local AMS slot ID to a global tray ID using the MQTT mapping field.
- The MQTT mapping field is an array of snow-encoded values:
- each entry = ams_hw_id * 256 + slot_id (65535 = unmapped).
- Finds entries where the local slot matches, then computes the global tray ID.
- Returns the global ID if exactly one AMS matches, or None if ambiguous/unavailable.
- """
- if not isinstance(mapping_raw, list) or not mapping_raw:
- return None
- candidates: set[int] = set()
- for value in mapping_raw:
- if not isinstance(value, int) or value >= 65535:
- continue
- ams_hw_id = value >> 8
- slot = value & 0xFF
- if 0 <= ams_hw_id <= 3 and (slot & 0x03) == local_slot:
- candidates.add(ams_hw_id * 4 + local_slot)
- elif 128 <= ams_hw_id <= 135 and local_slot == 0:
- candidates.add(ams_hw_id)
- if len(candidates) == 1:
- return candidates.pop()
- return None
- def _handle_ams_data(self, ams_data):
- """Handle AMS data changes for Spoolman integration.
- This is called when we receive top-level AMS data in MQTT messages.
- It detects changes and triggers the callback for Spoolman sync.
- """
- import hashlib
- # Handle nested ams structure: {"ams": {"ams": [...]}} or {"ams": [...]}
- # Also handle P1S partial updates: {"tray_now": ..., "tray_tar": ...} without "ams" key
- ams_list = None
- if isinstance(ams_data, dict):
- if "ams" in ams_data:
- ams_list = ams_data["ams"]
- # Log all AMS dict fields to debug tray_now for H2D dual-nozzle
- non_list_fields = {k: v for k, v in ams_data.items() if k != "ams"}
- if non_list_fields:
- logger.debug("[%s] AMS dict fields: %s", self.serial_number, non_list_fields)
- # IMPORTANT: Parse ams_status FIRST before tray_now, so we have fresh status
- # when checking if we're in filament change mode for tray_now disambiguation
- if "ams_status" in ams_data:
- raw_ams_status = ams_data["ams_status"]
- if isinstance(raw_ams_status, str):
- try:
- self.state.ams_status = int(raw_ams_status)
- except ValueError:
- self.state.ams_status = 0
- else:
- self.state.ams_status = raw_ams_status if raw_ams_status is not None else 0
- # Compute main and sub status
- self.state.ams_status_sub = self.state.ams_status & 0xFF
- self.state.ams_status_main = (self.state.ams_status >> 8) & 0xFF
- logger.debug(
- f"[{self.serial_number}] ams_status: {self.state.ams_status} "
- f"(main={self.state.ams_status_main}, sub={self.state.ams_status_sub})"
- )
- # Parse tray_now from AMS dict - this is the currently loaded tray global ID
- # Note: tray_tar is also available but on H2D it's just slot number (0-3), not global ID
- if "tray_now" in ams_data:
- raw_tray_now = ams_data["tray_now"]
- # Convert string to int if needed
- if isinstance(raw_tray_now, str):
- try:
- parsed_tray_now = int(raw_tray_now)
- except ValueError:
- parsed_tray_now = 255
- else:
- parsed_tray_now = raw_tray_now if raw_tray_now is not None else 255
- # H2D dual-nozzle printers report only slot number (0-3), not global tray ID
- # Use active_extruder + ams_extruder_map to determine which AMS the slot belongs to
- # Single-nozzle printers with multiple AMS (e.g. P2S) also report local slot IDs (#420)
- # — disambiguated below using MQTT mapping field
- ams_map = self.state.ams_extruder_map
- if self._is_dual_nozzle and 0 <= parsed_tray_now <= 3:
- # First, check if we have a pending target that matches this slot
- pending_target = self.state.pending_tray_target
- if pending_target is not None:
- pending_slot = pending_target % 4
- if pending_slot == parsed_tray_now:
- # Slot matches our pending target - use the full global ID
- logger.debug(
- f"[{self.serial_number}] H2D tray_now disambiguation: "
- f"slot {parsed_tray_now} matches pending_tray_target {pending_target} -> using global ID {pending_target}"
- )
- self.state.tray_now = pending_target
- # Clear pending target now that load is confirmed
- self.state.pending_tray_target = None
- else:
- # Slot doesn't match our pending target - something changed, use slot as-is
- logger.warning(
- f"[{self.serial_number}] H2D tray_now: slot {parsed_tray_now} doesn't match "
- f"pending_tray_target {pending_target} (slot {pending_slot}) - using slot as global ID"
- )
- self.state.tray_now = parsed_tray_now
- # Clear pending target since it's stale
- self.state.pending_tray_target = None
- else:
- # No pending target - use h2d_extruder_snow for accurate disambiguation
- # H2D sends snow field in device.extruder.info with AMS ID in high byte
- active_ext = self.state.active_extruder # 0=right, 1=left
- # Best source: use snow value from device.extruder.info if available
- snow_tray = self.state.h2d_extruder_snow.get(active_ext)
- if snow_tray is not None and snow_tray != 255:
- # snow_tray is already normalized to global ID
- # Verify the slot matches what we see in tray_now
- # Regular AMS: slot = global_id % 4; AMS HT (128-135): single slot = 0
- snow_slot = snow_tray % 4 if snow_tray < 128 else (0 if snow_tray <= 135 else -1)
- if snow_slot == parsed_tray_now:
- if self.state.tray_now != snow_tray:
- logger.debug(
- f"[{self.serial_number}] H2D tray_now from snow: "
- f"extruder[{active_ext}] snow={snow_tray} (slot {snow_slot})"
- )
- self.state.tray_now = snow_tray
- else:
- # Slot mismatch - snow field may not have updated yet, trust snow
- logger.debug(
- f"[{self.serial_number}] H2D tray_now: ams.tray_now slot {parsed_tray_now} "
- f"!= snow slot {snow_slot}, using snow value {snow_tray}"
- )
- self.state.tray_now = snow_tray
- else:
- # Fallback: snow not available, use ams_extruder_map (less reliable)
- # Find ALL AMS units on the active extruder
- ams_on_extruder = []
- for ams_id_str, ext_id in ams_map.items():
- if ext_id == active_ext:
- try:
- ams_on_extruder.append(int(ams_id_str))
- except ValueError:
- pass # Skip AMS IDs that aren't valid integers
- if len(ams_on_extruder) == 1:
- # Single AMS on this extruder - unambiguous
- active_ams_id = ams_on_extruder[0]
- if 128 <= active_ams_id <= 135:
- # AMS-HT: single slot per unit, global ID = unit ID
- global_tray_id = active_ams_id
- else:
- global_tray_id = active_ams_id * 4 + parsed_tray_now
- logger.debug(
- f"[{self.serial_number}] H2D tray_now fallback: "
- f"slot {parsed_tray_now} + single AMS {active_ams_id} -> global ID {global_tray_id}"
- )
- self.state.tray_now = global_tray_id
- elif len(ams_on_extruder) > 1:
- # Multiple AMS on this extruder - keep current if valid, else try to narrow down
- current_tray = self.state.tray_now
- # Determine which AMS unit and slot the current tray belongs to
- if 0 <= current_tray <= 15:
- current_ams = current_tray // 4
- current_slot = current_tray % 4
- elif 128 <= current_tray <= 135:
- current_ams = current_tray # AMS-HT: ID = tray ID
- current_slot = 0
- else:
- current_ams = -1
- current_slot = -1
- if current_ams in ams_on_extruder and current_slot == parsed_tray_now:
- # Current is valid and matches slot - keep it
- logger.debug(
- f"[{self.serial_number}] H2D tray_now: multiple AMS {ams_on_extruder}, "
- f"keeping current {current_tray} (matches slot {parsed_tray_now})"
- )
- else:
- # Filter candidates: AMS-HT (128-135) only valid for slot 0
- if parsed_tray_now > 0:
- candidates = [a for a in ams_on_extruder if a <= 3]
- else:
- candidates = ams_on_extruder
- if len(candidates) == 1:
- cand = candidates[0]
- resolved = cand if 128 <= cand <= 135 else cand * 4 + parsed_tray_now
- logger.debug(
- f"[{self.serial_number}] H2D tray_now: multiple AMS {ams_on_extruder}, "
- f"narrowed to AMS {cand} -> global ID {resolved}"
- )
- self.state.tray_now = resolved
- else:
- # Genuinely ambiguous - use slot as-is (will be wrong for non-first AMS)
- logger.warning(
- f"[{self.serial_number}] H2D tray_now: multiple AMS {ams_on_extruder} on extruder {active_ext}, "
- f"no snow field, using slot {parsed_tray_now} (may be incorrect)"
- )
- self.state.tray_now = parsed_tray_now
- else:
- # No AMS on this extruder - use slot as-is
- logger.warning(
- f"[{self.serial_number}] H2D tray_now: no AMS on extruder {active_ext}, "
- f"using slot {parsed_tray_now}"
- )
- self.state.tray_now = parsed_tray_now
- elif not self._is_dual_nozzle and 0 <= parsed_tray_now <= 3:
- # Single-nozzle printer with tray_now in 0-3 range.
- # P2S (and possibly other models) with multiple AMS units sends LOCAL slot IDs
- # in tray_now, not global tray IDs (#420). Use the MQTT mapping field
- # (snow-encoded) to resolve the correct AMS unit.
- ams_exist_raw = ams_data.get("ams_exist_bits", "0")
- try:
- ams_exist = int(ams_exist_raw, 16) if isinstance(ams_exist_raw, str) else int(ams_exist_raw)
- except (ValueError, TypeError):
- ams_exist = 0
- num_ams = bin(ams_exist).count("1")
- if num_ams > 1:
- # Multiple AMS on single-nozzle — tray_now is likely a local slot ID.
- # Cross-reference with MQTT mapping field to find the correct AMS unit.
- mapping_raw = self.state.raw_data.get("mapping")
- resolved = self._resolve_local_slot_from_mapping(parsed_tray_now, mapping_raw)
- if resolved is not None:
- if resolved != parsed_tray_now:
- logger.debug(
- f"[{self.serial_number}] Multi-AMS tray_now: "
- f"local slot {parsed_tray_now} -> global ID {resolved} (from mapping)"
- )
- self.state.tray_now = resolved
- else:
- # No mapping available (not printing, or ambiguous) — use as-is.
- # This matches the old behavior and is correct for AMS 0.
- self.state.tray_now = parsed_tray_now
- else:
- # Single AMS — local slot 0-3 equals global ID
- self.state.tray_now = parsed_tray_now
- else:
- # tray_now > 3 means it's already a global ID, or 255 means unloaded
- # Note: Do NOT clear pending_tray_target on tray_now=255 here.
- # During filament change, the printer sends 255 first (unload), then the slot.
- # We only clear pending_tray_target explicitly in ams_unload_filament().
- # Trust the printer's reported value.
- self.state.tray_now = parsed_tray_now
- # Track last valid tray for usage tracking (survives retract → 255 at print end)
- # Valid physical trays: 0-15 (regular AMS), 128-135 (AMS-HT), 254 (external spool)
- tn = self.state.tray_now
- if (0 <= tn <= 15) or (128 <= tn <= 135) or tn == 254:
- # Log tray change for mid-print usage splitting. Gate on the
- # print-lifecycle flags (`_was_running` set on first RUNNING /
- # new print, `_completion_triggered` set when on_print_complete
- # fires) instead of `state in ("RUNNING", "PAUSE")` — P2S
- # firmware briefly transitions out of RUNNING during AMS
- # auto-fallback (#957), so a literal-string gate misses the
- # switch and the usage tracker double-credits at completion.
- if tn != self.state.last_loaded_tray and self._was_running and not self._completion_triggered:
- self.state.tray_change_log.append((tn, self.state.layer_num))
- logger.info(
- "[%s] Tray change during print: tray=%d at layer=%d",
- self.serial_number,
- tn,
- self.state.layer_num,
- )
- self.state.last_loaded_tray = self.state.tray_now
- logger.debug("[%s] tray_now updated: %s", self.serial_number, self.state.tray_now)
- # NOTE: ams_status is parsed BEFORE tray_now (see above) to ensure correct
- # state when checking filament change mode for H2D disambiguation
- # P1S/P1P send partial updates without "ams" key - this is valid, not an error
- # We've already processed the status fields above, so just return if no ams list
- if ams_list is None:
- logger.debug("[%s] AMS partial update (no tray data)", self.serial_number)
- return
- elif isinstance(ams_data, list):
- ams_list = ams_data
- else:
- logger.warning("[%s] Unexpected AMS data format: %s", self.serial_number, type(ams_data))
- return
- # Merge AMS data instead of replacing, to handle partial updates
- # During prints, the printer may only send updates for active AMS units
- # We need deep merging at the tray level to preserve fields like tray_sub_brands
- existing_ams = self.state.raw_data.get("ams", [])
- existing_by_id = {ams.get("id"): ams for ams in existing_ams if ams.get("id") is not None}
- # Update existing units with new data, add new units
- for ams_unit in ams_list:
- ams_id = ams_unit.get("id")
- if ams_id is not None:
- existing_unit = existing_by_id.get(ams_id)
- if existing_unit and "tray" in ams_unit:
- # Deep merge trays to preserve fields from previous updates
- existing_trays = {t.get("id"): t for t in existing_unit.get("tray", []) if t.get("id") is not None}
- merged_trays = []
- for new_tray in ams_unit.get("tray", []):
- tray_id = new_tray.get("id")
- if tray_id is not None and tray_id in existing_trays:
- # Merge: start with existing, update with new non-empty values
- merged_tray = existing_trays[tray_id].copy()
- # Detect slot-clearing updates (spool removal):
- # When tray_type is explicitly empty, clear everything
- # including RFID data (tag_uid/tray_uuid).
- slot_clearing = new_tray.get("tray_type") == ""
- # Some printers (e.g. H2D) only send {id, state} in
- # incremental updates when a tray is not fully loaded.
- # state=11 means loaded; other values (9=empty,
- # 10=spool present but filament not in feeder) indicate
- # the slot should be cleared. Without this, old
- # tray_type/tray_color persist indefinitely (#784).
- tray_state = new_tray.get("state")
- if (
- tray_state is not None
- and tray_state != 11
- and "tray_type" not in new_tray
- and merged_tray.get("tray_type")
- ):
- logger.info(
- "[%s] AMS %s tray %s: state=%s (not loaded) — clearing stale tray data",
- self.serial_number,
- ams_id,
- tray_id,
- tray_state,
- )
- slot_clearing = True
- # The incremental update only has {id, state} — inject
- # empty values for all content fields so the merge loop
- # below clears the stale data from merged_tray.
- new_tray.update(
- {
- "tray_type": "",
- "tray_sub_brands": "",
- "tray_color": "",
- "tray_id_name": "",
- "tray_info_idx": "",
- "tag_uid": "0000000000000000",
- "tray_uuid": "00000000000000000000000000000000",
- "remain": 0,
- "k": None,
- "cali_idx": None,
- }
- )
- for key, value in new_tray.items():
- # Fields that should always be updated (even with empty/zero values):
- # - remain, k, id, cali_idx: status indicators where 0 is valid
- # - tray_type, tray_sub_brands, tray_info_idx, tray_color,
- # tray_id_name: slot content indicators that must be cleared
- # when a spool is removed (fixes #147 - old AMS empty slot)
- # NOTE: tag_uid and tray_uuid are NOT in always_update_fields.
- # They are only cleared during spool removal (slot_clearing=True).
- # Periodic AMS updates often include empty RFID fields which
- # would overwrite valid data from the initial pushall.
- always_update_fields = (
- "remain",
- "k",
- "id",
- "cali_idx",
- "tray_type",
- "tray_sub_brands",
- "tray_info_idx",
- "tray_color",
- "tray_id_name",
- )
- if (
- key in always_update_fields
- or slot_clearing
- or value
- not in (
- None,
- "",
- "0000000000000000",
- "00000000000000000000000000000000",
- )
- ):
- merged_tray[key] = value
- merged_trays.append(merged_tray)
- else:
- merged_trays.append(new_tray)
- # Update ams_unit with merged trays. Spread existing_unit
- # FIRST so top-level fields the partial update omits —
- # dry_time, info (which drives dry_status / dry_sub_status),
- # humidity, temp — are preserved instead of dropped. The
- # printer sends tray-bearing partials that carry no drying
- # fields; without this, dry_time reads as absent → 0 and the
- # falling-edge detector below fires a false "drying complete"
- # (#1462). Mirrors the no-tray branch's merge semantics.
- ams_unit = {**existing_unit, **ams_unit, "tray": merged_trays}
- elif existing_unit:
- # Partial update without tray data: merge new fields into existing
- # unit to preserve tray, sn, sw_ver, and other accumulated data.
- ams_unit = {**existing_unit, **ams_unit}
- existing_by_id[ams_id] = ams_unit
- # Convert back to list, sorted by ID for consistent ordering
- merged_ams = sorted(existing_by_id.values(), key=lambda x: x.get("id", 0))
- # Check tray_exist_bits to clear empty slots (Issue #147)
- # New AMS models don't send empty tray data - they just update tray_exist_bits
- # Each bit in tray_exist_bits represents a slot: bit=0 means empty, bit=1 means has spool
- # Skip ONLY the printer-shutdown pattern: all-zero bits paired with
- # power_on_flag=False (#765). On shutdown that combination would wipe all
- # slot data and cause auto-unlink to remove spool assignments. Non-zero
- # bits with power_on_flag=False are valid AMS state from an idle printer
- # (#1365 — X1C reports power_on_flag=False between prints while the AMS
- # keeps reporting its actual slot inventory); the update MUST be applied
- # so spool removal is detected without requiring a manual reconnect.
- tray_exist_bits_str = ams_data.get("tray_exist_bits") if isinstance(ams_data, dict) else None
- power_on = ams_data.get("power_on_flag", True) if isinstance(ams_data, dict) else True
- if tray_exist_bits_str:
- try:
- tray_exist_bits = int(tray_exist_bits_str, 16)
- except (ValueError, TypeError) as e:
- logger.debug("[%s] Could not parse tray_exist_bits: %s", self.serial_number, e)
- tray_exist_bits = None
- if tray_exist_bits is not None and not (tray_exist_bits == 0 and not power_on):
- for ams_unit in merged_ams:
- ams_id_raw = ams_unit.get("id")
- if ams_id_raw is None:
- continue
- # Convert to int (may be string from JSON)
- ams_id = int(ams_id_raw) if isinstance(ams_id_raw, str) else ams_id_raw
- if ams_id >= 128: # Skip HT AMS (id >= 128)
- continue
- # Bits for this AMS unit: bits (ams_id*4) to (ams_id*4 + 3)
- for tray in ams_unit.get("tray", []):
- tray_id_raw = tray.get("id")
- if tray_id_raw is None:
- continue
- # Convert to int (may be string from JSON)
- tray_id = int(tray_id_raw) if isinstance(tray_id_raw, str) else tray_id_raw
- global_bit = ams_id * 4 + tray_id
- slot_exists = (tray_exist_bits >> global_bit) & 1
- if not slot_exists:
- # #1322 follow-up (by @RosdasHH): the bitmask is
- # BambuStudio's canonical "no spool" signal, and
- # works across every firmware variant (P1S, A1
- # Mini, post-restart, post-Reset-Slot, steady-
- # state). Promote to state=9 (firmware's
- # explicit "no spool" code) so downstream
- # readers — printers.py's API serializer,
- # inventory.py's `tray_state in {9, 10}`
- # short-circuit, the AMS card — see one
- # canonical signal instead of guessing from
- # payload shape. Int (not "9") to match the
- # downstream `==` comparison.
- tray["state"] = 9
- if tray.get("tray_type"):
- # Stale data from before the slot went empty
- # — clear it so the AMS view doesn't render a
- # colour/material that's no longer there.
- logger.debug(
- f"[{self.serial_number}] Clearing empty slot: AMS {ams_id} slot {tray_id} "
- f"(tray_exist_bits bit {global_bit} = 0)"
- )
- tray["tray_type"] = ""
- tray["tray_sub_brands"] = ""
- tray["tray_color"] = ""
- tray["tray_id_name"] = ""
- tray["tag_uid"] = "0000000000000000"
- tray["tray_uuid"] = "00000000000000000000000000000000"
- tray["tray_info_idx"] = ""
- tray["remain"] = 0
- self.state.raw_data["ams"] = merged_ams
- # Apply cached AMS firmware/SN from get_version (handles ordering and id type mismatches)
- self._apply_ams_version_cache(merged_ams)
- # Update timestamp for RFID refresh detection (frontend can detect "new data arrived")
- self.state.last_ams_update = time.time()
- logger.debug("[%s] Merged AMS data: %s new units, %s total", self.serial_number, len(ams_list), len(merged_ams))
- # Extract ams_extruder_map from each AMS unit's info field
- # BambuStudio DevFilaSystem.cpp parses info as hex string:
- # type_id = get_flag_bits(info, 0, 4) // bits 0-3: AMS type
- # extruder_id = get_flag_bits(info, 8, 4) // bits 8-11: extruder assignment
- # where get_flag_bits uses std::stoull(str, nullptr, 16) — hex parsing.
- # extruder_id: 0=right/main, 1=left/deputy, 0xE=uninitialized (skip)
- #
- # Use merged_ams (not ams_list) to avoid partial MQTT updates overwriting
- # the full map. Merge into existing map to preserve entries from prior updates.
- ams_extruder_map = dict(self.state.ams_extruder_map) if self.state.ams_extruder_map else {}
- for ams_unit in merged_ams:
- ams_id = ams_unit.get("id")
- info = ams_unit.get("info")
- if ams_id is not None and info is not None:
- try:
- # info is a hex-encoded string in MQTT JSON (e.g. "10001003")
- info_val = int(str(info), 16)
- # Extract 4 bits starting at bit 8 for extruder assignment
- extruder_id = (info_val >> 8) & 0xF
- if extruder_id == 0xE:
- # 0xE = uninitialized AMS, skip
- continue
- ams_extruder_map[str(ams_id)] = extruder_id
- logger.debug(f"[{self.serial_number}] AMS {ams_id} info=0x{info} -> extruder {extruder_id}")
- except (ValueError, TypeError):
- pass # Skip AMS units with unparseable info bitmask values
- if ams_extruder_map:
- self.state.raw_data["ams_extruder_map"] = ams_extruder_map
- self.state.ams_extruder_map = ams_extruder_map
- logger.debug("[%s] ams_extruder_map: %s", self.serial_number, ams_extruder_map)
- # Extract drying status from info hex string and dry_sf_reason per AMS unit
- # BambuStudio DevFilaSystem.cpp parses info bits:
- # dry_status = get_flag_bits(info, 4, 4) // bits 4-7
- # dry_sub_status = get_flag_bits(info, 22, 4) // bits 22-25
- for ams_unit in merged_ams:
- info = ams_unit.get("info")
- if info is not None:
- try:
- info_val = int(str(info), 16)
- ams_unit["dry_status"] = (info_val >> 4) & 0xF
- ams_unit["dry_sub_status"] = (info_val >> 22) & 0xF
- except (ValueError, TypeError):
- pass # Skip unparseable info values
- # dry_sf_reason is a per-unit array of cannot-dry reason codes
- if "dry_sf_reason" in ams_unit:
- sf_reason = ams_unit["dry_sf_reason"]
- if isinstance(sf_reason, list):
- ams_unit["dry_sf_reason"] = [
- int(r) for r in sf_reason if isinstance(r, int) or (isinstance(r, str) and r.isdigit())
- ]
- else:
- ams_unit["dry_sf_reason"] = []
- # Persist updated drying fields back to raw_data
- self.state.raw_data["ams"] = merged_ams
- # Detect AMS drying-complete falling edge per-unit (#1349). When an
- # AMS's `dry_time` transitions from >0 to 0 the cycle just finished
- # — fire the callback so smart-plug auto-off-after-drying can run.
- # Works identically for queue-triggered, ambient, and manual drying
- # because we observe the firmware-reported state, not our own intent.
- if self.on_drying_complete:
- for ams_unit in merged_ams:
- try:
- ams_id = int(ams_unit.get("id", -1))
- except (TypeError, ValueError):
- continue
- if ams_id < 0:
- continue
- # Only evaluate the edge when this update carries an explicit
- # dry_time. An absent / unparseable value is NOT zero — treating
- # it as 0 lets a tray-only partial fake a drying-complete edge
- # (#1462). Skip without touching the remembered value so the
- # next update that DOES carry dry_time sees the true previous.
- raw_dry_time = ams_unit.get("dry_time")
- if raw_dry_time is None:
- continue
- try:
- current = int(raw_dry_time)
- except (TypeError, ValueError):
- continue
- previous = self._previous_dry_times.get(ams_id, 0)
- self._previous_dry_times[ams_id] = current
- if previous > 0 and current == 0:
- logger.info(
- "[%s] AMS %d drying complete (dry_time %d → 0)",
- self.serial_number,
- ams_id,
- previous,
- )
- self.on_drying_complete(ams_id)
- # Create a hash of relevant AMS data to detect changes
- ams_hash_data = []
- for ams_unit in ams_list:
- for tray in ams_unit.get("tray", []):
- # Include fields that matter for filament tracking
- ams_hash_data.append(
- f"{ams_unit.get('id')}:{tray.get('id')}:"
- f"{tray.get('tray_type')}:{tray.get('tag_uid')}:{tray.get('remain')}"
- )
- ams_hash = hashlib.md5(":".join(ams_hash_data).encode(), usedforsecurity=False).hexdigest()
- # Only trigger callback if AMS data actually changed
- if ams_hash != self._previous_ams_hash:
- self._previous_ams_hash = ams_hash
- if self.on_ams_change:
- logger.debug("[%s] AMS data changed, triggering sync callback", self.serial_number)
- # Pass merged AMS data (not raw ams_list) — partial MQTT updates
- # may lack fields like 'remain' that the merged state preserves
- self.on_ams_change(merged_ams)
- def _update_state(self, data: dict):
- """Update printer state from message data."""
- _previous_state = self.state.state
- # Update state fields
- if "gcode_state" in data:
- self.state.state = data["gcode_state"]
- if "gcode_file" in data:
- self.state.gcode_file = data["gcode_file"]
- self.state.current_print = data["gcode_file"]
- if "subtask_name" in data:
- self.state.subtask_name = data["subtask_name"]
- # Prefer subtask_name as current_print if available
- if data["subtask_name"]:
- self.state.current_print = data["subtask_name"]
- if "subtask_id" in data:
- self.state.subtask_id = data["subtask_id"]
- if "mc_percent" in data:
- # Save last non-zero progress for usage tracking (firmware resets to 0 on cancel)
- if self.state.progress > 0:
- self._last_valid_progress = self.state.progress
- self.state.progress = float(data["mc_percent"])
- if "mc_remaining_time" in data:
- self.state.remaining_time = int(data["mc_remaining_time"])
- if "mc_print_sub_stage" in data:
- new_sub_stage = int(data["mc_print_sub_stage"])
- if new_sub_stage != self.state.mc_print_sub_stage:
- logger.debug(
- f"[{self.serial_number}] mc_print_sub_stage changed: "
- f"{self.state.mc_print_sub_stage} -> {new_sub_stage}"
- )
- self.state.mc_print_sub_stage = new_sub_stage
- if "layer_num" in data:
- new_layer = int(data["layer_num"])
- old_layer = self.state.layer_num
- # Save last non-zero layer for usage tracking (firmware resets to 0 on cancel)
- if old_layer > 0:
- self._last_valid_layer_num = old_layer
- self.state.layer_num = new_layer
- # Trigger layer change callback if layer increased
- if new_layer > old_layer and self.on_layer_change:
- self.on_layer_change(new_layer)
- if "total_layer_num" in data:
- self.state.total_layers = int(data["total_layer_num"])
- # Fan speeds (MQTT sends as string "0"-"15" representing speed levels, or percentage)
- # Convert to 0-100 percentage for display
- def parse_fan_speed(value: str | int | None) -> int | None:
- if value is None:
- return None
- try:
- speed = int(value)
- # MQTT reports 0-15 speed levels, convert to percentage (0-100)
- # 15 = 100%, so multiply by 100/15 ≈ 6.67
- if speed <= 15:
- return round(speed * 100 / 15)
- # If already a percentage (0-255 scale from some printers), convert
- elif speed <= 255:
- return round(speed * 100 / 255)
- return speed
- except (ValueError, TypeError):
- return None
- # Log fan fields once for debugging
- if not hasattr(self, "_fan_fields_logged"):
- fan_fields = {k: v for k, v in data.items() if "fan" in k.lower()}
- if fan_fields:
- logger.debug("[%s] Fan fields in MQTT data: %s", self.serial_number, fan_fields)
- self._fan_fields_logged = True
- if "cooling_fan_speed" in data:
- self.state.cooling_fan_speed = parse_fan_speed(data["cooling_fan_speed"])
- if "big_fan1_speed" in data:
- self.state.big_fan1_speed = parse_fan_speed(data["big_fan1_speed"])
- if "big_fan2_speed" in data:
- self.state.big_fan2_speed = parse_fan_speed(data["big_fan2_speed"])
- if "heatbreak_fan_speed" in data:
- self.state.heatbreak_fan_speed = parse_fan_speed(data["heatbreak_fan_speed"])
- # Calibration stage tracking
- if "stg_cur" in data:
- new_stg = data["stg_cur"]
- # Always log ANY stg_cur change for debugging filament operations
- if new_stg != self.state.stg_cur:
- logger.debug(
- f"[{self.serial_number}] stg_cur changed: {self.state.stg_cur} -> {new_stg} ({get_stage_name(new_stg)})"
- )
- self.state.stg_cur = new_stg
- if "stg" in data:
- self.state.stg = data["stg"] if isinstance(data["stg"], list) else []
- # Temperature data
- temps = {}
- # Log all fields for debugging dual-nozzle temperature discovery (only once)
- if "bed_temper" in data and not hasattr(self, "_temp_fields_logged"):
- temp_fields = {k: v for k, v in data.items() if "temp" in k.lower() or "chamber" in k.lower()}
- logger.debug("[%s] Temperature-related fields: %s", self.serial_number, temp_fields)
- # Log ALL keys in print data for H2D temperature discovery
- all_keys = sorted(data.keys())
- logger.debug("[%s] ALL print data keys (%s): %s", self.serial_number, len(all_keys), all_keys)
- self._temp_fields_logged = True
- # Log vir_slot data (once) - this may contain per-extruder slot mapping for H2D
- if "vir_slot" in data and not hasattr(self, "_vir_slot_logged"):
- logger.debug("[%s] vir_slot data: %s", self.serial_number, data["vir_slot"])
- self._vir_slot_logged = True
- # Log nozzle hardware info fields (once)
- nozzle_fields = {
- k: v
- for k, v in data.items()
- if "nozzle" in k.lower() or "hw" in k.lower() or "extruder" in k.lower() or "upgrade" in k.lower()
- }
- if nozzle_fields and not hasattr(self, "_nozzle_fields_logged"):
- logger.debug("[%s] Nozzle/hardware fields in MQTT data: %s", self.serial_number, nozzle_fields)
- self._nozzle_fields_logged = True
- # Parse active extruder from device.extruder.state bit 8
- # bit 8 = 0 → RIGHT extruder (active_extruder=0)
- # bit 8 = 1 → LEFT extruder (active_extruder=1)
- if "device" in data and isinstance(data.get("device"), dict):
- device = data["device"]
- if "extruder" in device and "state" in device["extruder"]:
- state_val = device["extruder"]["state"]
- # Extract bit 8 for extruder position
- new_extruder = (state_val >> 8) & 0x1
- if new_extruder != self.state.active_extruder:
- logger.debug(
- f"[{self.serial_number}] ACTIVE EXTRUDER CHANGED (state bit 8): {self.state.active_extruder} -> {new_extruder} (0=right, 1=left) [state={state_val}]"
- )
- self.state.active_extruder = new_extruder
- # Log device.extruder structure for active extruder
- if "device" in data and isinstance(data.get("device"), dict):
- device = data["device"]
- if "extruder" in device:
- ext_data = device["extruder"]
- # Log 'state' field - OrcaSlicer uses bits 12-14 for switch state
- if "state" in ext_data:
- state_val = ext_data["state"]
- # Extract bits 12-14 (3 bits) for switch state
- switch_state = (state_val >> 12) & 0x7
- logger.debug(
- f"[{self.serial_number}] device.extruder.state={state_val} (switch_state bits 12-14: {switch_state})"
- )
- # Log 'cur' field if present (might indicate current/active extruder)
- if "cur" in ext_data:
- logger.debug("[%s] device.extruder.cur: %s", self.serial_number, ext_data["cur"])
- # Filament Track Switch (FTS) detection — #1162. Presence of
- # device.fila_switch in MQTT means the FTS accessory is installed.
- if "device" in data and isinstance(data.get("device"), dict):
- fs_data = data["device"].get("fila_switch")
- if isinstance(fs_data, dict):
- in_raw = fs_data.get("in")
- out_raw = fs_data.get("out")
- self.state.fila_switch = FilaSwitchState(
- installed=True,
- in_slots=list(in_raw) if isinstance(in_raw, list) else [],
- out_extruders=list(out_raw) if isinstance(out_raw, list) else [],
- stat=int(fs_data.get("stat", 0) or 0),
- info=int(fs_data.get("info", 0) or 0),
- )
- if "bed_temper" in data:
- temps["bed"] = float(data["bed_temper"])
- if "bed_target_temper" in data:
- temps["bed_target"] = float(data["bed_target_temper"])
- # Check if this is H2D (has device.extruder.info with 2 extruders)
- has_h2d_extruder_info = (
- "device" in data
- and isinstance(data.get("device"), dict)
- and "extruder" in data["device"]
- and isinstance(data["device"]["extruder"].get("info"), list)
- and len(data["device"]["extruder"]["info"]) >= 2
- )
- # Standard nozzle fields: these are for the RIGHT/default nozzle on H2D
- # For H2D, we use these for nozzle_2 (RIGHT), for others use as nozzle (primary)
- # NOTE: On H2D, nozzle_temper seems to mirror left nozzle - we override with extruder_info[0] later
- if "nozzle_temper" in data:
- if has_h2d_extruder_info:
- temps["nozzle_2"] = float(data["nozzle_temper"]) # Will be overridden by extruder_info[0]
- else:
- temps["nozzle"] = float(data["nozzle_temper"])
- if "nozzle_target_temper" in data:
- if has_h2d_extruder_info:
- temps["nozzle_2_target"] = float(data["nozzle_target_temper"]) # RIGHT target on H2D
- else:
- temps["nozzle_target"] = float(data["nozzle_target_temper"])
- # Second nozzle for dual-extruder printers - skip for H2D (uses device.extruder.info instead)
- if not has_h2d_extruder_info:
- # Try multiple possible field names used by different firmware versions
- if "nozzle_temper_2" in data:
- val = float(data["nozzle_temper_2"])
- if -50 < val < 500: # Valid temp range
- temps["nozzle_2"] = val
- else:
- logger.debug("[%s] nozzle_temper_2=%s out of range", self.serial_number, val)
- elif "right_nozzle_temper" in data:
- val = float(data["right_nozzle_temper"])
- if -50 < val < 500: # Valid temp range
- temps["nozzle_2"] = val
- else:
- logger.debug("[%s] right_nozzle_temper=%s out of range", self.serial_number, val)
- if "nozzle_target_temper_2" in data:
- val = float(data["nozzle_target_temper_2"])
- if 0 <= val < 500: # Valid temp range
- temps["nozzle_2_target"] = val
- else:
- logger.debug("[%s] nozzle_target_temper_2=%s out of range", self.serial_number, val)
- elif "right_nozzle_target_temper" in data:
- val = float(data["right_nozzle_target_temper"])
- if 0 <= val < 500: # Valid temp range
- temps["nozzle_2_target"] = val
- else:
- logger.debug("[%s] right_nozzle_target_temper=%s out of range", self.serial_number, val)
- # Also check for left nozzle as primary (some H2 models)
- if "left_nozzle_temper" in data and "nozzle" not in temps:
- temps["nozzle"] = float(data["left_nozzle_temper"])
- if "left_nozzle_target_temper" in data and "nozzle_target" not in temps:
- temps["nozzle_target"] = float(data["left_nozzle_target_temper"])
- if "chamber_temper" in data:
- chamber_val = float(data["chamber_temper"])
- logger.debug("[%s] chamber_temper raw value: %s", self.serial_number, chamber_val)
- # Check if we recently set the target locally (within 5 seconds)
- local_set_time = self.state.temperatures.get("_chamber_target_set_time", 0)
- respect_local = (time.time() - local_set_time) < 5.0
- # H2D protocol: chamber_temper encoding indicates heater state
- # - When > 500: encoded as (target * 65536 + current) - heater is ON
- # - When < 500: direct Celsius current temp only - heater is OFF
- if -50 < chamber_val < 100:
- # Direct value = heater is OFF
- temps["chamber"] = chamber_val
- if not respect_local:
- temps["chamber_target"] = 0.0 # Heater off means target = 0
- logger.debug("[%s] chamber_temper direct value: %s°C (heater OFF)", self.serial_number, chamber_val)
- else:
- logger.debug("[%s] chamber_temper %s out of direct range", self.serial_number, chamber_val)
- # Try to decode if it looks like an encoded value
- if chamber_val > 500:
- mqtt_target = int(chamber_val) // 65536
- current = int(chamber_val) % 65536
- logger.debug(
- f"[{self.serial_number}] chamber_temper decoded: mqtt_target={mqtt_target}, current={current}, respect_local={respect_local}"
- )
- if -50 < current < 100:
- temps["chamber"] = float(current)
- # Store decoded target for later use, but DON'T set chamber_heating here!
- # Heating state will be calculated later after parsing ctc.info.target (explicit target)
- # which is the authoritative source the slicer uses.
- if not respect_local:
- if 0 <= mqtt_target <= 60:
- # Store as "decoded" target - may be overridden by explicit target fields
- temps["_chamber_decoded_target"] = float(mqtt_target)
- # Chamber target temperature (set by print file or display)
- if "mc_target_cham" in data:
- mc_target = float(data["mc_target_cham"])
- logger.debug("[%s] mc_target_cham raw value: %s", self.serial_number, mc_target)
- # Filter out encoded/invalid values - valid chamber target is 0-60°C
- if 0 <= mc_target <= 60:
- temps["chamber_target"] = mc_target
- # H2D series: Chamber temp is in info.temp (may be encoded or direct °C)
- # NOTE: Don't set chamber_heating here - let ctc.info.target or fallback logic handle it
- # The encoded target in info.temp may be stale (slicer uses ctc.info.target as source of truth)
- try:
- if "info" in data and isinstance(data["info"], dict):
- info_temp = data["info"].get("temp")
- if info_temp is not None and "chamber" not in temps:
- # Check for encoded value (target * 65536 + current)
- if info_temp > 500:
- # Decode: extract current temperature and target
- target = info_temp // 65536
- current = info_temp % 65536
- temps["chamber"] = float(current)
- # Store decoded target as fallback (may be overridden by ctc.info.target)
- if "_chamber_decoded_target" not in temps:
- temps["_chamber_decoded_target"] = float(target)
- logger.debug(
- f"[{self.serial_number}] info.temp encoded: {info_temp} -> current={current}, decoded_target={target}"
- )
- elif -50 < info_temp < 100:
- # Valid direct temperature - heater is OFF
- temps["chamber"] = float(info_temp)
- temps["chamber_target"] = 0.0 # Direct value means heater off
- logger.debug("[%s] info.temp direct: %s°C (heater OFF)", self.serial_number, info_temp)
- # H2D series: Dual extruder temps are in device.extruder.info array
- # Temperature values are encoded as fixed-point (value / 65536 = °C)
- if "device" in data and isinstance(data["device"], dict):
- device = data["device"]
- # Parse dual extruder temperatures
- extruder_data = device.get("extruder", {})
- extruder_info = extruder_data.get("info", [])
- if isinstance(extruder_info, list) and len(extruder_info) >= 1:
- # H2D nozzle mapping: id=0 is RIGHT nozzle (default), id=1 is LEFT nozzle
- # Only parse dual nozzle temps if this is actually a dual nozzle printer (H2D)
- # has_h2d_extruder_info requires len(extruder_info) >= 2
- if has_h2d_extruder_info:
- # Right nozzle (extruder 0) - use extruder_info for actual temp, not nozzle_temper
- # nozzle_temper field seems to mirror left nozzle on H2D, so use extruder_info[0]
- if "temp" in extruder_info[0]:
- temp_val = extruder_info[0]["temp"]
- if temp_val > 500:
- # Encoded format: temp = target * 65536 + current
- target = temp_val // 65536
- current = temp_val % 65536
- if -50 < current < 500:
- temps["nozzle_2"] = float(current)
- if 0 < target < 500:
- temps["nozzle_2_target"] = float(target)
- temps["nozzle_2_heating"] = target > 0 and current < target
- elif -50 < temp_val < 500:
- # Direct Celsius value = heater is OFF
- temps["nozzle_2"] = float(temp_val)
- temps["nozzle_2_target"] = 0.0
- temps["nozzle_2_heating"] = False
- # Left nozzle (extruder 1) - only for dual nozzle printers
- # H2D protocol: temp field encoding depends on value
- # - When > 500: encoded as (target * 65536 + current) - heater is ON
- # - When < 500: direct Celsius current temp only - heater is OFF
- if len(extruder_info) >= 2 and "temp" in extruder_info[1]:
- ext1 = extruder_info[1]
- temp_val = ext1["temp"]
- # Check if we recently set the target locally (within 5 seconds)
- # If so, don't let MQTT data overwrite it
- local_set_time = self.state.temperatures.get("_nozzle_target_set_time", 0)
- respect_local_target = (time.time() - local_set_time) < 5.0
- if temp_val > 500:
- # Encoded format: temp = target * 65536 + current
- target = temp_val // 65536
- current = temp_val % 65536
- if 0 < target < 500 and not respect_local_target:
- temps["nozzle_target"] = float(target)
- if -50 < current < 500:
- temps["nozzle"] = float(current)
- # Heating = encoded AND we're using the MQTT target (not local override)
- # If local target is being respected, use local target to determine heating
- if respect_local_target:
- local_target = self.state.temperatures.get("nozzle_target", 0)
- temps["nozzle_heating"] = local_target > 0 and current < local_target
- else:
- temps["nozzle_heating"] = target > 0 and current < target
- elif -50 < temp_val < 500:
- # Direct Celsius = heater is OFF (or at target with heater off)
- temps["nozzle"] = float(temp_val)
- if not respect_local_target:
- temps["nozzle_target"] = 0.0
- temps["nozzle_heating"] = False # Direct = not heating
- # Parse H2D snow field (slot now) for accurate tray_now disambiguation
- # snow encodes AMS ID in high byte: ams_id = snow >> 8, slot = snow & 0xFF
- if has_h2d_extruder_info:
- for ext_info in extruder_info:
- ext_id = ext_info.get("id")
- snow = ext_info.get("snow")
- if ext_id is not None and snow is not None and ext_id <= 1:
- # Normalize H2D snow value to global tray ID
- ams_id = snow >> 8
- slot = snow & 0xFF
- if 0 <= ams_id <= 3:
- # Regular AMS slot
- global_tray = ams_id * 4 + (slot & 0x03)
- old_val = self.state.h2d_extruder_snow.get(ext_id)
- if old_val != global_tray:
- logger.debug(
- f"[{self.serial_number}] H2D extruder[{ext_id}] snow: "
- f"raw={snow} (AMS {ams_id} slot {slot}) -> global tray {global_tray}"
- )
- self.state.h2d_extruder_snow[ext_id] = global_tray
- elif ams_id == 254 or ams_id == 255:
- # External spool or unloaded
- normalized = 254 if slot != 255 else 255
- old_val = self.state.h2d_extruder_snow.get(ext_id)
- if old_val != normalized:
- logger.debug(
- f"[{self.serial_number}] H2D extruder[{ext_id}] snow: "
- f"raw={snow} -> {'external' if normalized == 254 else 'unloaded'}"
- )
- self.state.h2d_extruder_snow[ext_id] = normalized
- elif 128 <= ams_id <= 135:
- # External spool with hub mapping
- old_val = self.state.h2d_extruder_snow.get(ext_id)
- if old_val != ams_id:
- logger.debug(
- f"[{self.serial_number}] H2D extruder[{ext_id}] snow: "
- f"raw={snow} -> external hub {ams_id}"
- )
- self.state.h2d_extruder_snow[ext_id] = ams_id
- # Parse bed heating state from device.bed.info.temp encoding
- # temp > 500 means encoded (target*65536+current), heating = target > 0 AND current < target
- bed_data = device.get("bed", {})
- bed_info = bed_data.get("info", {})
- if "temp" in bed_info:
- temp_val = bed_info["temp"]
- if temp_val > 500:
- target = temp_val // 65536
- current = temp_val % 65536
- temps["bed_heating"] = target > 0 and current < target
- else:
- temps["bed_heating"] = False
- # Parse chamber temp from device.ctc.info.temp if not already set
- ctc_data = device.get("ctc", {})
- ctc_info = ctc_data.get("info", {})
- # Parse airduct mode (0=cooling, 1=heating)
- airduct_data = device.get("airduct", {})
- if "modeCur" in airduct_data:
- new_mode = airduct_data["modeCur"]
- if new_mode != self.state.airduct_mode:
- logger.debug(
- f"[{self.serial_number}] airduct_mode changed: {self.state.airduct_mode} -> {new_mode}"
- )
- self.state.airduct_mode = new_mode
- # Parse chamber temp - may be encoded as (target*65536+current) when > 500
- # Check if we recently set the target locally (within 5 seconds)
- local_set_time = self.state.temperatures.get("_chamber_target_set_time", 0)
- respect_local_target = (time.time() - local_set_time) < 5.0
- # Log ctc_info contents for debugging
- if ctc_info:
- logger.debug("[%s] ctc_info keys: %s", self.serial_number, list(ctc_info.keys()))
- # FIRST: Parse explicit ctc.info.target if available - this is the authoritative target
- # (what the slicer shows). This OVERRIDES any previously decoded target.
- explicit_target = None
- if "target" in ctc_info:
- target_val = ctc_info["target"]
- logger.debug(
- f"[{self.serial_number}] ctc_info.target explicit value: {target_val}, respect_local={respect_local_target}"
- )
- # Filter out invalid values (valid chamber target is 0-60°C)
- if 0 <= target_val <= 60 and not respect_local_target:
- explicit_target = float(target_val)
- temps["chamber_target"] = explicit_target # Override any previous value
- logger.debug(
- f"[{self.serial_number}] Setting chamber_target from ctc_info.target: {explicit_target}"
- )
- # Parse chamber temp from ctc.info.temp - may be encoded
- if "temp" in ctc_info and "chamber" not in temps:
- temp_val = ctc_info["temp"]
- logger.debug("[%s] ctc_info.temp raw value: %s", self.serial_number, temp_val)
- if temp_val > 500:
- # Encoded value: decode target and current
- decoded_target = temp_val // 65536
- current = temp_val % 65536
- temps["chamber"] = float(current)
- logger.debug(
- f"[{self.serial_number}] ctc_info.temp decoded: target={decoded_target}, current={current}, explicit_target={explicit_target}"
- )
- # Determine which target to use for heating state:
- # Priority: local target > explicit target > decoded target
- if respect_local_target:
- local_target = self.state.temperatures.get("chamber_target", 0)
- temps["chamber_heating"] = local_target > 0 and current < local_target
- elif explicit_target is not None:
- # Use explicit ctc.info.target - this is what slicer sees
- temps["chamber_heating"] = explicit_target > 0 and current < explicit_target
- else:
- # Fallback to decoded target only if no explicit target available
- if not respect_local_target and "chamber_target" not in temps:
- temps["chamber_target"] = float(decoded_target)
- temps["chamber_heating"] = decoded_target > 0 and current < decoded_target
- else:
- # Direct value (not encoded) - heater is OFF
- temps["chamber"] = float(temp_val)
- temps["chamber_heating"] = False
- except Exception as e:
- logger.warning("[%s] Error parsing H2D temperatures: %s", self.serial_number, e)
- if temps:
- # Handle chamber_target: prefer explicit over decoded
- if "_chamber_decoded_target" in temps and "chamber_target" not in temps:
- # No explicit target available, use decoded target from chamber_temper
- temps["chamber_target"] = temps["_chamber_decoded_target"]
- # Remove internal temp key before merging
- temps.pop("_chamber_decoded_target", None)
- # Merge new temps into existing, preserving valid values when new ones are filtered out
- for key, value in temps.items():
- self.state.temperatures[key] = value
- # Notify bed temperature updates (used by event-driven bed cooldown monitor)
- if "bed" in temps and self.on_bed_temp_update:
- self.on_bed_temp_update(temps["bed"])
- # Calculate chamber_heating after all targets are known
- # Priority: local target (if recent) > explicit target (chamber_target) > 0
- if "chamber" in temps and "chamber_heating" not in temps:
- current = self.state.temperatures.get("chamber", 0)
- local_set_time = self.state.temperatures.get("_chamber_target_set_time", 0)
- respect_local = (time.time() - local_set_time) < 5.0
- if respect_local:
- # Use locally-set target
- target = self.state.temperatures.get("chamber_target", 0)
- else:
- # Use explicit/decoded target from MQTT
- target = self.state.temperatures.get("chamber_target", 0)
- self.state.temperatures["chamber_heating"] = target > 0 and current < target
- logger.debug(
- f"[{self.serial_number}] Chamber heating calculated: target={target}, current={current}, heating={self.state.temperatures['chamber_heating']}, respect_local={respect_local}"
- )
- # Debug: log chamber value if it was updated
- if "chamber" in temps:
- logger.debug(
- f"[{self.serial_number}] Chamber temp updated to: {self.state.temperatures.get('chamber')}, target: {self.state.temperatures.get('chamber_target')}, heating: {self.state.temperatures.get('chamber_heating')}"
- )
- # Calculate nozzle_heating for single nozzle printers (not set by H2D parsing)
- # For H2D, nozzle_heating is set in temps dict; for single nozzle, calculate here
- if "nozzle" in temps and "nozzle_heating" not in temps:
- current = self.state.temperatures.get("nozzle", 0)
- target = self.state.temperatures.get("nozzle_target", 0)
- self.state.temperatures["nozzle_heating"] = target > 0 and current < target
- # Parse HMS (Health Management System) errors
- if "hms" in data:
- hms_list = data["hms"]
- logger.debug("[%s] HMS data received: %s", self.serial_number, hms_list)
- self.state.hms_errors = []
- if isinstance(hms_list, list):
- for hms in hms_list:
- if isinstance(hms, dict):
- # HMS format: {"attr": attribute_code, "code": error_code}
- # attr contains module/severity info, code contains error number
- # Both are needed to construct the wiki URL
- attr = hms.get("attr", 0)
- code = hms.get("code", 0)
- if isinstance(attr, str):
- attr = int(attr.replace("0x", ""), 16) if attr else 0
- if isinstance(code, str):
- code = int(code.replace("0x", ""), 16) if code else 0
- # Severity is in attr byte 1 (bits 8-15)
- severity = (attr >> 8) & 0xF
- # Module is in attr byte 3 (bits 24-31)
- module = (attr >> 24) & 0xFF
- # Skip non-error status codes — all real HMS errors
- # have code >= 0x4000. Lower values are status/phase
- # indicators that some firmware sends during normal printing.
- if code < 0x4000:
- continue
- # Skip user-action echoes — the printer firmware emits these
- # as part of normal user-cancel sequences. They're not faults
- # and shouldn't count toward "X problem" badges or surface as
- # red pips on the printer card. Backend's notification path
- # already suppresses 0500_400E for the same reason.
- short_code = f"{(attr >> 16) & 0xFFFF:04X}_{code & 0xFFFF:04X}"
- if short_code in _HMS_USER_ACTION_CODES:
- continue
- self.state.hms_errors.append(
- HMSError(
- code=f"0x{code:x}" if code else "0x0",
- attr=attr,
- module=module,
- severity=severity if severity > 0 else 2,
- )
- )
- # Parse print_error - this is a different error format than HMS
- # print_error is a 32-bit integer where:
- # - High 16 bits contain module info (e.g., 0x0500)
- # - Low 16 bits contain error code (e.g., 0x8061)
- # Format on printer screen: [0500-8061] -> short code: 0500_8061
- if "print_error" in data:
- print_error = data["print_error"]
- if print_error and print_error != 0:
- # Extract components: MMMMEEEE -> MMMM_EEEE
- module = (print_error >> 16) & 0xFFFF # High 16 bits (e.g., 0x0500)
- error = print_error & 0xFFFF # Low 16 bits (e.g., 0x8061)
- # Values below 0x4000 are status/phase indicators, not real errors.
- # All known HMS errors use 0x4xxx (fatal), 0x8xxx (warning), 0xCxxx (prompt).
- # Some firmware sends low values like 0x0002 during normal printing.
- if error < 0x4000:
- pass # Skip — not a real error
- else:
- # Store in a format that matches the community error database
- # attr stores the full 32-bit value for reconstruction
- # code stores the short format string for lookup
- short_code = f"{module:04X}_{error:04X}"
- logger.debug(
- f"[{self.serial_number}] print_error: {print_error} (0x{print_error:08x}) -> short_code={short_code}"
- )
- # Same user-action filter as the hms[] branch above — print_error
- # carries the same cancel echoes (e.g. 0500_400E) and they must
- # not surface as faults on the printer card.
- if short_code in _HMS_USER_ACTION_CODES:
- pass # cancel echo — silently drop
- else:
- # Only add if not already in HMS errors (avoid duplicates)
- existing_short_codes = set()
- for e in self.state.hms_errors:
- # Extract short code from existing errors
- e_module = (e.attr >> 16) & 0xFFFF
- e_error = int(e.code.replace("0x", ""), 16) if e.code else 0
- existing_short_codes.add(f"{e_module:04X}_{e_error:04X}")
- if short_code not in existing_short_codes:
- self.state.hms_errors.append(
- HMSError(
- code=f"0x{error:x}",
- attr=print_error, # Store full value for display
- module=module >> 8, # High byte of module (e.g., 0x05)
- severity=3, # Warning level for print_error
- )
- )
- # Parse home_flag first so SD-card detection below can prefer it.
- # Bit 8 = HAS_SDCARD_NORMAL, bit 9 = HAS_SDCARD_ABNORMAL, bit 11 = store-to-SD,
- # bit 23 = door-open (X1 family only).
- home_flag = None
- if "home_flag" in data:
- home_flag = data["home_flag"]
- if home_flag < 0:
- home_flag = home_flag & 0xFFFFFFFF
- # SD card presence: the only remaining consumer is the firmware-update
- # precondition check (firmware_update.py). Use the top-level `sdcard`
- # field when present with a permissive truthy check covering the
- # bool/int/"HAS_SDCARD_NORMAL" variants real firmware emits. We do NOT
- # derive this from home_flag — heartbeat pushes clear bits 8-9 even
- # when a card is inserted, which caused the badge to flap before the
- # badge was removed entirely.
- if "sdcard" in data:
- raw_sdcard = data["sdcard"]
- if isinstance(raw_sdcard, str):
- self.state.sdcard = "HAS_SDCARD" in raw_sdcard.upper() or raw_sdcard.lower() in ("true", "normal", "1")
- else:
- self.state.sdcard = bool(raw_sdcard)
- if home_flag is not None:
- store_to_sdcard = bool((home_flag >> 11) & 1)
- if store_to_sdcard != self.state.store_to_sdcard:
- logger.debug(
- f"[{self.serial_number}] store_to_sdcard changed: {self.state.store_to_sdcard} -> {store_to_sdcard}"
- )
- self.state.store_to_sdcard = store_to_sdcard
- # Door open detection — source depends on printer family:
- # X1 series (X1, X1C, X1E): home_flag bit 23
- # All others (P1/P2/H2/A1/N-series): top-level `stat` field (hex string), bit 23
- # Both share the same bitmask (0x00800000) but live in different fields.
- model_upper = (self.model or "").upper().strip()
- is_x1_family = model_upper in ("X1", "X1C", "X1E")
- if is_x1_family and home_flag is not None:
- door_open = (home_flag & 0x00800000) != 0
- if door_open != self.state.door_open:
- logger.debug(
- "[%s] door_open changed: %s -> %s (home_flag=0x%08X)",
- self.serial_number,
- self.state.door_open,
- door_open,
- home_flag,
- )
- self.state.door_open = door_open
- elif not is_x1_family and "stat" in data:
- try:
- stat_value = int(data["stat"], 16) if isinstance(data["stat"], str) else int(data["stat"])
- door_open = (stat_value & 0x00800000) != 0
- if door_open != self.state.door_open:
- logger.debug(
- "[%s] door_open changed: %s -> %s (stat=0x%08X)",
- self.serial_number,
- self.state.door_open,
- door_open,
- stat_value,
- )
- self.state.door_open = door_open
- except (ValueError, TypeError):
- logger.debug("[%s] could not parse stat field: %r", self.serial_number, data["stat"])
- # Parse timelapse status (recording active during print)
- if "timelapse" in data:
- logger.debug("[%s] timelapse field: %s", self.serial_number, data["timelapse"])
- self.state.timelapse = data["timelapse"] is True
- # Track if timelapse was ever active during this print
- if self.state.timelapse and self._was_running:
- self._timelapse_during_print = True
- # Parse ipcam/live view status
- if "ipcam" in data:
- ipcam_data = data["ipcam"]
- logger.debug("[%s] ipcam field: %s", self.serial_number, ipcam_data)
- if isinstance(ipcam_data, dict):
- # Check ipcam_record field for live view status
- self.state.ipcam = ipcam_data.get("ipcam_record") == "enable"
- # Check timelapse field (H2D sends it here, not in xcam)
- if "timelapse" in ipcam_data:
- timelapse_enabled = ipcam_data.get("timelapse") == "enable"
- if timelapse_enabled != self.state.timelapse:
- logger.debug(
- f"[{self.serial_number}] timelapse changed (from ipcam): {self.state.timelapse} -> {timelapse_enabled}"
- )
- self.state.timelapse = timelapse_enabled
- # Track if timelapse was ever active during this print
- if self.state.timelapse and self._was_running:
- self._timelapse_during_print = True
- logger.debug("[%s] Timelapse detected during print (from ipcam)", self.serial_number)
- else:
- self.state.ipcam = ipcam_data is True
- # Parse WiFi signal strength (dBm)
- if "wifi_signal" in data:
- wifi_signal = data["wifi_signal"]
- logger.debug("[%s] wifi_signal received: %s", self.serial_number, wifi_signal)
- if isinstance(wifi_signal, (int, float)):
- self.state.wifi_signal = int(wifi_signal)
- elif isinstance(wifi_signal, str):
- # Handle string format like "-52dBm"
- try:
- self.state.wifi_signal = int(wifi_signal.replace("dBm", "").strip())
- except ValueError:
- pass # Ignore unparseable wifi_signal strings; field is non-critical
- # Detect ethernet connection: printers on ethernet with WiFi disabled
- # report a hardcoded wifi_signal of -90 dBm. Real WiFi signals vary
- # (typically -30 to -80 dBm). Only check models with an ethernet port.
- from backend.app.utils.printer_models import has_ethernet
- if has_ethernet(self.model):
- self.state.wired_network = self.state.wifi_signal == -90
- # Parse print speed level (1=silent, 2=standard, 3=sport, 4=ludicrous)
- if "spd_lvl" in data:
- new_speed = data["spd_lvl"]
- if new_speed != self.state.speed_level:
- logger.debug(
- "[%s] speed_level changed: %s -> %s", self.serial_number, self.state.speed_level, new_speed
- )
- self.state.speed_level = new_speed
- # Parse skipped objects from printer status (s_obj field)
- # This allows us to restore skipped objects state after reconnection
- if "s_obj" in data:
- s_obj = data["s_obj"]
- if isinstance(s_obj, list):
- # Update skipped objects from printer's list
- new_skipped = [int(oid) for oid in s_obj if isinstance(oid, (int, str))]
- if new_skipped != self.state.skipped_objects:
- logger.debug("[%s] skipped_objects updated from printer: %s", self.serial_number, new_skipped)
- self.state.skipped_objects = new_skipped
- # Parse chamber light status from lights_report
- if "lights_report" in data:
- lights = data["lights_report"]
- logger.debug("[%s] lights_report: %s", self.serial_number, lights)
- if isinstance(lights, list):
- for light in lights:
- if isinstance(light, dict) and light.get("node") == "chamber_light":
- new_light_state = light.get("mode") == "on"
- if new_light_state != self.state.chamber_light:
- logger.debug(
- f"[{self.serial_number}] chamber_light changed: {self.state.chamber_light} -> {new_light_state}"
- )
- self.state.chamber_light = new_light_state
- break
- # Parse nozzle hardware info (single nozzle printers)
- if "nozzle_type" in data:
- self.state.nozzles[0].nozzle_type = str(data["nozzle_type"])
- if "nozzle_diameter" in data:
- self.state.nozzles[0].nozzle_diameter = str(data["nozzle_diameter"])
- # Parse nozzle hardware info (dual nozzle printers - H2D series)
- # Left nozzle
- if "left_nozzle_type" in data:
- self.state.nozzles[0].nozzle_type = str(data["left_nozzle_type"])
- if "left_nozzle_diameter" in data:
- self.state.nozzles[0].nozzle_diameter = str(data["left_nozzle_diameter"])
- # Right nozzle
- if "right_nozzle_type" in data:
- self.state.nozzles[1].nozzle_type = str(data["right_nozzle_type"])
- if "right_nozzle_diameter" in data:
- self.state.nozzles[1].nozzle_diameter = str(data["right_nozzle_diameter"])
- # Alternative format for dual nozzle (nozzle_type_2, etc.)
- if "nozzle_type_2" in data:
- self.state.nozzles[1].nozzle_type = str(data["nozzle_type_2"])
- if "nozzle_diameter_2" in data:
- self.state.nozzles[1].nozzle_diameter = str(data["nozzle_diameter_2"])
- # H2D/H2C series: Nozzle hardware info is in device.nozzle.info array
- if "device" in data and isinstance(data["device"], dict):
- device = data["device"]
- nozzle_data = device.get("nozzle", {})
- nozzle_info = nozzle_data.get("info", [])
- if isinstance(nozzle_info, list):
- # H2 series: nozzle_info contains extended nozzle data (wear, serial,
- # max_temp, etc.) for all nozzles: L/R hotend (IDs 0,1) and rack slots
- # (IDs 16-21 on H2C). Store ALL entries so the frontend can use them
- # for hover cards on both the L/R indicator and the nozzle rack card.
- if nozzle_info:
- self.state.nozzle_rack = sorted(
- [
- {
- "id": n.get("id", i),
- "type": str(n.get("type", "")),
- "diameter": str(n.get("diameter", "")),
- "wear": n.get("wear"),
- "stat": n.get("stat"),
- # H2C uses "tm", H2D uses "max_temp"
- "max_temp": n.get("max_temp") or n.get("tm", 0),
- # H2C uses "sn", H2D uses "serial_number"
- "serial_number": str(n.get("serial_number") or n.get("sn", "")),
- # H2C uses "color_m", H2D uses "filament_colour"
- "filament_color": str(n.get("filament_colour") or n.get("color_m", "")),
- # H2C uses "fila_id", H2D uses "filament_id"
- "filament_id": str(n.get("filament_id") or n.get("fila_id", "")),
- "filament_type": str(n.get("tray_type", "") or n.get("filament_type", "")),
- }
- for i, n in enumerate(nozzle_info)
- ],
- key=lambda x: x["id"],
- )
- if not hasattr(self, "_nozzle_rack_logged") and nozzle_info:
- self._nozzle_rack_logged = True
- logger.debug(
- "[%s] Nozzle info: %d entries, IDs: %s",
- self.serial_number,
- len(nozzle_info),
- [n.get("id") for n in nozzle_info],
- )
- for nozzle in nozzle_info:
- idx = nozzle.get("id", 0)
- if idx < len(self.state.nozzles):
- if "type" in nozzle and nozzle["type"]:
- self.state.nozzles[idx].nozzle_type = str(nozzle["type"])
- if "diameter" in nozzle:
- self.state.nozzles[idx].nozzle_diameter = str(nozzle["diameter"])
- # Preserve AMS, vt_tray, ams_extruder_map, and mapping data when updating raw_data
- # (these fields aren't sent in every MQTT push, only when changed)
- ams_data = self.state.raw_data.get("ams")
- vt_tray_data = self.state.raw_data.get("vt_tray")
- ams_extruder_map_data = self.state.raw_data.get("ams_extruder_map")
- mapping_data = self.state.raw_data.get("mapping")
- # Normalize vt_tray in data before assigning to raw_data: MQTT sends it
- # as a dict but consumers expect a list. Without this, the dev mode probe
- # below can release the GIL (via publish), letting the event-loop thread
- # read raw_data["vt_tray"] as a dict and crash iterating over string keys.
- if "vt_tray" in data and isinstance(data["vt_tray"], dict):
- data["vt_tray"] = [data["vt_tray"]]
- self.state.raw_data = data
- # Restore preserved fields BEFORE any work that may release the GIL
- # (e.g. _probe_developer_mode publishes an MQTT message).
- if ams_data is not None:
- self.state.raw_data["ams"] = ams_data
- if vt_tray_data is not None:
- self.state.raw_data["vt_tray"] = vt_tray_data
- if ams_extruder_map_data is not None:
- self.state.raw_data["ams_extruder_map"] = ams_extruder_map_data
- if mapping_data is not None and "mapping" not in data:
- self.state.raw_data["mapping"] = mapping_data
- # Parse developer LAN mode from "fun" field
- if "fun" in data:
- try:
- fun_val = data["fun"]
- fun_int = fun_val if isinstance(fun_val, int) else int(fun_val, 16)
- self.state.developer_mode = (fun_int & 0x20000000) == 0
- except (ValueError, TypeError):
- pass
- elif self.state.developer_mode is None and not self._dev_mode_probed:
- # No "fun" field — A1/P1 series never send it, so we need to probe.
- # Two gates: (1) wait for a full pushall (30+ keys) so we don't probe
- # before a pushall that might contain "fun" arrives, and (2) delay 5s
- # after connect to let the MQTT session stabilize — probing too early
- # can destabilize some firmware MQTT brokers (#887).
- if not self._dev_mode_needs_probe and len(data) > 30:
- # First full status without "fun" — mark that probe is needed
- self._dev_mode_needs_probe = True
- if self._dev_mode_needs_probe and time.monotonic() - self._connect_time >= 5.0:
- self._probe_developer_mode()
- elif self._dev_mode_needs_probe:
- logger.debug(
- "[%s] Deferring developer mode probe (%.1fs since connect, need 5s)",
- self.serial_number,
- time.monotonic() - self._connect_time,
- )
- elif self._dev_mode_probed and self._dev_mode_probe_seq is not None:
- # Probe was sent but no response yet — check for timeout.
- # A half-broken MQTT session (e.g. after keep-alive timeout reconnect)
- # may deliver status pushes but silently drop commands (#887).
- elapsed = time.monotonic() - self._dev_mode_probe_time
- if elapsed > 10.0:
- self._dev_mode_probe_failures += 1
- logger.warning(
- "[%s] Developer mode probe timed out after %.0fs (attempt %d)",
- self.serial_number,
- elapsed,
- self._dev_mode_probe_failures,
- )
- self._dev_mode_probe_seq = None
- if self._dev_mode_probe_failures >= 2:
- self.force_reconnect_stale_session("developer mode probe unanswered 2×")
- else:
- # Allow retry on next full status message
- self._dev_mode_probed = False
- # Zombie session detection: if an ams_filament_setting command has been
- # pending for >10s with no response, the publish path is likely dead (#887).
- if self._last_ams_cmd_time > 0:
- elapsed = time.monotonic() - self._last_ams_cmd_time
- if elapsed > 10.0:
- self._ams_cmd_unanswered += 1
- logger.warning(
- "[%s] ams_filament_setting unanswered for %.0fs (count=%d)",
- self.serial_number,
- elapsed,
- self._ams_cmd_unanswered,
- )
- self._last_ams_cmd_time = 0.0 # don't re-trigger on next push_status
- if self._ams_cmd_unanswered >= 2:
- self.force_reconnect_stale_session("ams_filament_setting unanswered 2\u00d7")
- self._ams_cmd_unanswered = 0
- # Log mapping data when received (for usage tracking debugging)
- if "mapping" in data:
- logger.debug("[%s] MQTT mapping field: %s", self.serial_number, data["mapping"])
- # Log state transitions for debugging
- if "gcode_state" in data:
- logger.debug(
- f"[{self.serial_number}] gcode_state: {self._previous_gcode_state} -> {self.state.state}, "
- f"file: {self.state.gcode_file}, subtask: {self.state.subtask_name}"
- )
- # Detect print start (state changes TO RUNNING with a file)
- current_file = self.state.gcode_file or self.state.current_print
- is_new_print = (
- self.state.state == "RUNNING"
- and self._previous_gcode_state is not None # #1304: skip on first push after Bambuddy startup
- and self._previous_gcode_state != "RUNNING"
- and current_file
- and not self._was_running # Prevent duplicates when resuming from PAUSE
- )
- # Also detect if file changed while running (new print started)
- is_file_change = (
- self.state.state == "RUNNING"
- and current_file
- and current_file != self._previous_gcode_file
- and self._previous_gcode_file is not None
- )
- # Track RUNNING state for more robust completion detection
- running_first_observed = False
- if self.state.state == "RUNNING" and current_file:
- if not self._was_running:
- logger.debug("[%s] Now tracking RUNNING state for %s", self.serial_number, current_file)
- # Check if timelapse was enabled in the same message (xcam parsed before this)
- if self.state.timelapse:
- self._timelapse_during_print = True
- logger.debug("[%s] Timelapse detected when entering RUNNING state", self.serial_number)
- # Mark this as the first RUNNING observation of the session.
- # If is_new_print also fires below, on_print_start handles
- # baseline capture and we suppress on_print_running_observed
- # to avoid double-capture. If is_new_print does NOT fire
- # (Bambuddy started mid-print — the #1304 guard suppressed
- # it), main.py needs this hook to catch the restart-recovery
- # case (#1485 follow-up).
- running_first_observed = True
- self._was_running = True
- self._completion_triggered = False
- if is_new_print or is_file_change:
- # Clear any old HMS errors when a new print starts
- self.state.hms_errors = []
- # Reset layer tracking for new print (needed for layer-based timelapse)
- self.state.layer_num = 0
- # Reset completion tracking for new print
- self._was_running = True
- self._completion_triggered = False
- # Reset last valid progress/layer for usage tracking
- self._last_valid_progress = 0.0
- self._last_valid_layer_num = 0
- # Clear and seed tray change log for mid-print usage splitting
- self.state.tray_change_log.clear()
- tn = self.state.tray_now
- if (0 <= tn <= 15) or (128 <= tn <= 135) or tn == 254:
- self.state.tray_change_log.append((tn, 0))
- # Initialize timelapse tracking based on current state
- # NOTE: xcam data is parsed BEFORE this code runs in _process_message,
- # so self.state.timelapse may already be set from this message.
- # We preserve that value instead of blindly resetting to False.
- if self.state.timelapse:
- self._timelapse_during_print = True
- logger.debug("[%s] Timelapse detected at print start", self.serial_number)
- else:
- self._timelapse_during_print = False
- if (is_new_print or is_file_change) and self.on_print_start:
- logger.info(
- f"[{self.serial_number}] PRINT START detected - file: {current_file}, "
- f"subtask: {self.state.subtask_name}, is_new: {is_new_print}, is_file_change: {is_file_change}"
- )
- self.on_print_start(
- {
- "filename": current_file,
- "subtask_name": self.state.subtask_name,
- "remaining_time": self.state.remaining_time * 60
- if self.state.remaining_time > 0
- else None, # Convert minutes to seconds
- "raw_data": data,
- "ams_mapping": self._captured_ams_mapping,
- }
- )
- elif running_first_observed and self.on_print_running_observed:
- # Restart-recovery hook (#1485 follow-up): Bambuddy started mid-
- # print, so the #1304 first-push guard suppressed on_print_start,
- # but we still need main.py to capture a fresh timelapse baseline
- # before the printer uploads the in-flight MP4. Same payload
- # shape as on_print_start so the consumer can reuse fields.
- logger.info(
- f"[{self.serial_number}] RUNNING observed without PRINT START "
- f"(restart-recovery) - file: {current_file}, subtask: {self.state.subtask_name}"
- )
- self.on_print_running_observed(
- {
- "filename": current_file,
- "subtask_name": self.state.subtask_name,
- "remaining_time": self.state.remaining_time * 60 if self.state.remaining_time > 0 else None,
- "raw_data": data,
- "ams_mapping": self._captured_ams_mapping,
- }
- )
- # Detect print completion (FINISH = success, FAILED = error, IDLE = aborted)
- # Use _was_running flag in addition to _previous_gcode_state for more robust detection
- # This handles cases where server restarts during a print
- should_trigger_completion = (
- self.state.state in ("FINISH", "FAILED")
- and not self._completion_triggered
- and self.on_print_complete
- and (
- self._previous_gcode_state == "RUNNING" # Normal transition
- or (self._was_running and self._previous_gcode_state != self.state.state) # After server restart
- # Pre-print failure (#1111): printer rejected the job during setup
- # — wrong nozzle size, AMS error, etc. The print never reaches
- # RUNNING, so without this branch neither the RUNNING check nor
- # _was_running match and the queue item stays stuck at "printing".
- # Restricted to FAILED from pre-print states so a stale FAILED on
- # first connection (prev=None) still can't accidentally fire.
- or (self.state.state == "FAILED" and self._previous_gcode_state in ("PREPARE", "SLICING"))
- )
- )
- # For IDLE, only trigger if we just came from RUNNING (explicit abort/cancel)
- if (
- self.state.state == "IDLE"
- and self._previous_gcode_state == "RUNNING"
- and not self._completion_triggered
- and self.on_print_complete
- ):
- should_trigger_completion = True
- # Log when we FIRST see a terminal state but DON'T trigger completion (diagnostics)
- # Only log on the transition (prev != current) to avoid flooding logs every MQTT update
- if (
- not should_trigger_completion
- and self.state.state in ("FINISH", "FAILED")
- and self._previous_gcode_state != self.state.state
- ):
- logger.info(
- f"[{self.serial_number}] State is {self.state.state} but completion NOT triggered: "
- f"prev={self._previous_gcode_state}, was_running={self._was_running}, "
- f"already_triggered={self._completion_triggered}, has_callback={bool(self.on_print_complete)}"
- )
- # Mark as triggered so state is clean for the next print cycle
- self._completion_triggered = True
- if should_trigger_completion:
- if self.state.state == "FINISH":
- status = "completed"
- elif self.state.state == "FAILED":
- status = "failed"
- else:
- status = "aborted"
- logger.info(
- f"[{self.serial_number}] PRINT COMPLETE detected - state: {self.state.state}, "
- f"status: {status}, file: {self._previous_gcode_file or current_file}, "
- f"subtask: {self.state.subtask_name}, was_running: {self._was_running}, "
- f"timelapse_during_print: {self._timelapse_during_print}"
- )
- timelapse_was_active = self._timelapse_during_print
- self._completion_triggered = True
- self._was_running = False
- self._timelapse_during_print = False # Reset for next print
- # Include HMS errors for failure reason detection
- hms_errors_data = (
- [
- {"code": e.code, "attr": e.attr, "module": e.module, "severity": e.severity}
- for e in self.state.hms_errors
- ]
- if self.state.hms_errors
- else []
- )
- self.on_print_complete(
- {
- "status": status,
- "filename": self._previous_gcode_file or current_file,
- "subtask_name": self.state.subtask_name,
- "raw_data": data,
- "timelapse_was_active": timelapse_was_active,
- "hms_errors": hms_errors_data,
- "ams_mapping": self._captured_ams_mapping,
- # Last valid progress/layer before firmware reset (for partial usage tracking)
- "last_progress": self._last_valid_progress,
- "last_layer_num": self._last_valid_layer_num,
- }
- )
- self._captured_ams_mapping = None
- self._previous_gcode_state = self.state.state
- if current_file:
- self._previous_gcode_file = current_file
- if self.on_state_change:
- self.on_state_change(self.state)
def _request_push_all(self):
    """Ask the printer to push its complete status report.

    Publishes a ``pushall`` command on ``self.topic_publish`` with QoS 1.
    Does nothing when no MQTT client has been created.
    """
    client = self._client
    if not client:
        return
    payload = json.dumps({"pushing": {"command": "pushall"}})
    client.publish(self.topic_publish, payload, qos=1)
def _probe_developer_mode(self):
    """Probe developer mode by sending an ams_filament_setting for the external slot.

    Some printers (A1/P1 series) never send the "fun" field in MQTT status.
    For these, developer mode is detected by sending a harmless command and
    checking whether the printer accepts or rejects it:

    - result="success" → developer mode ON (commands accepted)
    - result="failed", reason="mqtt message verify failed" → developer mode OFF

    The probe re-sends the current external slot configuration so it's a no-op
    when the command succeeds. If there's no usable external slot data yet, a
    reset (empty filament) is sent instead.

    Before publishing, the probe is recorded as outstanding via
    ``_dev_mode_probed``, ``_dev_mode_probe_time`` and ``_dev_mode_probe_seq``.
    Returns without touching any state when there is no client or the printer
    is not connected.
    """
    if not self._client or not self.state.connected:
        return

    self._dev_mode_probed = True
    self._dev_mode_probe_time = time.monotonic()
    self._sequence_id += 1
    seq = str(self._sequence_id)
    self._dev_mode_probe_seq = seq

    # Build probe command: re-send current external slot config (no-op on success).
    # raw_data["vt_tray"] is expected to be a list of tray dicts, but the printer
    # sends a bare dict on the wire. Accept either shape and fall back to an
    # empty config for anything else, rather than raising KeyError/AttributeError.
    vt_tray = self.state.raw_data.get("vt_tray") if self.state.raw_data else None
    if isinstance(vt_tray, dict):
        current = vt_tray
    elif isinstance(vt_tray, (list, tuple)) and vt_tray and isinstance(vt_tray[0], dict):
        current = vt_tray[0]
    else:
        current = {}

    command = {
        "print": {
            "command": "ams_filament_setting",
            "ams_id": 255,
            "tray_id": 0,
            "slot_id": 0,
            "tray_info_idx": current.get("tray_info_idx", ""),
            "tray_type": current.get("tray_type", ""),
            "tray_sub_brands": current.get("tray_sub_brands", ""),
            "tray_color": current.get("tray_color", "00000000"),
            "nozzle_temp_min": current.get("nozzle_temp_min", 0),
            "nozzle_temp_max": current.get("nozzle_temp_max", 0),
            "sequence_id": seq,
        }
    }
    # setting_id is only included when the current slot actually has one.
    setting_id = current.get("setting_id")
    if setting_id:
        command["print"]["setting_id"] = setting_id

    logger.info("[%s] Probing developer mode via ams_filament_setting (seq=%s)", self.serial_number, seq)
    self._client.publish(self.topic_publish, json.dumps(command), qos=1)
def _handle_dev_mode_probe_response(self, data: dict):
    """Handle the response to the developer mode probe command.

    Sets ``self.state.developer_mode`` based on whether the printer accepted
    or rejected the probe, then notifies ``on_state_change`` if one is set.

    Args:
        data: The response payload; its "result" and "reason" entries are read.
    """
    self._dev_mode_probe_seq = None  # One-shot: don't match future responses
    self._dev_mode_probe_failures = 0  # Reset on any response

    result = data.get("result", "")
    reason = data.get("reason", "")
    # dict.get's default only applies to a missing key: a JSON null or other
    # non-string "reason" would make the substring test raise TypeError.
    reason_text = reason if isinstance(reason, str) else ""

    if result == "failed" and "verify failed" in reason_text:
        self.state.developer_mode = False
        logger.info("[%s] Developer mode probe: DISABLED (reason=%r)", self.serial_number, reason)
    else:
        # Success or any other response — commands are accepted
        self.state.developer_mode = True
        logger.info("[%s] Developer mode probe: ENABLED (result=%r)", self.serial_number, result)

    if self.on_state_change:
        self.on_state_change(self.state)
def _request_version(self):
    """Ask the printer for its firmware version information.

    Increments the outgoing sequence counter and publishes a ``get_version``
    command under the ``info`` key with QoS 1. Does nothing when no MQTT
    client has been created.
    """
    if not self._client:
        return

    self._sequence_id += 1
    request = {
        "info": {
            "sequence_id": str(self._sequence_id),
            "command": "get_version",
        }
    }
    logger.debug("[%s] Requesting firmware version info", self.serial_number)
    self._client.publish(self.topic_publish, json.dumps(request), qos=1)
def request_status_update(self) -> bool:
    """Request a full status update from the printer (public API).

    Sends a pushall command only. get_accessories is deliberately not sent:
    it returns stale nozzle data on H2D, and the correct nozzle data comes
    from the push_status response.

    Returns:
        True if the request was sent, False if not connected.
    """
    ready = bool(self._client) and self.state.connected
    if ready:
        logger.debug("[%s] Requesting status update (pushall)", self.serial_number)
        self._request_push_all()
        return True

    logger.warning("[%s] request_status_update: not connected", self.serial_number)
    return False
def _request_accessories(self):
    """Ask the printer for its accessories info (nozzle type, etc.).

    Increments the outgoing sequence counter and publishes a
    ``get_accessories`` command under the ``system`` key with QoS 1.
    Does nothing when no MQTT client has been created.
    """
    if not self._client:
        return

    self._sequence_id += 1
    request = {
        "system": {
            "sequence_id": str(self._sequence_id),
            "command": "get_accessories",
            "accessory_type": "none",
        }
    }
    logger.debug("[%s] Requesting accessories info", self.serial_number)
    self._client.publish(self.topic_publish, json.dumps(request), qos=1)
def _prime_kprofile_request(self):
    """Send a throwaway K-profile request to prime the printer on connect.

    Bambu printers often ignore the first K-profile request after a
    connection is established, so a dummy ``extrusion_cali_get`` is sent
    up front. Does nothing when no MQTT client has been created.
    """
    if not self._client:
        return

    self._sequence_id += 1
    primer = {
        "print": {
            "command": "extrusion_cali_get",
            "filament_id": "",
            "nozzle_diameter": "0.4",
            "sequence_id": str(self._sequence_id),
        }
    }
    logger.debug("[%s] Sending K-profile priming request", self.serial_number)
    self._client.publish(self.topic_publish, json.dumps(primer), qos=1)
def connect(self, loop: asyncio.AbstractEventLoop | None = None):
    """Create the MQTT client and start connecting to the printer's broker.

    The connection is started asynchronously (``connect_async`` followed by
    ``loop_start``), so this method returns before the connection is
    established.

    Args:
        loop: The asyncio event loop to use for thread-safe callbacks. It is
            stored on ``self._loop`` unchanged.
            NOTE(review): the previous docstring said a running loop is looked
            up when this is omitted; that lookup does not happen in this
            method — confirm where ``self._loop`` is consumed.
    """
    self._loop = loop

    # Each connect() call gets a fresh client id: serial number, process id
    # and a class-wide counter of client instances.
    BambuMQTTClient._client_instance_counter += 1
    instance_no = BambuMQTTClient._client_instance_counter
    session_id = f"bambuddy_{self.serial_number}_{os.getpid()}_{instance_no}"

    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id=session_id,
        protocol=mqtt.MQTTv311,
    )
    self._client = client

    # Bambu's broker has racy PUBACK matching with paho's QoS=1 inflight
    # tracking (#1164). The default ceiling of 20 wedges sessions after
    # ~16-20 cumulative commands; raising it far beyond any realistic
    # session count keeps QoS=1 usable without changing wire-protocol
    # behaviour across printer models.
    client.max_inflight_messages_set(1000)

    client.username_pw_set("bblp", self.access_code)
    client.on_connect = self._on_connect
    client.on_disconnect = self._on_disconnect
    client.on_subscribe = self._on_subscribe
    client.on_message = self._on_message

    # TLS setup - Bambu uses self-signed certs, so hostname checking and
    # certificate verification are both switched off.
    tls_context = ssl.create_default_context()
    tls_context.check_hostname = False
    tls_context.verify_mode = ssl.CERT_NONE
    client.tls_set_context(tls_context)

    # Back off between reconnects to avoid tight reconnect loops on
    # unstable brokers.
    client.reconnect_delay_set(min_delay=1, max_delay=30)

    # Keepalive: paho sends PINGREQs at this interval, and the broker treats
    # the client as dead at 1.5x. 30s balances detecting real network loss
    # (45s) against false disconnects from transient hiccups. Stale
    # detection (60s without messages) covers the P1S/P1P firmware bug where
    # the broker stops publishing while the TCP connection stays alive.
    client.connect_async(self.ip_address, self.MQTT_PORT, keepalive=30)
    client.loop_start()
- def start_print(
- self,
- filename: str,
- plate_id: int = 1,
- ams_mapping: list[int] | None = None,
- bed_levelling: bool = True,
- flow_cali: bool = False,
- vibration_cali: bool = True,
- layer_inspect: bool = False,
- timelapse: bool = False,
- use_ams: bool = True,
- ):
- """Start a print job on the printer.
- The file should already be uploaded to the printer's root directory via FTP.
- Args:
- filename: Name of the uploaded file
- plate_id: Plate number to print (default 1)
- ams_mapping: List of tray IDs for each filament slot in the 3MF.
- Global tray ID = (ams_id * 4) + slot_id, external = 254
- timelapse: Record timelapse video
- bed_levelling: Auto bed levelling before print
- flow_cali: Flow/pressure advance calibration
- vibration_cali: Vibration compensation calibration
- layer_inspect: First layer AI inspection
- use_ams: Use AMS for automatic filament changes
- """
- if self._client and self.state.connected:
- # Bambu print command format — matches Bambu Studio's format.
- # The calibration/leveling fields (timelapse, bed_leveling,
- # flow_cali, vibration_cali, layer_inspect) are JSON booleans for
- # every model. An earlier revision integer-encoded them for the H2
- # family (H2D/H2S/H2C/X2D) on the belief that H2 firmware required
- # 0/1 — but a BambuStudio request-topic capture from a real H2D
- # sends plain booleans, and the integer encoding made the H2S
- # silently skip flow-dynamics calibration (#1478). use_ams is the
- # one field that genuinely must stay boolean: H2D Pro firmware
- # reads an integer use_ams as a nozzle index (1 = deputy), which is
- # what actually caused the wrong-extruder routing behind #1386.
- # Dual-nozzle routing for external spool (254 = deputy/left,
- # 255 = main/right) and the use_ams=False fallback. H2S is in the
- # H2 firmware family but is single-nozzle, despite sharing serial
- # prefix "094" with H2D. Prefer runtime detection from
- # device.extruder.info (set in _handle_push_status); fall back to
- # model name for the brief window after connect before push data
- # arrives. _is_dual_nozzle only ever flips False→True, so it's safe
- # as the primary signal.
- from backend.app.utils.printer_models import is_dual_nozzle_model
- is_dual_nozzle = self._is_dual_nozzle or is_dual_nozzle_model(self.model)
- # Build ams_mapping2 from ams_mapping (detailed format with ams_id/slot_id)
- ams_mapping2 = []
- # BambuStudio converts virtual tray IDs (254/255) to -1 in the flat
- # ams_mapping and relies on ams_mapping2 for external spool details.
- # Passing raw 254/255 in the flat array causes H2D firmware to fail
- # with 0700_8012 "Failed to get AMS mapping table".
- flat_ams_mapping = []
- if ams_mapping is not None:
- for tray_id in ams_mapping:
- # Ensure tray_id is an integer (may be string from JSON)
- tray_id = int(tray_id) if tray_id is not None else -1
- if tray_id == -1:
- # Unmapped filament slot
- flat_ams_mapping.append(-1)
- ams_mapping2.append({"ams_id": 255, "slot_id": 255})
- elif tray_id >= 254:
- # External/virtual spool. BambuStudio convention:
- # 255 = VIRTUAL_TRAY_MAIN_ID (main/right nozzle)
- # 254 = VIRTUAL_TRAY_DEPUTY_ID (deputy/left nozzle)
- # Flat mapping must use -1 (firmware doesn't accept raw 254/255).
- # Single-nozzle printers (X1C, P1S, A1, etc.) report tray_now=254
- # for external spool, but BambuStudio always sends ams_id=255
- # (VIRTUAL_TRAY_MAIN_ID) in ams_mapping2. Sending 254 causes the
- # firmware to target AMS tray 0 instead of external spool, leading
- # to 07FF_8012 "Failed to get AMS mapping table" or stuck prints.
- # Only H2D dual-nozzle printers use 254 (deputy/left nozzle).
- flat_ams_mapping.append(-1)
- ext_ams_id = tray_id if is_dual_nozzle else 255
- ams_mapping2.append({"ams_id": ext_ams_id, "slot_id": 0})
- elif tray_id >= 128:
- # AMS-HT: global tray ID IS the ams_id (single tray per unit)
- flat_ams_mapping.append(tray_id)
- ams_mapping2.append({"ams_id": tray_id, "slot_id": 0})
- else:
- # Regular AMS tray: Global tray ID = (ams_id * 4) + slot_id
- ams_id = tray_id // 4
- slot_id = tray_id % 4
- flat_ams_mapping.append(tray_id)
- ams_mapping2.append({"ams_id": ams_id, "slot_id": slot_id})
- # If all mapped slots are external spool (no real AMS trays), force use_ams=False.
- # P1S/P1P with no AMS rejects use_ams=True with "Failed to get AMS mapping table".
- # Skip for dual-nozzle printers — use_ams controls nozzle routing there.
- # H2S falls through this gate now (#1386): it is single-nozzle and was
- # hitting the dual-nozzle bypass, which caused 07FF_8012 when printing
- # without an AMS attached.
- if ams_mapping and use_ams and not is_dual_nozzle:
- if all(t is None or int(t) < 0 or int(t) >= 254 for t in ams_mapping):
- use_ams = False
- logger.info(
- "[%s] All filament slots use external spool — setting use_ams=False",
- self.serial_number,
- )
- # Unique per-submission identity fields. Hardcoded "0" values caused
- # third-party MQTT observers (OctoEverywhere, etc.) to see reprints as
- # continuations of the same job: the printer reuses gcode_start_time
- # from the prior print with task_id=0, so observers latch onto a stale
- # timestamp and report compounding durations on repeat replays (#1011).
- # BambuStudio mints fresh IDs per submission; matching that behavior
- # makes the printer emit a clean state-transition for each job.
- # md5 is left empty — firmware historically accepts "" as "skip
- # validation" (unlike Studio, we don't have the file's real md5 here
- # without re-reading the upload, and sending a synthetic wrong digest
- # risks activation of md5 verification on some firmwares).
- # Cap at signed int32 max: P1S firmware (01.10.00.00) clamps oversized
- # task identity fields to 2**31-1, so raw epoch-ms (13 digits, ~1.7e12)
- # overflows and every submission ends up with the same task_id from
- # the printer's perspective — the printer then treats a fresh dispatch
- # as a continuation of the last FAILED job and never leaves IDLE (#1042).
- # Modulo keeps uniqueness within a ~24-day wrap window; `or 1` guards
- # the (astronomically unlikely) zero case since task_id=0 is rejected.
- submission_id = str(int(time.time() * 1000) % 2_147_483_647 or 1)
- # Remember it so on_print_start can persist a restart-stable id on
- # the archive even before the printer echoes subtask_id back (#1485).
- self.last_dispatch_subtask_id = submission_id
- command = {
- "print": {
- "sequence_id": "20000",
- "command": "project_file",
- "param": f"Metadata/plate_{plate_id}.gcode",
- "url": f"ftp://{filename}",
- "file": filename,
- "md5": "",
- "bed_type": "auto",
- "timelapse": timelapse,
- "bed_leveling": bed_levelling,
- "auto_bed_leveling": 1 if bed_levelling else 0,
- "flow_cali": flow_cali,
- "vibration_cali": vibration_cali,
- "layer_inspect": layer_inspect,
- "use_ams": use_ams,
- "cfg": "0",
- # extrude_cali_flag gates flow-dynamics calibration:
- # 1 = run it, 2 = skip and reuse the stored PA value.
- # BambuStudio always pairs this with flow_cali and never
- # sends 0; a hardcoded 0 made the printer skip calibration
- # regardless of the flow_cali toggle (#1478).
- "extrude_cali_flag": 1 if flow_cali else 2,
- "extrude_cali_manual_mode": 0,
- "nozzle_offset_cali": 2,
- "subtask_name": filename.replace(".3mf", "").replace(".gcode", ""),
- "profile_id": "0",
- "project_id": submission_id,
- "subtask_id": submission_id,
- "task_id": submission_id,
- }
- }
- # P2S-specific parameter adjustments
- # P2S printer doesn't support vibration calibration like X1/P1 series
- if self.model and self.model.upper().strip() in ("P2S", "N7"):
- command["print"]["vibration_cali"] = False
- logger.debug("[%s] P2S detected: disabling vibration_cali", self.serial_number)
- # Add AMS mapping if provided
- if ams_mapping is not None:
- command["print"]["ams_mapping"] = flat_ams_mapping
- command["print"]["ams_mapping2"] = ams_mapping2
- logger.info("[%s] Sending print command: %s", self.serial_number, json.dumps(command))
- self._client.publish(self.topic_publish, json.dumps(command), qos=1)
- # Record what we dispatched so /cover can pick the right plate
- # thumbnail even when the printer's gcode_file echo is just the
- # 3MF filename without a plate path (#1166). Match the same
- # subtask_name shape we send so the comparison in the cover route
- # works against state.subtask_name reflected back via MQTT.
- self.state.dispatched_plate_id = plate_id
- self.state.dispatched_subtask = command["print"]["subtask_name"]
- return True
- else:
- # Log why we couldn't send the command
- if not self._client:
- logger.error("[%s] Cannot start print: MQTT client not initialized", self.serial_number)
- elif not self.state.connected:
- logger.error(
- f"[{self.serial_number}] Cannot start print: Printer not connected (client exists but disconnected). "
- f"Connection state: {self.state.connected}, Last message: {self._last_message_time}"
- )
- return False
def stop_print(self) -> bool:
    """Stop the current print job.

    Publishes the ``stop`` print command on the printer's request topic.

    Returns:
        True if the command was published, False if the MQTT client is
        missing or the printer is not connected.
    """
    if not self._client or not self.state.connected:
        # Log the refusal like pause_print/resume_print do, so a stop
        # request that never reached the printer is visible in the logs.
        logger.warning("[%s] Cannot stop print: not connected", self.serial_number)
        return False

    command = {"print": {"command": "stop", "sequence_id": "0"}}
    self._client.publish(self.topic_publish, json.dumps(command), qos=1)
    logger.info("[%s] Sent stop print command", self.serial_number)
    return True
def set_xcam_option(
    self, module_name: str, enabled: bool, print_halt: bool = True, sensitivity: str = "medium"
) -> bool:
    """Set an xcam (AI detection) option on the printer.

    ``auto_recovery_step_loss`` is not an xcam module; it is forwarded to
    ``_set_print_option("auto_recovery", ...)`` and that call's result is
    returned.

    Args:
        module_name: The xcam module to control (e.g., "spaghetti_detector",
            "first_layer_inspector", "printing_monitor", "buildplate_marker_detector")
        enabled: Whether to enable or disable the feature
        print_halt: Whether to halt print on detection. Only recorded in the
            local state for "spaghetti_detector"; the command sent to the
            printer always carries ``print_halt=True``.
        sensitivity: Sensitivity level ("low", "medium", "high", or "never_halt").
            An empty value or "never_halt" omits ``halt_print_sensitivity``
            from the command and leaves the stored sensitivities untouched.

    Returns:
        True if command was sent, False if not connected
    """
    if not self._client or not self.state.connected:
        return False

    # auto_recovery_step_loss uses a different command format (print.print_option)
    if module_name == "auto_recovery_step_loss":
        return self._set_print_option("auto_recovery", enabled)

    self._sequence_id += 1
    has_sensitivity = bool(sensitivity) and sensitivity != "never_halt"

    # Build the xcam control command (exact OrcaSlicer format)
    # Key findings from OrcaSlicer source:
    # - Uses "xcam" wrapper (not "print")
    # - print_halt is ALWAYS true (legacy protocol requirement)
    # - Both "control" and "enable" are set to the same value
    # - halt_print_sensitivity controls actual halt behavior
    command = {
        "xcam": {
            "command": "xcam_control_set",
            "sequence_id": str(self._sequence_id),
            "module_name": module_name,
            "control": enabled,
            "enable": enabled,  # old protocol compatibility
            "print_halt": True,  # ALWAYS true per OrcaSlicer
        }
    }
    # OrcaSlicer uses halt_print_sensitivity for ALL detectors; the
    # module_name field determines which detector's sensitivity is set.
    if has_sensitivity:
        command["xcam"]["halt_print_sensitivity"] = sensitivity

    command_json = json.dumps(command)
    self._client.publish(self.topic_publish, command_json, qos=1)
    logger.debug(
        "[%s] Set xcam option: %s=%s, sensitivity=%s", self.serial_number, module_name, enabled, sensitivity
    )
    logger.debug("[%s] MQTT command sent: %s", self.serial_number, command_json)

    # OrcaSlicer pattern: Set hold timer to ignore incoming data for 3 seconds
    # This prevents stale MQTT data from immediately overwriting our change
    self._xcam_hold_start[module_name] = time.time()

    # Update local state immediately for responsive UI.
    # module_name -> (enabled-flag attribute, sensitivity attributes).
    # NOTE: Spaghetti and Pileup sensitivities are linked in firmware, so
    # spaghetti_detector updates both; pileup_detector itself carries none.
    state_fields = {
        "spaghetti_detector": ("spaghetti_detector", ("halt_print_sensitivity", "pileup_sensitivity")),
        "first_layer_inspector": ("first_layer_inspector", ()),
        "printing_monitor": ("printing_monitor", ()),
        "buildplate_marker_detector": ("buildplate_marker_detector", ()),
        "allow_skip_parts": ("allow_skip_parts", ()),
        "pileup_detector": ("pileup_detector", ()),
        "clump_detector": ("nozzle_clumping_detector", ("nozzle_clumping_sensitivity",)),
        "airprint_detector": ("airprint_detector", ("airprint_sensitivity",)),
    }
    fields = state_fields.get(module_name)
    if fields is None:
        # Unknown module: the command was still sent, there is just no
        # local field to mirror it into.
        return True

    enabled_attr, sensitivity_attrs = fields
    options = self.state.print_options
    setattr(options, enabled_attr, enabled)
    if module_name == "spaghetti_detector":
        options.print_halt = print_halt
    if has_sensitivity:
        for attr in sensitivity_attrs:
            setattr(options, attr, sensitivity)
            # Each mirrored sensitivity gets its own hold timer.
            self._xcam_hold_start[attr] = time.time()
    return True
- def _set_print_option(self, option_name: str, enabled: bool) -> bool:
- """Set a print option using the print.print_option command.
- This is different from xcam_control_set and is used for options like:
- - auto_recovery
- - air_print_detect
- - filament_tangle_detect
- - nozzle_blob_detect
- - sound_enable
- Args:
- option_name: The option to control (e.g., "auto_recovery")
- enabled: Whether to enable or disable the option
- Returns:
- True if command was sent, False if not connected
- """
- if not self._client or not self.state.connected:
- return False
- self._sequence_id += 1
- command = {
- "print": {
- "command": "print_option",
- "sequence_id": str(self._sequence_id),
- option_name: enabled,
- }
- }
- command_json = json.dumps(command)
- self._client.publish(self.topic_publish, command_json, qos=1)
- logger.debug("[%s] Set print option: %s=%s", self.serial_number, option_name, enabled)
- # Set hold timer
- hold_key = f"print_option_{option_name}"
- self._xcam_hold_start[hold_key] = time.time()
- # Update local state immediately
- if option_name == "auto_recovery":
- self.state.print_options.auto_recovery_step_loss = enabled
- return True
def start_calibration(
    self,
    bed_leveling: bool = False,
    vibration: bool = False,
    motor_noise: bool = False,
    nozzle_offset: bool = False,
    high_temp_heatbed: bool = False,
) -> bool:
    """Start printer calibration with selected options.

    The selected steps are packed into the ``option`` bitmask of a
    ``calibration`` print command.

    Args:
        bed_leveling: Run bed leveling calibration
        vibration: Run vibration compensation calibration
        motor_noise: Run motor noise cancellation calibration
        nozzle_offset: Run nozzle offset calibration (dual nozzle printers)
        high_temp_heatbed: Run high-temperature heatbed calibration

    Returns:
        True if command was sent, False if not connected or if no
        calibration step was selected
    """
    if not (self._client and self.state.connected):
        return False

    # Bit layout follows OrcaSlicer DeviceManager.cpp:
    #   bit 0: xcam_cali (not exposed in UI)
    #   bit 1: bed_leveling
    #   bit 2: vibration
    #   bit 3: motor_noise
    #   bit 4: nozzle_cali
    #   bit 5: bed_cali (high-temp heatbed)
    #   bit 6: clumppos_cali (not exposed in UI)
    selections = (
        (bed_leveling, 1),
        (vibration, 2),
        (motor_noise, 3),
        (nozzle_offset, 4),
        (high_temp_heatbed, 5),
    )
    option = 0
    for selected, bit in selections:
        if selected:
            option |= 1 << bit

    if option == 0:
        logger.warning("[%s] No calibration options selected", self.serial_number)
        return False

    self._sequence_id += 1
    payload = {
        "command": "calibration",
        "sequence_id": str(self._sequence_id),
        "option": option,
    }
    self._client.publish(self.topic_publish, json.dumps({"print": payload}), qos=1)
    logger.info(
        f"[{self.serial_number}] Starting calibration: "
        f"bed_leveling={bed_leveling}, vibration={vibration}, "
        f"motor_noise={motor_noise}, nozzle_offset={nozzle_offset}, "
        f"high_temp_heatbed={high_temp_heatbed} (option={option})"
    )
    return True
def disconnect(self, timeout: float = 0):
    """Disconnect from the printer and drop the MQTT client.

    A fresh ``threading.Event`` is stored in ``_disconnection_event`` before
    the disconnect is requested, and this method waits on it for up to
    ``timeout`` seconds before stopping the client's network loop.

    Args:
        timeout: Seconds to wait on the disconnection event. The default of
            0 does not block.
    """
    if not self._client:
        return

    # The event must exist before disconnect() is issued so that whoever
    # signals it (presumably the on-disconnect callback; it is not visible
    # in this method) sees this instance.
    self._disconnection_event = threading.Event()
    self._client.disconnect()
    self._disconnection_event.wait(timeout=timeout)

    self._client.loop_stop()
    self._client = None
    self.state.connected = False
def send_command(self, command: dict):
    """Publish a command dict to the printer as JSON.

    Nothing is sent (and nothing is logged) when the MQTT client is missing
    or the printer is not connected. When message logging is enabled, the
    outgoing command is appended to the message log before it is published.

    Args:
        command: JSON-serializable command payload.
    """
    if not (self._client and self.state.connected):
        return

    if self._logging_enabled:
        # Record the outgoing message with a UTC ISO-8601 timestamp.
        entry = MQTTLogEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            topic=self.topic_publish,
            direction="out",
            payload=command,
        )
        self._message_log.append(entry)

    self._client.publish(self.topic_publish, json.dumps(command), qos=1)
def enable_logging(self, enabled: bool = True):
    """Turn MQTT message logging on or off.

    Args:
        enabled: True to start recording messages, False to stop.
    """
    # Stopping deliberately keeps what was already captured; the user
    # clears it explicitly with clear_logs().
    self._logging_enabled = enabled
def get_logs(self) -> list[MQTTLogEntry]:
    """Return the logged MQTT messages as a new list.

    The result is a shallow copy, so callers can modify the returned list
    without affecting the internal log.
    """
    return [*self._message_log]
def clear_logs(self):
    """Discard every entry in the MQTT message log.

    The log container is emptied in place; whether logging is enabled is
    not changed.
    """
    self._message_log.clear()
@property
def logging_enabled(self) -> bool:
    """Whether MQTT message logging is currently switched on."""
    return self._logging_enabled
def register_raw_message_handler(self, handler: Callable[[str, bytes], None]) -> None:
    """Register a handler invoked for every incoming MQTT message.

    Used by the VP MQTT bridge to republish the printer's report pushes to
    slicers connected to a virtual printer in non-proxy mode. Handlers run
    on paho's network thread and must not block; exceptions are caught.

    Registering the same handler twice has no effect.

    Args:
        handler: Callable taking two arguments, annotated ``str`` and
            ``bytes``.
    """
    handlers = self._raw_message_handlers
    if handler in handlers:
        return
    handlers.append(handler)
def unregister_raw_message_handler(self, handler: Callable[[str, bytes], None]) -> None:
    """Remove a raw-message handler if it is currently registered.

    Unregistering a handler that is not in the list is a silent no-op.

    Args:
        handler: The callable previously passed to
            ``register_raw_message_handler``.
    """
    handlers = self._raw_message_handlers
    try:
        handlers.remove(handler)
    except ValueError:
        # Not registered (or already removed): nothing to undo.
        return
def publish_raw(self, topic: str, payload: bytes | str, qos: int = 1) -> bool:
    """Publish a pre-formed payload directly to the printer's MQTT broker.

    Used by the VP MQTT bridge to forward slicer-originated commands without
    going through send_command's sequence-id mangling. Only the existence of
    the client is checked here; ``self.state.connected`` is not consulted.

    Args:
        topic: MQTT topic to publish on.
        payload: Message body, passed to the client unchanged.
        qos: MQTT quality-of-service level.

    Returns:
        True when the client reports ``MQTT_ERR_SUCCESS`` for the publish;
        False when there is no client, the result code differs, or the
        publish raised (the exception is logged, not propagated).
    """
    if self._client is None:
        return False

    try:
        result = self._client.publish(topic, payload, qos=qos)
        return result.rc == mqtt.MQTT_ERR_SUCCESS
    except Exception:
        logger.exception("[%s] publish_raw failed for topic=%s", self.serial_number, topic)
        return False
def send_drying_command(
    self, ams_id: int, temp: int, duration: int, mode: int = 1, filament: str = "", rotate_tray: bool = False
):
    """Send AMS drying start/stop command.

    Args:
        ams_id: AMS unit ID (0-3 for AMS 2 Pro, 128-135 for AMS-HT)
        temp: Target drying temperature (45-65 for AMS 2 Pro, 45-85 for AMS-HT)
        duration: Drying duration in hours
        mode: 1=start, 0=stop
        filament: Filament type string (e.g. "PLA", "PETG")
        rotate_tray: Whether to rotate the spool during drying for even heat

    Returns:
        True once the command has been published, False when there is no
        MQTT client.
    """
    # NOTE(review): unlike the sibling command methods, only the client's
    # existence is checked here, not self.state.connected - confirm intended.
    if not self._client:
        return False

    self._sequence_id += 1
    is_start = mode == 1
    drying = {
        "sequence_id": str(self._sequence_id),
        "command": "ams_filament_drying",
        "ams_id": ams_id,
        "temp": temp,
        # cooling_temp is 20 for a start request and 0 otherwise.
        "cooling_temp": 20 if is_start else 0,
        "duration": duration,
        "humidity": 0,
        "mode": mode,
        "rotate_tray": rotate_tray,
        "filament": filament,
        "close_power_conflict": False,
    }

    # Log the full wire JSON at INFO so support bundles capture exactly
    # what we sent - needed to diagnose silent rejections (#1447) where
    # the printer ACKs the command but never starts/stops drying.
    # Paired with the ams_filament_drying response-payload INFO log so
    # both halves of the conversation land in the bundle by default.
    wire_json = json.dumps({"print": drying})
    self._client.publish(self.topic_publish, wire_json, qos=1)
    logger.info("[%s] Sent ams_filament_drying: %s", self.serial_number, wire_json)
    return True
- def _handle_kprofile_response(self, data: dict):
- """Handle K-profile response from printer."""
- response_nozzle = data.get("nozzle_diameter")
- response_seq_id = data.get("sequence_id", "?")
- filaments = data.get("filaments", [])
- expected_nozzle = getattr(self, "_expected_kprofile_nozzle", None)
- has_pending_request = self._pending_kprofile_response is not None
- # Log all incoming responses when we have a pending request (for debugging)
- if has_pending_request:
- logger.info(
- f"[{self.serial_number}] K-profile response: nozzle={response_nozzle}, "
- f"seq_id={response_seq_id}, {len(filaments)} profiles, expected={expected_nozzle}"
- )
- # If we have a pending request, only accept responses with matching nozzle_diameter
- # The printer broadcasts 0.4mm profiles constantly - we need to wait for the actual response
- if has_pending_request and expected_nozzle and response_nozzle != expected_nozzle:
- # Ignore this broadcast, keep waiting for matching response
- logger.debug(
- f"[{self.serial_number}] Ignoring broadcast: got nozzle={response_nozzle}, waiting for {expected_nozzle}"
- )
- return
- # If no pending request, this is just a broadcast - update state silently and return early
- if not has_pending_request:
- # Still parse profiles to keep state updated, but don't log
- profiles = []
- for f in filaments:
- if isinstance(f, dict):
- try:
- cali_idx = f.get("cali_idx", 0)
- profiles.append(
- KProfile(
- slot_id=cali_idx,
- extruder_id=int(f.get("extruder_id", 0)),
- nozzle_id=str(f.get("nozzle_id", "")),
- nozzle_diameter=str(f.get("nozzle_diameter", "0.4")),
- filament_id=str(f.get("filament_id", "")),
- name=str(f.get("name", "")),
- k_value=str(f.get("k_value", "0.000000")),
- n_coef=str(f.get("n_coef", "0.000000")),
- ams_id=int(f.get("ams_id", 0)),
- tray_id=int(f.get("tray_id", -1)),
- setting_id=f.get("setting_id"),
- )
- )
- except (ValueError, TypeError):
- pass # Skip malformed K-profile entries; remaining profiles still usable
- self.state.kprofiles = profiles
- return
- profiles = []
- for i, f in enumerate(filaments):
- if isinstance(f, dict):
- try:
- # cali_idx is the actual slot/calibration index from the printer
- cali_idx = f.get("cali_idx", i)
- profiles.append(
- KProfile(
- slot_id=cali_idx,
- extruder_id=int(f.get("extruder_id", 0)),
- nozzle_id=str(f.get("nozzle_id", "")),
- nozzle_diameter=str(f.get("nozzle_diameter", "0.4")),
- filament_id=str(f.get("filament_id", "")),
- name=str(f.get("name", "")),
- k_value=str(f.get("k_value", "0.000000")),
- n_coef=str(f.get("n_coef", "0.000000")),
- ams_id=int(f.get("ams_id", 0)),
- tray_id=int(f.get("tray_id", -1)),
- setting_id=f.get("setting_id"),
- )
- )
- except (ValueError, TypeError) as e:
- logger.warning("Failed to parse K-profile: %s", e)
- self.state.kprofiles = profiles
- self._kprofile_response_data = profiles
- # Signal that we received the response (only if we were waiting for one)
- # Use thread-safe method since MQTT callbacks run in a different thread
- # Capture in local var to avoid TOCTOU race: asyncio thread can clear
- # self._pending_kprofile_response between the check and the .set() call
- event = self._pending_kprofile_response
- if event:
- logger.info("[%s] Got %s K-profiles for nozzle=%s", self.serial_number, len(profiles), response_nozzle)
- if self._loop and self._loop.is_running():
- self._loop.call_soon_threadsafe(event.set)
- else:
- # Fallback for when loop is not available
- event.set()
async def get_kprofiles(
    self, nozzle_diameter: str = "0.4", timeout: float = 5.0, max_retries: int = 3
) -> list[KProfile]:
    """Request K-profiles from the printer with retry logic.

    Bambu printers sometimes ignore the first K-profile request, so we
    implement retry logic to ensure reliable retrieval. Each attempt sends
    a fresh ``extrusion_cali_get`` command and waits for
    ``_handle_kprofile_response`` to set the pending event.

    Args:
        nozzle_diameter: Filter by nozzle diameter (e.g., "0.4")
        timeout: Timeout in seconds to wait for each response attempt
        max_retries: Maximum number of retry attempts

    Returns:
        List of KProfile objects; an empty list when not connected, when no
        event loop is running, or when every attempt timed out.
    """
    if not self._client or not self.state.connected:
        logger.warning("[%s] Cannot get K-profiles: not connected", self.serial_number)
        return []

    # Capture current event loop for thread-safe callback
    try:
        self._loop = asyncio.get_running_loop()
    except RuntimeError:
        logger.warning("[%s] No running event loop", self.serial_number)
        return []

    for attempt in range(max_retries):
        # Set up response event for this attempt
        self._sequence_id += 1
        self._pending_kprofile_response = asyncio.Event()
        self._kprofile_response_data = None
        self._expected_kprofile_nozzle = nozzle_diameter  # Track which nozzle response we expect

        # Send the command with nozzle_diameter filter
        command = {
            "print": {
                "command": "extrusion_cali_get",
                "filament_id": "",
                "nozzle_diameter": nozzle_diameter,
                "sequence_id": str(self._sequence_id),
            }
        }
        command_json = json.dumps(command)
        logger.info(
            f"[{self.serial_number}] Requesting K-profiles for nozzle_diameter={nozzle_diameter} (attempt {attempt + 1}/{max_retries})"
        )
        logger.debug("[%s] K-profile request JSON: %s", self.serial_number, command_json)
        self._client.publish(self.topic_publish, command_json, qos=1)

        # Wait for response (response handler already filters by nozzle_diameter)
        try:
            await asyncio.wait_for(self._pending_kprofile_response.wait(), timeout=timeout)
            profiles = self._kprofile_response_data or []
            logger.info(
                f"[{self.serial_number}] Got {len(profiles)} K-profiles for nozzle={nozzle_diameter} on attempt {attempt + 1}"
            )
            return profiles
        except asyncio.TimeoutError:
            # asyncio.TimeoutError is the builtin TimeoutError on Python
            # 3.11+, but a separate class on 3.10. Naming the asyncio one
            # keeps the retry path working on both; a bare TimeoutError
            # would let the 3.10 timeout escape the loop.
            logger.warning(
                f"[{self.serial_number}] Timeout on K-profiles request attempt {attempt + 1}/{max_retries}"
            )
            if attempt < max_retries - 1:
                # Brief delay before retry
                await asyncio.sleep(0.5)
        finally:
            # Always clear the pending markers so later broadcasts are
            # handled as plain broadcasts again.
            self._pending_kprofile_response = None
            self._expected_kprofile_nozzle = None

    logger.error("[%s] Failed to get K-profiles after %s attempts", self.serial_number, max_retries)
    return []
def set_kprofile(
    self,
    filament_id: str,
    name: str,
    k_value: str,
    nozzle_diameter: str = "0.4",
    nozzle_id: str = "HS00-0.4",
    extruder_id: int = 0,
    setting_id: str | None = None,
    slot_id: int = 0,
    cali_idx: int | None = None,
) -> bool:
    """Set/update a K-profile on the printer.

    Sends an ``extrusion_cali_set`` command with a single filament entry.

    Args:
        filament_id: Bambu filament identifier
        name: Profile name
        k_value: Pressure advance value (e.g., "0.020000")
        nozzle_diameter: Nozzle diameter (e.g., "0.4")
        nozzle_id: Nozzle identifier (e.g., "HS00-0.4")
        extruder_id: Extruder ID (0 or 1 for dual nozzle)
        setting_id: Existing setting ID for updates, None for new
        slot_id: Calibration index (cali_idx) for the profile; 0 means
            "new profile"
        cali_idx: For edits, the existing slot being edited (enables in-place edit).
            When given, it takes precedence over slot_id.

    Returns:
        True if command was sent, False otherwise
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot set K-profile: not connected", self.serial_number)
        return False

    self._sequence_id += 1
    is_new = slot_id == 0

    # The printer identifies a profile by cali_idx. An explicit cali_idx
    # wins; otherwise a new profile (slot_id=0) is sent as -1 so the
    # printer creates a slot, and an edit reuses slot_id.
    if cali_idx is None:
        effective_cali_idx = -1 if is_new else slot_id
    else:
        effective_cali_idx = cali_idx

    # New profiles need a setting_id (required by printer).
    # Format: "PF" + 17 random digits
    import random

    if is_new and not setting_id:
        setting_id = f"PF{random.randint(10**16, 10**17 - 1)}"

    filament_entry = {
        "ams_id": 0,
        "cali_idx": effective_cali_idx,
        "extruder_id": extruder_id,
        "filament_id": filament_id,
        "k_value": k_value,
        "n_coef": "0.000000",
        "name": name,
        "nozzle_diameter": nozzle_diameter,
        "nozzle_id": nozzle_id,
        "setting_id": setting_id or "",
        "tray_id": -1,
    }
    payload = {
        "command": "extrusion_cali_set",
        "filaments": [filament_entry],
        "nozzle_diameter": nozzle_diameter,
        "sequence_id": str(self._sequence_id),
    }
    command_json = json.dumps({"print": payload})

    logger.info(
        f"[{self.serial_number}] Setting K-profile: {name} = {k_value} (cali_idx={effective_cali_idx}, new={is_new})"
    )
    logger.debug("[%s] K-profile SET command: %s", self.serial_number, command_json)
    self._client.publish(self.topic_publish, command_json, qos=1)
    return True
def set_kprofiles_batch(
    self,
    profiles: list[dict],
    nozzle_diameter: str = "0.4",
) -> bool:
    """Set multiple K-profiles in a single command (for dual-nozzle).

    All profiles are sent as the ``filaments`` list of one
    ``extrusion_cali_set`` command.

    Args:
        profiles: List of profile dicts, each with:
            - filament_id, name, k_value, nozzle_id, extruder_id, setting_id (optional), slot_id
            - cali_idx (optional): takes precedence over slot_id when present
        nozzle_diameter: Common nozzle diameter for all profiles

    Returns:
        True if command was sent, False otherwise
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot set K-profiles batch: not connected", self.serial_number)
        return False

    import random

    self._sequence_id += 1

    def build_entry(profile: dict) -> dict:
        """Translate one caller-supplied profile dict into a wire entry."""
        slot_id = profile.get("slot_id", 0)
        is_new = slot_id == 0

        cali_idx = profile.get("cali_idx")
        if cali_idx is None:
            # New profiles (slot_id=0) are sent as -1; edits reuse slot_id.
            cali_idx = -1 if is_new else slot_id

        setting_id = profile.get("setting_id")
        if is_new and not setting_id:
            # "PF" + 17 random digits, generated for new profiles only.
            setting_id = f"PF{random.randint(10**16, 10**17 - 1)}"

        return {
            "ams_id": 0,
            "cali_idx": cali_idx,
            "extruder_id": profile.get("extruder_id", 0),
            "filament_id": profile.get("filament_id", ""),
            "k_value": profile.get("k_value", "0.020000"),
            "n_coef": "0.000000",
            "name": profile.get("name", ""),
            "nozzle_diameter": nozzle_diameter,
            "nozzle_id": profile.get("nozzle_id", f"HS00-{nozzle_diameter}"),
            "setting_id": setting_id or "",
            "tray_id": -1,
        }

    filament_entries = [build_entry(profile) for profile in profiles]
    payload = {
        "command": "extrusion_cali_set",
        "filaments": filament_entries,
        "nozzle_diameter": nozzle_diameter,
        "sequence_id": str(self._sequence_id),
    }
    command_json = json.dumps({"print": payload})

    logger.info("[%s] Setting %s K-profiles in batch", self.serial_number, len(filament_entries))
    logger.debug("[%s] K-profile SET batch command: %s", self.serial_number, command_json)
    self._client.publish(self.topic_publish, command_json, qos=1)
    return True
def delete_kprofile(
    self,
    cali_idx: int,
    filament_id: str,
    nozzle_id: str,
    nozzle_diameter: str = "0.4",
    extruder_id: int = 0,
    setting_id: str | None = None,
) -> bool:
    """Delete a K-profile from the printer.

    Sends an ``extrusion_cali_del`` command. The payload layout depends on
    whether the printer is treated as dual-nozzle.

    Args:
        cali_idx: The calibration index (slot_id) of the profile to delete
        filament_id: Bambu filament identifier
        nozzle_id: Nozzle identifier (e.g., "HH00-0.4")
        nozzle_diameter: Nozzle diameter (e.g., "0.4")
        extruder_id: Extruder ID (0 or 1 for dual nozzle)
        setting_id: Unique setting identifier (for X1C series); only sent
            in the single-nozzle payload

    Returns:
        True if command was sent, False otherwise
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot delete K-profile: not connected", self.serial_number)
        return False

    self._sequence_id += 1

    # Dual-nozzle K-profile delete uses the extruder_id/nozzle_id format;
    # single-nozzle printers (X1C/P1/A1/P2S/H2S) need the setting_id form.
    # Prefer runtime detection from device.extruder.info; fall back to
    # model name. H2S is single-nozzle but shares serial prefix "094" with
    # H2D, so a prefix-only check misclassified it (#1386).
    from backend.app.utils.printer_models import is_dual_nozzle_model

    is_dual_nozzle = self._is_dual_nozzle or is_dual_nozzle_model(self.model)
    sequence_id = str(self._sequence_id)

    if is_dual_nozzle:
        # H2D format: uses extruder_id, nozzle_id, nozzle_diameter
        payload = {
            "command": "extrusion_cali_del",
            "sequence_id": sequence_id,
            "extruder_id": extruder_id,
            "nozzle_id": nozzle_id,
            "filament_id": filament_id,
            "cali_idx": cali_idx,
            "nozzle_diameter": nozzle_diameter,
        }
    else:
        # X1C/P1/A1 format: include all fields like the set command
        # The delete command structure should match what set uses
        payload = {
            "command": "extrusion_cali_del",
            "sequence_id": sequence_id,
            "filament_id": filament_id,
            "cali_idx": cali_idx,
            "setting_id": setting_id or "",
            "nozzle_diameter": nozzle_diameter,
            "nozzle_id": nozzle_id,
            "extruder_id": extruder_id,
        }

    command_json = json.dumps({"print": payload})
    logger.info(
        f"[{self.serial_number}] Deleting K-profile: cali_idx={cali_idx}, filament={filament_id}, setting_id={setting_id}, dual={is_dual_nozzle}"
    )
    logger.debug("[%s] K-profile DELETE command: %s", self.serial_number, command_json)
    # Use QoS 1 for reliable delivery (at least once)
    self._client.publish(self.topic_publish, command_json, qos=1)
    return True
- # =========================================================================
- # Printer Control Commands
- # =========================================================================
def pause_print(self) -> bool:
    """Pause the current print job.

    Returns:
        True if the ``pause`` command was published, False (with a logged
        warning) when the client is missing or the printer is not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot pause print: not connected", self.serial_number)
        return False

    payload = json.dumps({"print": {"command": "pause", "sequence_id": "0"}})
    self._client.publish(self.topic_publish, payload, qos=1)
    logger.info("[%s] Sent pause print command", self.serial_number)
    return True
def resume_print(self) -> bool:
    """Resume a paused print job.

    Returns:
        True if the ``resume`` command was published, False (with a logged
        warning) when the client is missing or the printer is not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot resume print: not connected", self.serial_number)
        return False

    payload = json.dumps({"print": {"command": "resume", "sequence_id": "0"}})
    self._client.publish(self.topic_publish, payload, qos=1)
    logger.info("[%s] Sent resume print command", self.serial_number)
    return True
def clear_hms_errors(self) -> bool:
    """Clear HMS/print errors on the printer and locally.

    Publishes ``clean_print_error`` and then resets ``self.state.hms_errors``
    to an empty list without waiting for the printer to confirm.

    Returns:
        True if the command was published, False (with a logged warning)
        when the client is missing or the printer is not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot clear HMS errors: not connected", self.serial_number)
        return False

    payload = json.dumps({"print": {"command": "clean_print_error", "sequence_id": "0"}})
    self._client.publish(self.topic_publish, payload, qos=1)
    # Drop the locally cached errors right away.
    self.state.hms_errors = []
    logger.info("[%s] Sent clear HMS errors command", self.serial_number)
    return True
def skip_objects(self, object_ids: list[int]) -> bool:
    """Skip specific objects during a print.

    This command tells the printer to skip printing the specified objects.
    The object IDs come from the slice_info.config file in the 3MF.

    Args:
        object_ids: List of identify_id values from slice_info.config

    Returns:
        True if command was sent, False otherwise (not connected, printer
        state is neither "RUNNING" nor "PAUSE", no IDs given, or an ID
        cannot be converted to an integer)
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot skip objects: not connected", self.serial_number)
        return False

    if self.state.state not in ("RUNNING", "PAUSE"):
        logger.warning(
            f"[{self.serial_number}] Cannot skip objects: printer not printing (state={self.state.state})"
        )
        return False

    if not object_ids:
        logger.warning("[%s] Cannot skip objects: no object IDs provided", self.serial_number)
        return False

    # Coerce every ID to int up front; one bad value rejects the whole request.
    try:
        obj_list = list(map(int, object_ids))
    except (ValueError, TypeError) as e:
        logger.warning("[%s] Invalid object IDs: %s", self.serial_number, e)
        return False

    self._sequence_id += 1
    payload = {
        "sequence_id": str(self._sequence_id),
        "command": "skip_objects",
        "obj_list": obj_list,
    }
    self._client.publish(self.topic_publish, json.dumps({"print": payload}), qos=1)
    logger.info("[%s] Sent skip_objects command: %s", self.serial_number, obj_list)

    # Remember what was skipped, keeping each ID only once.
    skipped = self.state.skipped_objects
    for object_id in obj_list:
        if object_id not in skipped:
            skipped.append(object_id)
    return True
def send_gcode(self, gcode: str) -> bool:
    """Publish one or more G-code lines to the printer.

    Several commands may be passed at once by separating them with newlines.

    Args:
        gcode: The G-code text to send.

    Returns:
        True if the command was published, False when not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot send G-code: not connected", self.serial_number)
        return False

    self._sequence_id += 1
    envelope = {
        "print": {
            "command": "gcode_line",
            "param": gcode,
            "sequence_id": str(self._sequence_id),
        }
    }
    # QoS 1 gives at-least-once delivery.
    self._client.publish(self.topic_publish, json.dumps(envelope), qos=1)

    # Only the first 50 characters are logged to keep the debug line short.
    preview = gcode[:50]
    logger.debug("[%s] Sent G-code: %s...", self.serial_number, preview)
    return True
def set_bed_temperature(self, target: int) -> bool:
    """Set the heated-bed target by sending ``M140 S<target>``.

    Args:
        target: Target temperature in Celsius (0 turns the bed heater off).

    Returns:
        Whatever ``send_gcode`` returns: True if the command was sent,
        False otherwise.
    """
    bed_command = f"M140 S{target}"
    return self.send_gcode(bed_command)
def set_nozzle_temperature(self, target: int, nozzle: int = 0) -> bool:
    """Set a nozzle's target temperature with the non-blocking ``M104``.

    The ``T`` parameter is always included so the same command works on the
    dual-nozzle H2D.

    Args:
        target: Target temperature in Celsius (0 turns the heater off).
        nozzle: Nozzle index (0 for right/default, 1 for left on H2D).

    Returns:
        True if the command was sent, False otherwise.
    """
    sent = self.send_gcode(f"M104 T{nozzle} S{target}")
    if not (sent and nozzle == 1):
        return sent

    # H2D quirk: the left nozzle's target is not reported over MQTT, so the
    # requested value (and when it was requested) is recorded locally.
    temps = self.state.temperatures
    temps["nozzle_target"] = float(target)
    temps["_nozzle_target_set_time"] = time.time()
    logger.info("[%s] Tracking LEFT nozzle target locally: %s°C", self.serial_number, target)
    return sent
def set_chamber_temperature(self, target: int) -> bool:
    """Set the chamber target temperature via ``M141`` and mirror it locally.

    Args:
        target: Target temperature in Celsius (0 turns chamber heating off).

    Returns:
        True if the command was sent, False otherwise.
    """
    sent = self.send_gcode(f"M141 S{target}")
    if sent:
        # The target is tracked locally (MQTT reports encoded values that need
        # filtering), together with the time it was set.
        temps = self.state.temperatures
        temps["chamber_target"] = float(target)
        temps["_chamber_target_set_time"] = time.time()

        # Refresh the heating flag right away from the new target and the
        # last known chamber reading.
        chamber_now = temps.get("chamber", 0)
        temps["chamber_heating"] = target > 0 and chamber_now < target
        logger.info(
            f"[{self.serial_number}] Tracking chamber target locally: {target}°C (heating={self.state.temperatures['chamber_heating']})"
        )
    return sent
def set_print_speed(self, mode: int) -> bool:
    """Switch the printer to one of its speed presets.

    Args:
        mode: Speed mode (1=silent, 2=standard, 3=sport, 4=ludicrous).

    Returns:
        True if the command was published; False when not connected or when
        ``mode`` is not one of the four presets.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot set print speed: not connected", self.serial_number)
        return False

    allowed_modes = (1, 2, 3, 4)
    if mode not in allowed_modes:
        logger.warning("[%s] Invalid speed mode: %s", self.serial_number, mode)
        return False

    # The preset number travels as a string in the "param" field.
    request = {"print": {"command": "print_speed", "param": str(mode), "sequence_id": "0"}}
    self._client.publish(self.topic_publish, json.dumps(request), qos=1)
    logger.info("[%s] Set print speed mode to %s", self.serial_number, mode)
    return True
def set_fan_speed(self, fan: int, speed: int) -> bool:
    """Set one fan's speed by sending ``M106 P<fan> S<speed>``.

    Args:
        fan: Fan index (1=part cooling, 2=auxiliary, 3=chamber).
        speed: Speed 0-255 (0=off, 255=full); values outside that range are
            clamped to it.

    Returns:
        True if the command was sent; False for an unknown fan index or when
        ``send_gcode`` reports failure.
    """
    known_fans = (1, 2, 3)
    if fan not in known_fans:
        logger.warning("[%s] Invalid fan index: %s", self.serial_number, fan)
        return False

    # Clamp into 0-255: cap the top first, then lift anything below zero.
    capped = min(255, speed)
    clamped = max(0, capped)
    return self.send_gcode(f"M106 P{fan} S{clamped}")
def set_part_fan(self, speed: int) -> bool:
    """Set the part cooling fan (fan index 1) to ``speed`` (0-255).

    Returns:
        The result of ``set_fan_speed`` for fan 1.
    """
    part_cooling_fan = 1
    return self.set_fan_speed(part_cooling_fan, speed)
def set_aux_fan(self, speed: int) -> bool:
    """Set the auxiliary fan (fan index 2) to ``speed`` (0-255).

    Returns:
        The result of ``set_fan_speed`` for fan 2.
    """
    auxiliary_fan = 2
    return self.set_fan_speed(auxiliary_fan, speed)
def set_chamber_fan(self, speed: int) -> bool:
    """Set the chamber fan (fan index 3) to ``speed`` (0-255).

    Returns:
        The result of ``set_fan_speed`` for fan 3.
    """
    chamber_fan = 3
    return self.set_fan_speed(chamber_fan, speed)
def set_airduct_mode(self, mode: str) -> bool:
    """Set air conditioning mode (cooling or heating).

    Args:
        mode: "cooling" (modeId=0) or "heating" (modeId=1)
            - Cooling: Suitable for PLA/PETG/TPU, filters and cools chamber air
            - Heating: Suitable for ABS/ASA/PC/PA, circulates and heats chamber air,
              closes top exhaust flap

    Returns:
        True if command was sent; False when not connected or when ``mode``
        is not exactly "cooling" or "heating".
    """
    if not self._client or not self.state.connected:
        logger.warning("[%s] Cannot set airduct mode: not connected", self.serial_number)
        return False

    # Reject anything but the two documented modes. Previously every value
    # other than "cooling" (a typo, different casing, None) was silently sent
    # as heating, which closes the exhaust flap and heats the chamber.
    mode_ids = {"cooling": 0, "heating": 1}
    mode_id = mode_ids.get(mode) if isinstance(mode, str) else None
    if mode_id is None:
        logger.warning("[%s] Invalid airduct mode: %s", self.serial_number, mode)
        return False

    # The sequence counter is only advanced once the request is known to be valid.
    self._sequence_id += 1
    command = {
        "print": {"command": "set_airduct", "modeId": mode_id, "sequence_id": str(self._sequence_id), "submode": -1}
    }
    # Use QoS 1 for reliable delivery
    self._client.publish(self.topic_publish, json.dumps(command), qos=1)
    logger.info(
        "[%s] Set airduct mode to %s (modeId=%s, seq=%s)", self.serial_number, mode, mode_id, self._sequence_id
    )
    return True
def set_chamber_light(self, on: bool) -> bool:
    """Switch the chamber lighting on or off.

    Args:
        on: True to turn the lights on, False to turn them off.

    Returns:
        True if the commands were published, False when not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot set chamber light: not connected", self.serial_number)
        return False

    led_mode = "on" if on else "off"

    # Some printers (e.g. H2D) have two chamber lights, so both LED nodes are
    # addressed, each with its own sequence id.
    for node in ("chamber_light", "chamber_light2"):
        self._sequence_id += 1
        ledctrl = {
            "command": "ledctrl",
            "led_node": node,
            "led_mode": led_mode,
            "led_on_time": 500,
            "led_off_time": 500,
            "loop_times": 0,
            "interval_time": 0,
            "sequence_id": str(self._sequence_id),
        }
        self._client.publish(self.topic_publish, json.dumps({"system": ledctrl}), qos=1)

    # Logged once after both commands; seq is the id of the last one sent.
    logger.info("[%s] Set chamber lights %s (seq=%s)", self.serial_number, led_mode, self._sequence_id)
    return True
def select_extruder(self, extruder: int) -> bool:
    """Make one extruder the active one on a dual-nozzle printer (H2D).

    Args:
        extruder: Extruder index (0=right, 1=left for H2D).

    Returns:
        True if the command was published; False for an index other than
        0 or 1, or when not connected.
    """
    # The index is validated before the connection is looked at.
    if extruder not in (0, 1):
        logger.warning("[%s] Invalid extruder: %s", self.serial_number, extruder)
        return False

    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot switch extruder: not connected", self.serial_number)
        return False

    # Message shape as captured from OrcaSlicer:
    #   {"print": {"command": "select_extruder", "extruder_index": 0, "sequence_id": "..."}}
    # where extruder_index 0 = RIGHT and 1 = LEFT.
    self._sequence_id += 1
    request = {
        "print": {
            "command": "select_extruder",
            "extruder_index": extruder,
            "sequence_id": str(self._sequence_id),
        }
    }
    self._client.publish(self.topic_publish, json.dumps(request), qos=1)
    logger.info(
        "[%s] Sent select_extruder command: extruder_index=%s (0=right, 1=left)", self.serial_number, extruder
    )
    return True
def home_axes(self, axes: str = "XYZ") -> bool:
    """Trigger the printer's complete auto-home routine.

    ``axes`` is accepted but not used: a bare ``G28`` is sent every time so
    that Bambu firmware performs its safe multi-step sequence (park toolhead →
    home XY → home Z). Partial variants such as ``G28 Z`` skip the
    toolhead-park step and can drive the bed into the toolhead on H2C / H2D /
    H2S / X1, where homing Z moves the bed UP — see #1052.

    Returns:
        The result of ``send_gcode("G28")``.
    """
    full_home = "G28"
    return self.send_gcode(full_home)
def move_axis(self, axis: str, distance: float, speed: int = 3000) -> bool:
    """Jog one axis by a relative distance.

    Args:
        axis: Axis to move ("X", "Y", or "Z"; case-insensitive).
        distance: Distance in mm, positive or negative; sent with two decimals.
        speed: Movement speed in mm/min.

    Returns:
        True if the command was sent; False for an unknown axis or when
        ``send_gcode`` reports failure.
    """
    axis = axis.upper()
    if axis not in ("X", "Y", "Z"):
        logger.warning("[%s] Invalid axis: %s", self.serial_number, axis)
        return False

    script = "\n".join(
        (
            "G91",  # switch to relative positioning
            f"G0 {axis}{distance:.2f} F{speed}",  # rapid move
            "G90",  # back to absolute positioning
        )
    )
    return self.send_gcode(script)
def disable_motors(self) -> bool:
    """Switch off all stepper motors by sending ``M18``.

    Warning: the printer loses its position once the motors are disabled,
    so it has to be homed again before printing.

    Returns:
        True if the command was sent, False otherwise.
    """
    motors_off = "M18"
    return self.send_gcode(motors_off)
def enable_motors(self) -> bool:
    """Switch on all stepper motors by sending ``M17``.

    Returns:
        True if the command was sent, False otherwise.
    """
    motors_on = "M17"
    return self.send_gcode(motors_on)
- def ams_load_filament(self, tray_id: int, extruder_id: int | None = None) -> bool:
- """Load filament from a specific AMS tray.
- Args:
- tray_id: Global tray ID — 0..15 for AMS slots, 254 for external spool
- (single-external printers and Ext-L on dual-nozzle H2D),
- 255 for Ext-R on dual-nozzle H2D.
- extruder_id: Unused - kept for API compatibility
- Returns:
- True if command was sent, False otherwise
- """
- if not self._client or not self.state.connected:
- logger.warning("[%s] Cannot load filament: not connected", self.serial_number)
- return False
- # Build the ams_change_filament command. Encoding differs by target type:
- # - AMS slots (0..15): slot_id is the local slot, curr/tar_temp = -1.
- # - External spool (tray_id=254): legacy capture from a single-extruder
- # printer used slot_id=254, curr/tar_temp=-1; preserved here.
- # - Ext-R on dual-nozzle H2D (tray_id=255): captured shape from
- # BambuStudio uses slot_id=0 (extruder index, 0=right), and
- # curr_temp/tar_temp = the actual right-nozzle temp. See #891.
- self._sequence_id += 1
- if tray_id == 255:
- ams_id = 255
- slot_id = 0 # extruder index for the right nozzle
- right_temp = int(self.state.temperatures.get("nozzle_2", 0) or 0)
- if right_temp < 180:
- right_temp = 215 # Reasonable default if right nozzle is cold/unknown
- curr_temp = right_temp
- tar_temp = right_temp
- elif tray_id == 254:
- ams_id = 255
- slot_id = 254
- curr_temp = -1
- tar_temp = -1
- else:
- ams_id = tray_id // 4
- slot_id = tray_id % 4
- curr_temp = -1
- tar_temp = -1
- command = {
- "print": {
- "command": "ams_change_filament",
- "sequence_id": str(self._sequence_id),
- "ams_id": ams_id,
- "slot_id": slot_id,
- "target": tray_id,
- "curr_temp": curr_temp,
- "tar_temp": tar_temp,
- }
- }
- command_json = json.dumps(command)
- logger.info("[%s] Publishing ams_change_filament command: %s", self.serial_number, command_json)
- self._client.publish(self.topic_publish, command_json, qos=1)
- logger.info("[%s] Loading filament from tray %s (AMS %s slot %s)", self.serial_number, tray_id, ams_id, slot_id)
- # Track this load request for H2D dual-nozzle disambiguation
- # H2D reports only slot number (0-3) in tray_now, so we use our tracked value
- self._last_load_tray_id = tray_id
- self.state.pending_tray_target = tray_id
- logger.info("[%s] Set pending_tray_target=%s for H2D disambiguation", self.serial_number, tray_id)
- return True
def ams_unload_filament(self) -> bool:
    """Unload the currently loaded filament.

    Returns:
        True if command was sent, False otherwise
    """
    if not self._client or not self.state.connected:
        logger.warning("[%s] Cannot unload filament: not connected", self.serial_number)
        return False

    # Get the currently loaded tray info
    tray_now = self.state.tray_now
    logger.info("[%s] Unload requested, tray_now=%s", self.serial_number, tray_now)

    # Determine source ams_id for the unload command
    if tray_now in (254, 255):
        ams_id = 255  # No filament or external spool
    else:
        ams_id = tray_now // 4  # Source AMS

    # Command format from BambuStudio traffic capture:
    # - No extruder_id field
    # - For UNLOAD: curr_temp and tar_temp are the actual nozzle temp (e.g., 210)
    # - slot_id=255 and target=255 for unload
    #
    # Read the current nozzle temperature. A missing key and a key that is
    # present but None are both treated as "cold" (0), matching how
    # ams_load_filament reads its nozzle value; int(None) would raise here.
    nozzle_temp = int(self.state.temperatures.get("nozzle") or 0)
    if nozzle_temp < 180:
        nozzle_temp = 210  # Default to PLA temp if nozzle is cold or unknown

    self._sequence_id += 1
    command = {
        "print": {
            "command": "ams_change_filament",
            "sequence_id": str(self._sequence_id),
            "ams_id": ams_id,
            "slot_id": 255,  # 255 = unload marker
            "target": 255,  # 255 = unload destination
            "curr_temp": nozzle_temp,
            "tar_temp": nozzle_temp,
        }
    }
    command_json = json.dumps(command)
    logger.info("[%s] Publishing ams_change_filament (unload) command: %s", self.serial_number, command_json)
    self._client.publish(self.topic_publish, command_json, qos=1)
    logger.info("[%s] Unloading filament (tray_now was %s)", self.serial_number, tray_now)

    # Clear tracked load request since we're unloading
    self._last_load_tray_id = None
    self.state.pending_tray_target = None
    logger.info("[%s] Cleared pending_tray_target (unload)", self.serial_number)
    return True
def ams_control(self, action: str) -> bool:
    """Send an ``ams_control`` command to the printer.

    Args:
        action: "resume", "reset", or "pause".

    Returns:
        True if the command was published; False when not connected or when
        ``action`` is not one of the three accepted values.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot control AMS: not connected", self.serial_number)
        return False

    accepted_actions = ("resume", "reset", "pause")
    if action not in accepted_actions:
        logger.warning("[%s] Invalid AMS action: %s", self.serial_number, action)
        return False

    request = {"print": {"command": "ams_control", "param": action, "sequence_id": "0"}}
    self._client.publish(self.topic_publish, json.dumps(request), qos=1)
    logger.info("[%s] AMS control: %s", self.serial_number, action)
    return True
def ams_refresh_tray(self, ams_id: int, tray_id: int) -> tuple[bool, str]:
    """Request an RFID re-read for one AMS tray.

    Args:
        ams_id: AMS unit ID (0-3, or 128 for H2D external tray).
        tray_id: Tray ID within the AMS (0-3).

    Returns:
        Tuple of (success, message). The request is refused, with an
        explanatory message, when the printer is not connected or when
        ``state.tray_now`` is anything other than 255.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot refresh AMS tray: not connected", self.serial_number)
        return False, "Printer not connected"

    # tray_now != 255 means filament is loaded. An RFID refresh needs the AMS
    # to move filament, which it cannot do while one is loaded.
    tray_now = self.state.tray_now
    if tray_now != 255:
        # Describe the loaded tray for the message returned to the caller.
        if tray_now == 254:
            loaded_tray = "external spool"
        elif 0 <= tray_now < 128:
            unit_index, slot_index = divmod(tray_now, 4)
            loaded_tray = f"AMS {unit_index + 1} slot {slot_index + 1}"
        else:
            loaded_tray = f"tray {tray_now}"
        logger.warning("[%s] Cannot refresh AMS tray: filament loaded from %s", self.serial_number, loaded_tray)
        return False, f"Please unload filament first. Currently loaded: {loaded_tray}"

    # ams_get_rfid is the command Bambu Studio uses to re-read the RFID tag.
    request = {"print": {"command": "ams_get_rfid", "ams_id": ams_id, "slot_id": tray_id, "sequence_id": "0"}}
    self._client.publish(self.topic_publish, json.dumps(request), qos=1)
    logger.info("[%s] Triggering RFID re-read: AMS %s, slot %s", self.serial_number, ams_id, tray_id)
    return True, f"Refreshing AMS {ams_id} tray {tray_id}"
def ams_set_filament_setting(
    self,
    ams_id: int,
    tray_id: int,
    tray_info_idx: str,
    tray_type: str,
    tray_sub_brands: str,
    tray_color: str,
    nozzle_temp_min: int,
    nozzle_temp_max: int,
    setting_id: str = "",
) -> bool:
    """Write filament type, colour and temperature range to an AMS tray.

    The K value is not part of this command; it is set separately through
    the extrusion_cali_sel command.

    Args:
        ams_id: AMS unit ID (0-3 for regular AMS, 128-135 for HT AMS,
            255 for the external spool).
        tray_id: Tray ID within the AMS (0-3).
        tray_info_idx: Filament ID short format (e.g., "GFL05").
        tray_type: Filament type (e.g., "PLA", "PETG").
        tray_sub_brands: Sub-brand name (e.g., "PLA Basic", "PETG HF").
        tray_color: Color in RRGGBBAA hex format (e.g., "FFFF00FF").
        nozzle_temp_min: Minimum nozzle temperature.
        nozzle_temp_max: Maximum nozzle temperature.
        setting_id: Full setting ID with version (e.g., "GFSL05_07"); only
            added to the command when non-empty.

    Returns:
        True if the command was published, False when not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot set AMS filament setting: not connected", self.serial_number)
        return False

    # Translate the caller's (ams_id, tray_id) into the ids sent over MQTT.
    #
    # External spool, per a BambuStudio→X1C packet capture (issue #1279,
    # May 2026): for `ams_filament_setting` Studio puts the *global* tray
    # index in `tray_id`, not a position inside the virtual unit. The
    # printer's reply echoes `tray_id: 0` (slot position), so request and
    # response use different semantics for that field. Sending `tray_id: 0`
    # is what the P1S in #1279 rejected with `result: "fail"`.
    if ams_id == 255:
        raw = self.state.raw_data
        external_trays = raw.get("vt_tray", []) if raw else []
        if len(external_trays) > 1:
            # Dual external slots (H2D): each ext slot is its own virtual AMS
            # unit (254=ext-L / slot 0, 255=ext-R / slot 1). The X1C capture
            # does not cover this case, so tray_id stays 0 until a captured
            # Studio→H2D exchange confirms the correct value.
            mqtt_ams_id, mqtt_tray_id = 254 + tray_id, 0
        else:
            # Single external slot (X1C, P1S, A1): global tray_id=254.
            mqtt_ams_id, mqtt_tray_id = 255, 254
        slot_id = 0
    elif ams_id <= 3:
        # Regular AMS: ids pass through, slot equals the tray position.
        mqtt_ams_id, mqtt_tray_id, slot_id = ams_id, tray_id, tray_id
    else:
        # AMS-HT: single tray per unit.
        mqtt_ams_id, mqtt_tray_id, slot_id = ams_id, tray_id, 0

    fields = {
        "command": "ams_filament_setting",
        "ams_id": mqtt_ams_id,
        "tray_id": mqtt_tray_id,
        "slot_id": slot_id,
        "tray_info_idx": tray_info_idx,
        "tray_type": tray_type,
        "tray_sub_brands": tray_sub_brands,
        "tray_color": tray_color,
        "nozzle_temp_min": nozzle_temp_min,
        "nozzle_temp_max": nozzle_temp_max,
        "sequence_id": "0",
    }
    # setting_id is optional; when given it helps the slicer show the right profile.
    if setting_id:
        fields["setting_id"] = setting_id
    command_json = json.dumps({"print": fields})

    logger.info(
        f"[{self.serial_number}] Publishing ams_filament_setting: AMS {ams_id}, tray {tray_id}, tray_info_idx={tray_info_idx}, setting_id={setting_id}"
    )
    logger.debug("[%s] ams_filament_setting command: %s", self.serial_number, command_json)
    self._client.publish(self.topic_publish, command_json, qos=1)
    # Record (monotonic clock) when the last AMS command went out.
    self._last_ams_cmd_time = time.monotonic()
    return True
def reset_ams_slot(self, ams_id: int, tray_id: int) -> bool:
    """Blank out an AMS slot so it reads as empty/unconfigured.

    This publishes an ``ams_filament_setting`` command whose filament fields
    are empty strings, whose colour is "00000000" and whose temperatures are 0.

    Args:
        ams_id: AMS unit ID (0-3 for regular AMS, 128-135 for HT AMS,
            255 for the external spool).
        tray_id: Tray ID within the AMS (0-3).

    Returns:
        True if the command was published, False when not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot reset AMS slot: not connected", self.serial_number)
        return False

    # Same id convention as ams_set_filament_setting; its comment holds the
    # #1279 capture rationale.
    if ams_id == 255:
        raw = self.state.raw_data
        external_trays = raw.get("vt_tray", []) if raw else []
        if len(external_trays) > 1:
            # Dual external slots (H2D): each ext slot is its own virtual AMS unit.
            mqtt_ams_id, mqtt_tray_id = 254 + tray_id, 0
        else:
            # Single external slot (X1C, P1S, A1): global tray_id=254.
            mqtt_ams_id, mqtt_tray_id = 255, 254
        slot_id = 0
    elif ams_id <= 3:
        # Regular AMS: ids pass through, slot equals the tray position.
        mqtt_ams_id, mqtt_tray_id, slot_id = ams_id, tray_id, tray_id
    else:
        # AMS-HT: single tray per unit.
        mqtt_ams_id, mqtt_tray_id, slot_id = ams_id, tray_id, 0

    blank_fields = {
        "command": "ams_filament_setting",
        "ams_id": mqtt_ams_id,
        "tray_id": mqtt_tray_id,
        "slot_id": slot_id,
        "tray_info_idx": "",
        "tray_type": "",
        "tray_sub_brands": "",
        "tray_color": "00000000",
        "nozzle_temp_min": 0,
        "nozzle_temp_max": 0,
        "sequence_id": "0",
    }
    command_json = json.dumps({"print": blank_fields})

    logger.info("[%s] Resetting AMS slot: AMS %s, tray %s", self.serial_number, ams_id, tray_id)
    logger.debug("[%s] reset_ams_slot command: %s", self.serial_number, command_json)
    self._client.publish(self.topic_publish, command_json, qos=1)
    # Record (monotonic clock) when the last AMS command went out.
    self._last_ams_cmd_time = time.monotonic()
    return True
def extrusion_cali_sel(
    self,
    ams_id: int,
    tray_id: int,
    cali_idx: int,
    filament_id: str,
    nozzle_diameter: str = "0.4",
) -> bool:
    """Select the calibration (K value) profile used for an AMS slot.

    The profile is picked from the printer's calibration list by index;
    ``cali_idx=-1`` selects the default K value (0.020).

    Note: setting_id must NOT be sent with this command — BambuStudio never
    includes it, and adding it causes the firmware to mislink the profile on
    X1C/P1S.

    Args:
        ams_id: AMS unit ID (0-3 for regular AMS, 128-135 for HT AMS,
            255 for the external spool).
        tray_id: Tray ID within the AMS (0-3).
        cali_idx: Calibration profile index (-1 for default).
        filament_id: Filament preset ID (same as tray_info_idx).
        nozzle_diameter: Nozzle diameter string (e.g., "0.4").

    Returns:
        True if the command was published, False when not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot set calibration: not connected", self.serial_number)
        return False

    # Translate the caller's (ams_id, tray_id) into the ids sent over MQTT.
    # For regular AMS units this command takes the GLOBAL tray index
    # (ams_id * 4 + slot, as BambuStudio does), whereas ams_set_filament_setting
    # sends the local tray position for the same units.
    if ams_id == 255:
        # External spool: both id fields carry the global external-tray index.
        raw = self.state.raw_data
        external_trays = raw.get("vt_tray", []) if raw else []
        if len(external_trays) > 1:
            # Dual external slots (H2D): each ext slot is its own virtual AMS
            # unit. Confirmed from BambuStudio logs: ext-R sends ams_id=255,
            # tray_id=255.
            mqtt_ams_id = mqtt_tray_id = 254 + tray_id
        else:
            # Single external slot (X1C, P1S, A1): global tray_id=254.
            # NOTE(review): ams_id is 254 here, while ams_set_filament_setting
            # and reset_ams_slot send ams_id=255 for the single external
            # slot — confirm the difference is intentional.
            mqtt_ams_id = mqtt_tray_id = 254
        slot_id = 0
    elif ams_id <= 3:
        mqtt_ams_id, mqtt_tray_id, slot_id = ams_id, ams_id * 4 + tray_id, tray_id
    else:
        # HT AMS (128-135) and any other unit id: ids pass through unchanged
        # and the slot is always 0.
        mqtt_ams_id, mqtt_tray_id, slot_id = ams_id, tray_id, 0

    fields = {
        "command": "extrusion_cali_sel",
        "cali_idx": cali_idx,
        "filament_id": filament_id,
        "nozzle_diameter": nozzle_diameter,
        "ams_id": mqtt_ams_id,
        "tray_id": mqtt_tray_id,
        "slot_id": slot_id,
        "sequence_id": "0",
    }
    command_json = json.dumps({"print": fields})

    logger.info(
        f"[{self.serial_number}] Publishing extrusion_cali_sel: AMS {ams_id}, tray {tray_id}, cali_idx={cali_idx}"
    )
    logger.debug("[%s] extrusion_cali_sel command: %s", self.serial_number, command_json)
    self._client.publish(self.topic_publish, command_json, qos=1)
    return True
def extrusion_cali_set(
    self,
    tray_id: int,
    k_value: float,
    nozzle_diameter: str = "0.4",
    nozzle_temp: int = 220,
    filament_id: str = "",
    setting_id: str = "",
    name: str = "",
    cali_idx: int = -1,
) -> bool:
    """Directly set K value (pressure advance) for a tray.

    Uses the filaments array format required by current firmware.

    Args:
        tray_id: Global tray ID (ams_id * 4 + slot)
        k_value: Pressure advance K value (e.g., 0.020); sent as a string
            with six decimals
        nozzle_diameter: Nozzle diameter string (e.g., "0.4")
        nozzle_temp: Nozzle temperature for calibration reference. Currently
            not included in the published command; kept for API compatibility.
        filament_id: Filament preset ID (e.g., "GFA02")
        setting_id: Setting ID (e.g., "GFSA02_07")
        name: Profile display name
        cali_idx: Calibration index (-1 for new)

    Returns:
        True if command was sent, False otherwise
    """
    if not self._client or not self.state.connected:
        logger.warning("[%s] Cannot set K value: not connected", self.serial_number)
        return False

    nozzle_id = f"HS00-{nozzle_diameter}"
    # The entry's ams_id, extruder_id and n_coef are fixed values here.
    filament_entry = {
        "ams_id": 0,
        "cali_idx": cali_idx,
        "extruder_id": 0,
        "filament_id": filament_id,
        "k_value": f"{k_value:.6f}",
        "n_coef": "1.400000",
        "name": name,
        "nozzle_diameter": nozzle_diameter,
        "nozzle_id": nozzle_id,
        "setting_id": setting_id,
        "tray_id": tray_id,
    }

    # Advance the counter before using it, as every other command that sends
    # self._sequence_id does; without this the command re-used the sequence id
    # of whichever command was published before it.
    self._sequence_id += 1
    command = {
        "print": {
            "command": "extrusion_cali_set",
            "filaments": [filament_entry],
            "nozzle_diameter": nozzle_diameter,
            "sequence_id": str(self._sequence_id),
        }
    }
    command_json = json.dumps(command)
    logger.info("[%s] Publishing extrusion_cali_set: tray %s, k_value=%s", self.serial_number, tray_id, k_value)
    logger.debug("[%s] extrusion_cali_set command: %s", self.serial_number, command_json)
    self._client.publish(self.topic_publish, command_json, qos=1)
    return True
def set_timelapse(self, enable: bool) -> bool:
    """Turn timelapse recording on or off.

    Two messages are published: an ``M981`` G-code line carrying the new
    setting, then a ``pushall`` request for a status update.

    Args:
        enable: True to enable, False to disable.

    Returns:
        True if the commands were published, False when not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot set timelapse: not connected", self.serial_number)
        return False

    # 1) The timelapse setting itself (S1 = on, S0 = off).
    flag = 1 if enable else 0
    timelapse_request = {
        "print": {"command": "gcode_line", "param": f"M981 S{flag} P20000", "sequence_id": "0"}
    }
    self._client.publish(self.topic_publish, json.dumps(timelapse_request), qos=1)

    # 2) Ask for a status update.
    status_request = {"pushing": {"command": "pushall", "sequence_id": "0"}}
    self._client.publish(self.topic_publish, json.dumps(status_request), qos=1)

    logger.info("[%s] Set timelapse %s", self.serial_number, "enabled" if enable else "disabled")
    return True
def set_liveview(self, enable: bool) -> bool:
    """Turn live view / camera streaming on or off.

    Two messages are published: an ``xcam`` ``ipcam_record_set`` command
    whose ``control`` field is "enable" or "disable", then a ``pushall``
    request for a status update.

    Args:
        enable: True to enable, False to disable.

    Returns:
        True if the commands were published, False when not connected.
    """
    if not (self._client and self.state.connected):
        logger.warning("[%s] Cannot set liveview: not connected", self.serial_number)
        return False

    control = "enable" if enable else "disable"
    camera_request = {"xcam": {"command": "ipcam_record_set", "control": control, "sequence_id": "0"}}
    self._client.publish(self.topic_publish, json.dumps(camera_request), qos=1)

    # Ask for a status update.
    status_request = {"pushing": {"command": "pushall", "sequence_id": "0"}}
    self._client.publish(self.topic_publish, json.dumps(status_request), qos=1)

    logger.info("[%s] Set liveview %s", self.serial_number, "enabled" if enable else "disabled")
    return True
|