test_bambu_mqtt.py 101 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658
  1. """
  2. Tests for the BambuMQTTClient service.
  3. These tests focus on timelapse tracking during prints.
  4. """
  5. import json
  6. import pytest
  7. class TestTimelapseTracking:
  8. """Tests for timelapse state tracking during prints."""
  9. @pytest.fixture
  10. def mqtt_client(self):
  11. """Create a BambuMQTTClient instance for testing."""
  12. from backend.app.services.bambu_mqtt import BambuMQTTClient
  13. client = BambuMQTTClient(
  14. ip_address="192.168.1.100",
  15. serial_number="TEST123",
  16. access_code="12345678",
  17. )
  18. return client
  19. def test_timelapse_flag_initializes_to_false(self, mqtt_client):
  20. """Verify _timelapse_during_print starts as False."""
  21. assert mqtt_client._timelapse_during_print is False
  22. def test_timelapse_flag_set_when_timelapse_active_during_running(self, mqtt_client):
  23. """Verify timelapse flag is set when timelapse is active while printing."""
  24. # Simulate print running
  25. mqtt_client._was_running = True
  26. mqtt_client.state.timelapse = False
  27. # Simulate xcam data showing timelapse is enabled
  28. xcam_data = {"timelapse": "enable"}
  29. mqtt_client._parse_xcam_data(xcam_data)
  30. assert mqtt_client.state.timelapse is True
  31. assert mqtt_client._timelapse_during_print is True
  32. def test_timelapse_flag_not_set_when_not_running(self, mqtt_client):
  33. """Verify timelapse flag is NOT set when printer not running."""
  34. # Printer is idle (not running)
  35. mqtt_client._was_running = False
  36. mqtt_client.state.timelapse = False
  37. # Timelapse is enabled but we're not printing
  38. xcam_data = {"timelapse": "enable"}
  39. mqtt_client._parse_xcam_data(xcam_data)
  40. assert mqtt_client.state.timelapse is True
  41. # Flag should NOT be set since we're not printing
  42. assert mqtt_client._timelapse_during_print is False
  43. def test_timelapse_flag_persists_after_timelapse_stops(self, mqtt_client):
  44. """Verify timelapse flag stays True even after recording stops."""
  45. # Simulate print running with timelapse
  46. mqtt_client._was_running = True
  47. # Enable timelapse during print
  48. xcam_data = {"timelapse": "enable"}
  49. mqtt_client._parse_xcam_data(xcam_data)
  50. assert mqtt_client._timelapse_during_print is True
  51. # Disable timelapse (recording stops at end of print)
  52. xcam_data = {"timelapse": "disable"}
  53. mqtt_client._parse_xcam_data(xcam_data)
  54. # Flag should still be True (persists until reset)
  55. assert mqtt_client.state.timelapse is False
  56. assert mqtt_client._timelapse_during_print is True
  57. def test_timelapse_flag_from_print_data(self, mqtt_client):
  58. """Verify timelapse flag is set from print data (not just xcam)."""
  59. # Simulate print running
  60. mqtt_client._was_running = True
  61. mqtt_client.state.timelapse = False
  62. mqtt_client._timelapse_during_print = False
  63. # Manually test the timelapse parsing logic from _parse_print_data
  64. # This tests the "timelapse" field in the main print data
  65. data = {"timelapse": True}
  66. mqtt_client.state.timelapse = data["timelapse"] is True
  67. if mqtt_client.state.timelapse and mqtt_client._was_running:
  68. mqtt_client._timelapse_during_print = True
  69. assert mqtt_client._timelapse_during_print is True
  70. class TestPrintCompletionWithTimelapse:
  71. """Tests for print completion including timelapse flag."""
  72. @pytest.fixture
  73. def mqtt_client(self):
  74. """Create a BambuMQTTClient instance for testing."""
  75. from backend.app.services.bambu_mqtt import BambuMQTTClient
  76. client = BambuMQTTClient(
  77. ip_address="192.168.1.100",
  78. serial_number="TEST123",
  79. access_code="12345678",
  80. )
  81. return client
  82. def test_print_complete_includes_timelapse_flag(self, mqtt_client):
  83. """Verify print complete callback includes timelapse_was_active."""
  84. # Set up completion callback
  85. callback_data = {}
  86. def on_complete(data):
  87. callback_data.update(data)
  88. mqtt_client.on_print_complete = on_complete
  89. # Simulate a print that had timelapse active
  90. mqtt_client._was_running = True
  91. mqtt_client._completion_triggered = False
  92. mqtt_client._timelapse_during_print = True
  93. mqtt_client._previous_gcode_state = "RUNNING"
  94. mqtt_client._previous_gcode_file = "test.gcode"
  95. mqtt_client.state.subtask_name = "Test Print"
  96. # Simulate print finish
  97. mqtt_client.state.state = "FINISH"
  98. # Manually trigger the completion logic (simplified)
  99. # In real code this happens in _parse_print_data
  100. should_trigger = (
  101. mqtt_client.state.state in ("FINISH", "FAILED")
  102. and not mqtt_client._completion_triggered
  103. and mqtt_client.on_print_complete
  104. and mqtt_client._previous_gcode_state == "RUNNING"
  105. )
  106. if should_trigger:
  107. status = "completed" if mqtt_client.state.state == "FINISH" else "failed"
  108. timelapse_was_active = mqtt_client._timelapse_during_print
  109. mqtt_client._completion_triggered = True
  110. mqtt_client._was_running = False
  111. mqtt_client._timelapse_during_print = False
  112. mqtt_client.on_print_complete(
  113. {
  114. "status": status,
  115. "filename": mqtt_client._previous_gcode_file,
  116. "subtask_name": mqtt_client.state.subtask_name,
  117. "timelapse_was_active": timelapse_was_active,
  118. }
  119. )
  120. assert "timelapse_was_active" in callback_data
  121. assert callback_data["timelapse_was_active"] is True
  122. def test_print_complete_timelapse_flag_false_when_no_timelapse(self, mqtt_client):
  123. """Verify timelapse_was_active is False when no timelapse during print."""
  124. callback_data = {}
  125. def on_complete(data):
  126. callback_data.update(data)
  127. mqtt_client.on_print_complete = on_complete
  128. # Print without timelapse
  129. mqtt_client._was_running = True
  130. mqtt_client._completion_triggered = False
  131. mqtt_client._timelapse_during_print = False # No timelapse
  132. mqtt_client._previous_gcode_state = "RUNNING"
  133. mqtt_client._previous_gcode_file = "test.gcode"
  134. mqtt_client.state.subtask_name = "Test Print"
  135. mqtt_client.state.state = "FINISH"
  136. # Trigger completion
  137. timelapse_was_active = mqtt_client._timelapse_during_print
  138. mqtt_client.on_print_complete(
  139. {
  140. "status": "completed",
  141. "filename": mqtt_client._previous_gcode_file,
  142. "subtask_name": mqtt_client.state.subtask_name,
  143. "timelapse_was_active": timelapse_was_active,
  144. }
  145. )
  146. assert callback_data["timelapse_was_active"] is False
  147. def test_timelapse_flag_reset_after_completion(self, mqtt_client):
  148. """Verify _timelapse_during_print is reset after print completion."""
  149. mqtt_client._timelapse_during_print = True
  150. mqtt_client._was_running = True
  151. mqtt_client._completion_triggered = False
  152. # Simulate completion reset
  153. mqtt_client._completion_triggered = True
  154. mqtt_client._was_running = False
  155. mqtt_client._timelapse_during_print = False
  156. assert mqtt_client._timelapse_during_print is False
  157. class TestRealisticMessageFlow:
  158. """Tests that simulate realistic MQTT message sequences.
  159. These tests process messages through _process_message to test the full flow,
  160. including the order of xcam parsing vs state detection.
  161. """
  162. @pytest.fixture
  163. def mqtt_client(self):
  164. """Create a BambuMQTTClient instance for testing."""
  165. from backend.app.services.bambu_mqtt import BambuMQTTClient
  166. client = BambuMQTTClient(
  167. ip_address="192.168.1.100",
  168. serial_number="TEST123",
  169. access_code="12345678",
  170. )
  171. return client
  172. def test_timelapse_detected_at_print_start_in_same_message(self, mqtt_client):
  173. """Test that timelapse is detected when xcam and state come in same message.
  174. This is the critical race condition test - xcam data is parsed BEFORE
  175. state detection, so the timelapse flag must be set AFTER _was_running is True.
  176. """
  177. # Callbacks to track events
  178. start_callback_data = {}
  179. def on_start(data):
  180. start_callback_data.update(data)
  181. mqtt_client.on_print_start = on_start
  182. # Initial state - idle
  183. mqtt_client._was_running = False
  184. mqtt_client._timelapse_during_print = False
  185. mqtt_client._previous_gcode_state = None
  186. # Simulate first message when print starts - contains both xcam and gcode_state
  187. # This is the realistic scenario from the printer
  188. # NOTE: Real MQTT messages wrap print data inside a "print" key
  189. payload = {
  190. "print": {
  191. "gcode_state": "RUNNING",
  192. "gcode_file": "/data/Metadata/test_print.gcode",
  193. "subtask_name": "Test_Print",
  194. "xcam": {
  195. "timelapse": "enable", # Timelapse is enabled in this print
  196. "printing_monitor": True,
  197. },
  198. "mc_percent": 0,
  199. "mc_remaining_time": 3600,
  200. }
  201. }
  202. # Process the message (this is what happens in real MQTT flow)
  203. mqtt_client._process_message(payload)
  204. # Verify timelapse was detected even though xcam is parsed before state
  205. assert mqtt_client._was_running is True, "_was_running should be True after RUNNING state"
  206. assert mqtt_client.state.timelapse is True, "state.timelapse should be True"
  207. assert mqtt_client._timelapse_during_print is True, (
  208. "timelapse_during_print should be True when timelapse is in the same message as RUNNING state"
  209. )
  210. def test_timelapse_not_detected_when_disabled(self, mqtt_client):
  211. """Test that timelapse is NOT detected when disabled in xcam data."""
  212. mqtt_client.on_print_start = lambda data: None
  213. # Initial state - idle
  214. mqtt_client._was_running = False
  215. mqtt_client._timelapse_during_print = False
  216. mqtt_client._previous_gcode_state = None
  217. # Print starts without timelapse
  218. payload = {
  219. "print": {
  220. "gcode_state": "RUNNING",
  221. "gcode_file": "/data/Metadata/test_print.gcode",
  222. "subtask_name": "Test_Print",
  223. "xcam": {
  224. "timelapse": "disable", # Timelapse is disabled
  225. "printing_monitor": True,
  226. },
  227. }
  228. }
  229. mqtt_client._process_message(payload)
  230. assert mqtt_client._was_running is True
  231. assert mqtt_client.state.timelapse is False
  232. assert mqtt_client._timelapse_during_print is False
  233. def test_timelapse_detected_when_enabled_after_print_start(self, mqtt_client):
  234. """Test timelapse detected when enabled in a message after print starts."""
  235. mqtt_client.on_print_start = lambda data: None
  236. # First message - print starts without timelapse info
  237. payload_start = {
  238. "print": {
  239. "gcode_state": "RUNNING",
  240. "gcode_file": "/data/Metadata/test_print.gcode",
  241. "subtask_name": "Test_Print",
  242. }
  243. }
  244. mqtt_client._process_message(payload_start)
  245. assert mqtt_client._was_running is True
  246. assert mqtt_client._timelapse_during_print is False # Not detected yet
  247. # Second message - xcam data arrives with timelapse enabled
  248. payload_xcam = {
  249. "print": {
  250. "gcode_state": "RUNNING",
  251. "gcode_file": "/data/Metadata/test_print.gcode",
  252. "subtask_name": "Test_Print",
  253. "xcam": {
  254. "timelapse": "enable",
  255. },
  256. }
  257. }
  258. mqtt_client._process_message(payload_xcam)
  259. # Now timelapse should be detected because _was_running is already True
  260. assert mqtt_client._timelapse_during_print is True
  261. def test_print_complete_includes_timelapse_flag_full_flow(self, mqtt_client):
  262. """Test full print lifecycle with timelapse - from start to completion."""
  263. start_data = {}
  264. complete_data = {}
  265. def on_start(data):
  266. start_data.update(data)
  267. def on_complete(data):
  268. complete_data.update(data)
  269. mqtt_client.on_print_start = on_start
  270. mqtt_client.on_print_complete = on_complete
  271. # 1. Print starts with timelapse
  272. mqtt_client._process_message(
  273. {
  274. "print": {
  275. "gcode_state": "RUNNING",
  276. "gcode_file": "/data/Metadata/test.gcode",
  277. "subtask_name": "Test",
  278. "xcam": {"timelapse": "enable"},
  279. }
  280. }
  281. )
  282. assert mqtt_client._timelapse_during_print is True
  283. assert "subtask_name" in start_data
  284. # 2. Print continues (multiple messages)
  285. for _ in range(3):
  286. mqtt_client._process_message(
  287. {
  288. "print": {
  289. "gcode_state": "RUNNING",
  290. "gcode_file": "/data/Metadata/test.gcode",
  291. "subtask_name": "Test",
  292. "mc_percent": 50,
  293. }
  294. }
  295. )
  296. # Timelapse flag should still be True
  297. assert mqtt_client._timelapse_during_print is True
  298. # 3. Print completes
  299. mqtt_client._process_message(
  300. {
  301. "print": {
  302. "gcode_state": "FINISH",
  303. "gcode_file": "/data/Metadata/test.gcode",
  304. "subtask_name": "Test",
  305. }
  306. }
  307. )
  308. # Verify completion callback received timelapse flag
  309. assert "timelapse_was_active" in complete_data
  310. assert complete_data["timelapse_was_active"] is True
  311. assert complete_data["status"] == "completed"
  312. # Flags should be reset after completion
  313. assert mqtt_client._timelapse_during_print is False
  314. assert mqtt_client._was_running is False
  315. def test_print_failed_includes_timelapse_flag(self, mqtt_client):
  316. """Test that failed print also includes timelapse flag."""
  317. complete_data = {}
  318. def on_complete(data):
  319. complete_data.update(data)
  320. mqtt_client.on_print_start = lambda data: None
  321. mqtt_client.on_print_complete = on_complete
  322. # Start with timelapse
  323. mqtt_client._process_message(
  324. {
  325. "print": {
  326. "gcode_state": "RUNNING",
  327. "gcode_file": "/data/Metadata/test.gcode",
  328. "subtask_name": "Test",
  329. "xcam": {"timelapse": "enable"},
  330. }
  331. }
  332. )
  333. # Print fails
  334. mqtt_client._process_message(
  335. {
  336. "print": {
  337. "gcode_state": "FAILED",
  338. "gcode_file": "/data/Metadata/test.gcode",
  339. "subtask_name": "Test",
  340. }
  341. }
  342. )
  343. assert complete_data["timelapse_was_active"] is True
  344. assert complete_data["status"] == "failed"
  345. class TestAMSDataMerging:
  346. """Tests for AMS data merging, particularly handling empty slots."""
  347. @pytest.fixture
  348. def mqtt_client(self):
  349. """Create a BambuMQTTClient instance for testing."""
  350. from backend.app.services.bambu_mqtt import BambuMQTTClient
  351. client = BambuMQTTClient(
  352. ip_address="192.168.1.100",
  353. serial_number="TEST123",
  354. access_code="12345678",
  355. )
  356. return client
  357. def test_empty_slot_clears_tray_type(self, mqtt_client):
  358. """Test that empty slot update clears tray_type (Issue #147).
  359. When a spool is removed from an old AMS, the printer sends empty values.
  360. These must overwrite the previous values to show the slot as empty.
  361. """
  362. # Initial state: AMS unit with a loaded spool
  363. initial_ams = {
  364. "ams": [
  365. {
  366. "id": 0,
  367. "tray": [
  368. {
  369. "id": 0,
  370. "tray_type": "PLA",
  371. "tray_sub_brands": "Bambu PLA Basic",
  372. "tray_color": "FF0000",
  373. "tag_uid": "1234567890ABCDEF",
  374. "remain": 80,
  375. }
  376. ],
  377. }
  378. ]
  379. }
  380. mqtt_client._handle_ams_data(initial_ams)
  381. # Verify initial state
  382. ams_data = mqtt_client.state.raw_data.get("ams", [])
  383. assert len(ams_data) == 1
  384. tray = ams_data[0]["tray"][0]
  385. assert tray["tray_type"] == "PLA"
  386. assert tray["tray_color"] == "FF0000"
  387. # Now simulate spool removal - printer sends empty values
  388. empty_update = {
  389. "ams": [
  390. {
  391. "id": 0,
  392. "tray": [
  393. {
  394. "id": 0,
  395. "tray_type": "", # Empty = slot is empty
  396. "tray_sub_brands": "",
  397. "tray_color": "",
  398. "tag_uid": "0000000000000000", # Zero UID
  399. "remain": 0,
  400. }
  401. ],
  402. }
  403. ]
  404. }
  405. mqtt_client._handle_ams_data(empty_update)
  406. # Verify empty values were applied (not ignored by merge logic)
  407. ams_data = mqtt_client.state.raw_data.get("ams", [])
  408. tray = ams_data[0]["tray"][0]
  409. assert tray["tray_type"] == "", "tray_type should be cleared when slot is empty"
  410. assert tray["tray_color"] == "", "tray_color should be cleared when slot is empty"
  411. assert tray["tray_sub_brands"] == "", "tray_sub_brands should be cleared"
  412. assert tray["tag_uid"] == "0000000000000000", "tag_uid should be cleared"
  413. def test_partial_update_preserves_other_fields(self, mqtt_client):
  414. """Test that partial updates still preserve non-slot-status fields."""
  415. # Initial state with full data
  416. initial_ams = {
  417. "ams": [
  418. {
  419. "id": 0,
  420. "humidity": "3",
  421. "temp": "25.5",
  422. "tray": [
  423. {
  424. "id": 0,
  425. "tray_type": "PLA",
  426. "tray_color": "00FF00",
  427. "remain": 90,
  428. "k": 0.02,
  429. }
  430. ],
  431. }
  432. ]
  433. }
  434. mqtt_client._handle_ams_data(initial_ams)
  435. # Partial update - only remain changes
  436. partial_update = {
  437. "ams": [
  438. {
  439. "id": 0,
  440. "tray": [
  441. {
  442. "id": 0,
  443. "remain": 85, # Only this changed
  444. }
  445. ],
  446. }
  447. ]
  448. }
  449. mqtt_client._handle_ams_data(partial_update)
  450. # Verify remain was updated but other fields preserved
  451. ams_data = mqtt_client.state.raw_data.get("ams", [])
  452. tray = ams_data[0]["tray"][0]
  453. assert tray["remain"] == 85, "remain should be updated"
  454. assert tray["tray_type"] == "PLA", "tray_type should be preserved"
  455. assert tray["tray_color"] == "00FF00", "tray_color should be preserved"
  456. assert tray["k"] == 0.02, "k should be preserved"
  457. def test_tray_exist_bits_clears_empty_slots(self, mqtt_client):
  458. """Test that tray_exist_bits clears slots marked as empty (Issue #147).
  459. New AMS models (AMS 2 Pro) don't send empty tray data when a spool is removed.
  460. Instead, they update tray_exist_bits to indicate which slots have spools.
  461. """
  462. # Initial state: AMS 0 and AMS 1 with loaded spools
  463. initial_ams = {
  464. "ams": [
  465. {
  466. "id": 0,
  467. "tray": [
  468. {"id": 0, "tray_type": "PLA", "tray_color": "FF0000", "remain": 80},
  469. {"id": 1, "tray_type": "PETG", "tray_color": "00FF00", "remain": 60},
  470. {"id": 2, "tray_type": "ABS", "tray_color": "0000FF", "remain": 40},
  471. {"id": 3, "tray_type": "TPU", "tray_color": "FFFF00", "remain": 20},
  472. ],
  473. },
  474. {
  475. "id": 1,
  476. "tray": [
  477. {"id": 0, "tray_type": "PLA", "tray_color": "FFFFFF", "remain": 90},
  478. {"id": 1, "tray_type": "PLA", "tray_color": "000000", "remain": 70},
  479. {"id": 2, "tray_type": "PLA", "tray_color": "FF00FF", "remain": 50},
  480. {"id": 3, "tray_type": "PLA", "tray_color": "00FFFF", "remain": 30},
  481. ],
  482. },
  483. ],
  484. "tray_exist_bits": "ff", # All 8 slots have spools (0xFF = 11111111)
  485. }
  486. mqtt_client._handle_ams_data(initial_ams)
  487. # Verify initial state
  488. ams_data = mqtt_client.state.raw_data.get("ams", [])
  489. assert ams_data[1]["tray"][3]["tray_type"] == "PLA" # AMS 1 slot 3 (B4) has spool
  490. # Now simulate spool removal from AMS 1 slot 3 (B4)
  491. # tray_exist_bits: 0x7f = 01111111 (bit 7 = 0 means AMS 1 slot 3 is empty)
  492. update_ams = {
  493. "ams": [
  494. {"id": 0, "tray": [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]},
  495. {"id": 1, "tray": [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]},
  496. ],
  497. "tray_exist_bits": "7f", # Bit 7 = 0 -> AMS 1 slot 3 is empty
  498. }
  499. mqtt_client._handle_ams_data(update_ams)
  500. # Verify AMS 1 slot 3 was cleared
  501. ams_data = mqtt_client.state.raw_data.get("ams", [])
  502. b4_tray = ams_data[1]["tray"][3]
  503. assert b4_tray["tray_type"] == "", "tray_type should be cleared for empty slot"
  504. assert b4_tray["remain"] == 0, "remain should be 0 for empty slot"
  505. # Verify other slots are preserved
  506. assert ams_data[0]["tray"][0]["tray_type"] == "PLA", "A1 should still have PLA"
  507. assert ams_data[1]["tray"][0]["tray_type"] == "PLA", "B1 should still have PLA"
  508. def test_shutdown_message_preserves_ams_data(self, mqtt_client):
  509. """Printer shutdown (power_on_flag=False) must not wipe AMS slot data (#765).
  510. When a printer shuts down it sends a final MQTT message with
  511. tray_exist_bits='0' and power_on_flag=False. This all-zero value
  512. previously caused every slot to be cleared, which then triggered
  513. auto-unlink of all spool assignments on reconnect.
  514. """
  515. # Initial state: two AMS units with loaded spools
  516. initial_ams = {
  517. "ams": [
  518. {
  519. "id": 0,
  520. "tray": [
  521. {"id": 0, "tray_type": "PLA", "tray_color": "FF0000FF", "remain": 80},
  522. {"id": 1, "tray_type": "PETG", "tray_color": "00FF00FF", "remain": 60},
  523. ],
  524. },
  525. {
  526. "id": 1,
  527. "tray": [
  528. {"id": 0, "tray_type": "PETG", "tray_color": "DBDDD9FF", "remain": 90},
  529. {"id": 1, "tray_type": "PETG", "tray_color": "67DB25FF", "remain": 70},
  530. ],
  531. },
  532. ],
  533. "tray_exist_bits": "33", # Slots 0,1 of each AMS (0b00110011)
  534. "power_on_flag": True,
  535. }
  536. mqtt_client._handle_ams_data(initial_ams)
  537. # Verify initial state
  538. ams_data = mqtt_client.state.raw_data["ams"]
  539. assert ams_data[0]["tray"][0]["tray_type"] == "PLA"
  540. assert ams_data[1]["tray"][0]["tray_type"] == "PETG"
  541. # Simulate printer shutdown — all-zero bits with power_on_flag=False
  542. shutdown_ams = {
  543. "ams_exist_bits": "0",
  544. "tray_exist_bits": "0",
  545. "power_on_flag": False,
  546. "insert_flag": False,
  547. "tray_now": "0",
  548. "version": 0,
  549. }
  550. mqtt_client._handle_ams_data(shutdown_ams)
  551. # AMS slot data MUST be preserved — shutdown should not clear it
  552. ams_data = mqtt_client.state.raw_data["ams"]
  553. assert ams_data[0]["tray"][0]["tray_type"] == "PLA", "Shutdown must not clear AMS 0 slot 0"
  554. assert ams_data[0]["tray"][0]["tray_color"] == "FF0000FF", "Shutdown must not clear AMS 0 slot 0 color"
  555. assert ams_data[0]["tray"][1]["tray_type"] == "PETG", "Shutdown must not clear AMS 0 slot 1"
  556. assert ams_data[1]["tray"][0]["tray_type"] == "PETG", "Shutdown must not clear AMS 1 slot 0"
  557. assert ams_data[1]["tray"][1]["tray_type"] == "PETG", "Shutdown must not clear AMS 1 slot 1"
  558. def test_genuine_removal_still_clears_with_power_on(self, mqtt_client):
  559. """Genuine spool removal (power_on_flag=True) must still clear slot data.
  560. Ensures the #765 fix doesn't break normal spool removal detection.
  561. """
  562. # Initial state: AMS with loaded spool
  563. initial_ams = {
  564. "ams": [
  565. {
  566. "id": 0,
  567. "tray": [
  568. {"id": 0, "tray_type": "PLA", "tray_color": "FF0000", "remain": 80},
  569. {"id": 1, "tray_type": "PETG", "tray_color": "00FF00", "remain": 60},
  570. ],
  571. },
  572. ],
  573. "tray_exist_bits": "3", # Both slots occupied (0b11)
  574. "power_on_flag": True,
  575. }
  576. mqtt_client._handle_ams_data(initial_ams)
  577. # Spool removed from slot 1 while printer is running
  578. removal_ams = {
  579. "ams": [
  580. {
  581. "id": 0,
  582. "tray": [{"id": 0}, {"id": 1}],
  583. },
  584. ],
  585. "tray_exist_bits": "1", # Only slot 0 occupied (0b01)
  586. "power_on_flag": True,
  587. }
  588. mqtt_client._handle_ams_data(removal_ams)
  589. # Slot 0 preserved, slot 1 cleared
  590. ams_data = mqtt_client.state.raw_data["ams"]
  591. assert ams_data[0]["tray"][0]["tray_type"] == "PLA", "Slot 0 should be preserved"
  592. assert ams_data[0]["tray"][1]["tray_type"] == "", "Slot 1 should be cleared on removal"
  593. assert ams_data[0]["tray"][1]["tray_color"] == "", "Slot 1 color should be cleared"
  594. def test_power_on_flag_defaults_true_when_absent(self, mqtt_client):
  595. """When power_on_flag is not in the MQTT data, clearing must proceed normally.
  596. Ensures backwards compatibility with firmware that doesn't send power_on_flag.
  597. """
  598. # Initial state
  599. initial_ams = {
  600. "ams": [
  601. {
  602. "id": 0,
  603. "tray": [
  604. {"id": 0, "tray_type": "PLA", "tray_color": "FF0000", "remain": 80},
  605. ],
  606. },
  607. ],
  608. "tray_exist_bits": "1",
  609. }
  610. mqtt_client._handle_ams_data(initial_ams)
  611. # Update WITHOUT power_on_flag — should still clear when bit=0
  612. update_ams = {
  613. "ams": [{"id": 0, "tray": [{"id": 0}]}],
  614. "tray_exist_bits": "0",
  615. # No power_on_flag key at all
  616. }
  617. mqtt_client._handle_ams_data(update_ams)
  618. ams_data = mqtt_client.state.raw_data["ams"]
  619. assert ams_data[0]["tray"][0]["tray_type"] == "", (
  620. "Without power_on_flag, clearing should proceed (defaults to True)"
  621. )
  622. class TestNozzleRackData:
  623. """Tests for nozzle rack data parsing from H2 series device.nozzle.info."""
  624. @pytest.fixture
  625. def mqtt_client(self):
  626. """Create a BambuMQTTClient instance for testing."""
  627. from backend.app.services.bambu_mqtt import BambuMQTTClient
  628. client = BambuMQTTClient(
  629. ip_address="192.168.1.100",
  630. serial_number="TEST123",
  631. access_code="12345678",
  632. )
  633. return client
  634. def test_h2c_nozzle_rack_populated_with_8_entries(self, mqtt_client):
  635. """H2C provides 8 nozzle entries: IDs 0,1 (L/R hotend) + 16-21 (rack)."""
  636. payload = {
  637. "print": {
  638. "device": {
  639. "nozzle": {
  640. "info": [
  641. {
  642. "id": 0,
  643. "type": "HS",
  644. "diameter": "0.4",
  645. "wear": 5,
  646. "stat": 1,
  647. "max_temp": 300,
  648. "serial_number": "SN-L",
  649. },
  650. {
  651. "id": 1,
  652. "type": "HS",
  653. "diameter": "0.4",
  654. "wear": 3,
  655. "stat": 0,
  656. "max_temp": 300,
  657. "serial_number": "SN-R",
  658. },
  659. {
  660. "id": 16,
  661. "type": "HS",
  662. "diameter": "0.4",
  663. "wear": 10,
  664. "stat": 0,
  665. "max_temp": 300,
  666. "serial_number": "SN-16",
  667. },
  668. {
  669. "id": 17,
  670. "type": "HH01",
  671. "diameter": "0.6",
  672. "wear": 0,
  673. "stat": 0,
  674. "max_temp": 300,
  675. "serial_number": "SN-17",
  676. },
  677. {
  678. "id": 18,
  679. "type": "HS",
  680. "diameter": "0.4",
  681. "wear": 2,
  682. "stat": 0,
  683. "max_temp": 300,
  684. "serial_number": "SN-18",
  685. },
  686. {
  687. "id": 19,
  688. "type": "",
  689. "diameter": "",
  690. "wear": None,
  691. "stat": None,
  692. "max_temp": 0,
  693. "serial_number": "",
  694. },
  695. {
  696. "id": 20,
  697. "type": "",
  698. "diameter": "",
  699. "wear": None,
  700. "stat": None,
  701. "max_temp": 0,
  702. "serial_number": "",
  703. },
  704. {
  705. "id": 21,
  706. "type": "",
  707. "diameter": "",
  708. "wear": None,
  709. "stat": None,
  710. "max_temp": 0,
  711. "serial_number": "",
  712. },
  713. ]
  714. }
  715. }
  716. }
  717. }
  718. mqtt_client._process_message(payload)
  719. assert len(mqtt_client.state.nozzle_rack) == 8
  720. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  721. assert ids == [0, 1, 16, 17, 18, 19, 20, 21]
  722. def test_h2d_nozzle_rack_populated_with_2_entries(self, mqtt_client):
  723. """H2D provides 2 nozzle entries: IDs 0,1 (L/R hotend) — no rack slots."""
  724. payload = {
  725. "print": {
  726. "device": {
  727. "nozzle": {
  728. "info": [
  729. {
  730. "id": 0,
  731. "type": "HS",
  732. "diameter": "0.4",
  733. "wear": 5,
  734. "stat": 1,
  735. "max_temp": 300,
  736. "serial_number": "SN-L",
  737. },
  738. {
  739. "id": 1,
  740. "type": "HS",
  741. "diameter": "0.4",
  742. "wear": 3,
  743. "stat": 1,
  744. "max_temp": 300,
  745. "serial_number": "SN-R",
  746. },
  747. ]
  748. }
  749. }
  750. }
  751. }
  752. mqtt_client._process_message(payload)
  753. assert len(mqtt_client.state.nozzle_rack) == 2
  754. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  755. assert ids == [0, 1]
  756. def test_single_nozzle_h2s_populated(self, mqtt_client):
  757. """H2S provides 1 nozzle entry: ID 0 only — single nozzle printer."""
  758. payload = {
  759. "print": {
  760. "device": {
  761. "nozzle": {
  762. "info": [
  763. {
  764. "id": 0,
  765. "type": "HS",
  766. "diameter": "0.4",
  767. "wear": 2,
  768. "stat": 1,
  769. "max_temp": 300,
  770. "serial_number": "SN-0",
  771. },
  772. ]
  773. }
  774. }
  775. }
  776. }
  777. mqtt_client._process_message(payload)
  778. assert len(mqtt_client.state.nozzle_rack) == 1
  779. assert mqtt_client.state.nozzle_rack[0]["id"] == 0
  780. def test_empty_nozzle_info_does_not_populate_rack(self, mqtt_client):
  781. """Empty nozzle info list should not populate nozzle_rack."""
  782. payload = {"print": {"device": {"nozzle": {"info": []}}}}
  783. mqtt_client._process_message(payload)
  784. assert mqtt_client.state.nozzle_rack == []
  785. def test_nozzle_rack_sorted_by_id(self, mqtt_client):
  786. """Nozzle rack entries should be sorted by ID regardless of input order."""
  787. payload = {
  788. "print": {
  789. "device": {
  790. "nozzle": {
  791. "info": [
  792. {"id": 17, "type": "HS", "diameter": "0.6"},
  793. {"id": 0, "type": "HS", "diameter": "0.4"},
  794. {"id": 16, "type": "HS", "diameter": "0.4"},
  795. {"id": 1, "type": "HS", "diameter": "0.4"},
  796. ]
  797. }
  798. }
  799. }
  800. }
  801. mqtt_client._process_message(payload)
  802. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  803. assert ids == [0, 1, 16, 17]
  804. def test_nozzle_rack_field_mapping(self, mqtt_client):
  805. """Verify field mapping from MQTT nozzle_info to nozzle_rack dict keys."""
  806. payload = {
  807. "print": {
  808. "device": {
  809. "nozzle": {
  810. "info": [
  811. {
  812. "id": 16,
  813. "type": "HH01",
  814. "diameter": "0.6",
  815. "wear": 15,
  816. "stat": 0,
  817. "max_temp": 320,
  818. "serial_number": "SN-ABC123",
  819. "filament_colour": "FF8800",
  820. "filament_id": "F42",
  821. "tray_type": "ABS",
  822. }
  823. ]
  824. }
  825. }
  826. }
  827. }
  828. mqtt_client._process_message(payload)
  829. slot = mqtt_client.state.nozzle_rack[0]
  830. assert slot["id"] == 16
  831. assert slot["type"] == "HH01"
  832. assert slot["diameter"] == "0.6"
  833. assert slot["wear"] == 15
  834. assert slot["stat"] == 0
  835. assert slot["max_temp"] == 320
  836. assert slot["serial_number"] == "SN-ABC123"
  837. assert slot["filament_color"] == "FF8800"
  838. assert slot["filament_id"] == "F42"
  839. assert slot["filament_type"] == "ABS"
  840. def test_nozzle_info_updates_nozzle_state(self, mqtt_client):
  841. """Nozzle info for IDs 0,1 should also update nozzle state (type/diameter)."""
  842. payload = {
  843. "print": {
  844. "device": {
  845. "nozzle": {
  846. "info": [
  847. {"id": 0, "type": "HS", "diameter": "0.4"},
  848. {"id": 1, "type": "HH01", "diameter": "0.6"},
  849. ]
  850. }
  851. }
  852. }
  853. }
  854. mqtt_client._process_message(payload)
  855. assert mqtt_client.state.nozzles[0].nozzle_type == "HS"
  856. assert mqtt_client.state.nozzles[0].nozzle_diameter == "0.4"
  857. assert mqtt_client.state.nozzles[1].nozzle_type == "HH01"
  858. assert mqtt_client.state.nozzles[1].nozzle_diameter == "0.6"
  859. class TestRequestTopicFailSafe:
  860. """Tests for graceful degradation when broker rejects request topic subscription."""
  861. @pytest.fixture(autouse=True)
  862. def clear_request_topic_cache(self):
  863. """Clear class-level cache before each test to avoid cross-test pollution."""
  864. from backend.app.services.bambu_mqtt import BambuMQTTClient
  865. BambuMQTTClient._request_topic_cache.clear()
  866. @pytest.fixture
  867. def mqtt_client(self):
  868. from backend.app.services.bambu_mqtt import BambuMQTTClient
  869. client = BambuMQTTClient(
  870. ip_address="192.168.1.100",
  871. serial_number="TEST123",
  872. access_code="12345678",
  873. )
  874. return client
  875. def test_request_topic_supported_by_default(self, mqtt_client):
  876. """Request topic subscription is attempted by default."""
  877. assert mqtt_client._request_topic_supported is True
  878. assert mqtt_client._request_topic_confirmed is False
  879. def test_on_subscribe_confirms_success(self, mqtt_client):
  880. """Successful SUBACK marks request topic as confirmed."""
  881. from paho.mqtt.reasoncodes import ReasonCode
  882. mqtt_client._request_topic_sub_mid = 42
  883. rc = ReasonCode(9, identifier=0) # SUBACK packetType=9, QoS 0 = success
  884. mqtt_client._on_subscribe(None, None, 42, [rc], None)
  885. assert mqtt_client._request_topic_confirmed is True
  886. assert mqtt_client._request_topic_supported is True
  887. assert mqtt_client._request_topic_sub_mid is None
  888. assert mqtt_client._request_topic_sub_time == 0.0
  889. def test_on_subscribe_detects_rejection(self, mqtt_client):
  890. """SUBACK with failure code disables request topic."""
  891. from paho.mqtt.reasoncodes import ReasonCode
  892. mqtt_client._request_topic_sub_mid = 42
  893. rc = ReasonCode(9, identifier=0x80) # SUBACK packetType=9, 0x80 = failure
  894. mqtt_client._on_subscribe(None, None, 42, [rc], None)
  895. assert mqtt_client._request_topic_supported is False
  896. assert mqtt_client._request_topic_confirmed is False
  897. def test_on_subscribe_ignores_other_mids(self, mqtt_client):
  898. """SUBACK for other subscriptions (e.g. report topic) is ignored."""
  899. from paho.mqtt.reasoncodes import ReasonCode
  900. mqtt_client._request_topic_sub_mid = 42
  901. rc = ReasonCode(9, identifier=0x80)
  902. mqtt_client._on_subscribe(None, None, 99, [rc], None)
  903. # Not affected — mid doesn't match
  904. assert mqtt_client._request_topic_supported is True
  905. def test_disconnect_after_subscription_disables_topic(self, mqtt_client):
  906. """Disconnect within 10s of subscription attempt disables request topic."""
  907. import time
  908. mqtt_client._request_topic_sub_time = time.time()
  909. mqtt_client._request_topic_confirmed = False
  910. mqtt_client._last_message_time = 0.0
  911. mqtt_client._on_disconnect(None, None)
  912. assert mqtt_client._request_topic_supported is False
  913. assert mqtt_client._request_topic_sub_time == 0.0
  914. def test_disconnect_after_confirmation_does_not_disable(self, mqtt_client):
  915. """Disconnect after SUBACK confirmation keeps request topic enabled."""
  916. import time
  917. mqtt_client._request_topic_sub_time = time.time()
  918. mqtt_client._request_topic_confirmed = True
  919. mqtt_client._last_message_time = 0.0
  920. mqtt_client._on_disconnect(None, None)
  921. assert mqtt_client._request_topic_supported is True
  922. def test_late_disconnect_does_not_disable(self, mqtt_client):
  923. """Disconnect long after subscription (>10s) doesn't blame request topic."""
  924. import time
  925. mqtt_client._request_topic_sub_time = time.time() - 30.0
  926. mqtt_client._request_topic_confirmed = False
  927. mqtt_client._last_message_time = 0.0
  928. mqtt_client._on_disconnect(None, None)
  929. assert mqtt_client._request_topic_supported is True
  930. def test_on_connect_skips_request_topic_when_unsupported(self, mqtt_client):
  931. """After marking unsupported, reconnect skips request topic subscription."""
  932. mqtt_client._request_topic_supported = False
  933. subscribe_calls = []
  934. mock_client = type(
  935. "MockClient",
  936. (),
  937. {
  938. "subscribe": lambda self, topic: subscribe_calls.append(topic) or (0, 1),
  939. },
  940. )()
  941. mqtt_client._on_connect(mock_client, None, None, 0)
  942. # Only report topic subscribed, not request topic
  943. assert len(subscribe_calls) == 1
  944. assert subscribe_calls[0] == mqtt_client.topic_subscribe
  945. def test_cache_persists_across_instances(self):
  946. """New client instance inherits request topic unsupported state from cache."""
  947. from backend.app.services.bambu_mqtt import BambuMQTTClient
  948. client1 = BambuMQTTClient(
  949. ip_address="192.168.1.100",
  950. serial_number="TEST_CACHE",
  951. access_code="12345678",
  952. )
  953. assert client1._request_topic_supported is True
  954. # Simulate disconnect-after-subscribe disabling the topic
  955. client1._request_topic_sub_time = __import__("time").time()
  956. client1._request_topic_confirmed = False
  957. client1._last_message_time = 0.0
  958. client1._on_disconnect(None, None)
  959. assert client1._request_topic_supported is False
  960. # New instance for same serial should inherit the cached state
  961. client2 = BambuMQTTClient(
  962. ip_address="192.168.1.100",
  963. serial_number="TEST_CACHE",
  964. access_code="12345678",
  965. )
  966. assert client2._request_topic_supported is False
  967. def test_cache_does_not_affect_different_serial(self):
  968. """Cache is per-serial — different printer is unaffected."""
  969. from backend.app.services.bambu_mqtt import BambuMQTTClient
  970. BambuMQTTClient._request_topic_cache["SERIAL_A"] = False
  971. client = BambuMQTTClient(
  972. ip_address="192.168.1.100",
  973. serial_number="SERIAL_B",
  974. access_code="12345678",
  975. )
  976. assert client._request_topic_supported is True
  977. def test_cache_updated_on_suback_success(self):
  978. """Successful SUBACK caches positive confirmation."""
  979. from paho.mqtt.reasoncodes import ReasonCode
  980. from backend.app.services.bambu_mqtt import BambuMQTTClient
  981. client = BambuMQTTClient(
  982. ip_address="192.168.1.100",
  983. serial_number="TEST_SUBACK",
  984. access_code="12345678",
  985. )
  986. client._request_topic_sub_mid = 42
  987. rc = ReasonCode(9, identifier=0) # Success
  988. client._on_subscribe(None, None, 42, [rc], None)
  989. assert BambuMQTTClient._request_topic_cache["TEST_SUBACK"] is True
  990. def test_cache_updated_on_suback_rejection(self):
  991. """SUBACK rejection caches negative state."""
  992. from paho.mqtt.reasoncodes import ReasonCode
  993. from backend.app.services.bambu_mqtt import BambuMQTTClient
  994. client = BambuMQTTClient(
  995. ip_address="192.168.1.100",
  996. serial_number="TEST_REJECT",
  997. access_code="12345678",
  998. )
  999. client._request_topic_sub_mid = 42
  1000. rc = ReasonCode(9, identifier=0x80) # Failure
  1001. client._on_subscribe(None, None, 42, [rc], None)
  1002. assert BambuMQTTClient._request_topic_cache["TEST_REJECT"] is False
  1003. class TestRequestTopicAmsMapping:
  1004. """Tests for capturing ams_mapping from the MQTT request topic."""
  1005. @pytest.fixture
  1006. def mqtt_client(self):
  1007. """Create a BambuMQTTClient instance for testing."""
  1008. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1009. client = BambuMQTTClient(
  1010. ip_address="192.168.1.100",
  1011. serial_number="TEST123",
  1012. access_code="12345678",
  1013. )
  1014. return client
  1015. def test_captured_ams_mapping_initializes_to_none(self, mqtt_client):
  1016. """Verify _captured_ams_mapping starts as None."""
  1017. assert mqtt_client._captured_ams_mapping is None
  1018. def test_handle_request_message_captures_ams_mapping(self, mqtt_client):
  1019. """project_file command with ams_mapping stores the mapping."""
  1020. data = {
  1021. "print": {
  1022. "command": "project_file",
  1023. "ams_mapping": [0, 4, -1, -1],
  1024. "url": "ftp://192.168.1.100/test.3mf",
  1025. }
  1026. }
  1027. mqtt_client._handle_request_message(data)
  1028. assert mqtt_client._captured_ams_mapping == [0, 4, -1, -1]
  1029. def test_handle_request_message_ignores_non_print_commands(self, mqtt_client):
  1030. """Non-project_file commands don't store ams_mapping."""
  1031. data = {
  1032. "print": {
  1033. "command": "pause",
  1034. }
  1035. }
  1036. mqtt_client._handle_request_message(data)
  1037. assert mqtt_client._captured_ams_mapping is None
  1038. def test_handle_request_message_ignores_missing_ams_mapping(self, mqtt_client):
  1039. """project_file command without ams_mapping doesn't store anything."""
  1040. data = {
  1041. "print": {
  1042. "command": "project_file",
  1043. "url": "ftp://192.168.1.100/test.3mf",
  1044. }
  1045. }
  1046. mqtt_client._handle_request_message(data)
  1047. assert mqtt_client._captured_ams_mapping is None
  1048. def test_handle_request_message_ignores_non_dict_print(self, mqtt_client):
  1049. """Non-dict print value is safely ignored."""
  1050. data = {"print": "not_a_dict"}
  1051. mqtt_client._handle_request_message(data)
  1052. assert mqtt_client._captured_ams_mapping is None
  1053. def test_handle_request_message_ignores_missing_print(self, mqtt_client):
  1054. """Message without print key is safely ignored."""
  1055. data = {"pushing": {"command": "pushall"}}
  1056. mqtt_client._handle_request_message(data)
  1057. assert mqtt_client._captured_ams_mapping is None
  1058. def test_captured_mapping_overwrites_previous(self, mqtt_client):
  1059. """A new print command overwrites a previously captured mapping."""
  1060. mqtt_client._captured_ams_mapping = [0, -1, -1, -1]
  1061. data = {
  1062. "print": {
  1063. "command": "project_file",
  1064. "ams_mapping": [4, 8, -1, -1],
  1065. }
  1066. }
  1067. mqtt_client._handle_request_message(data)
  1068. assert mqtt_client._captured_ams_mapping == [4, 8, -1, -1]
  1069. def test_print_start_callback_includes_ams_mapping(self, mqtt_client):
  1070. """on_print_start callback data includes captured ams_mapping."""
  1071. start_data = {}
  1072. def on_start(data):
  1073. start_data.update(data)
  1074. mqtt_client.on_print_start = on_start
  1075. mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
  1076. # Trigger print start
  1077. mqtt_client._process_message(
  1078. {
  1079. "print": {
  1080. "gcode_state": "RUNNING",
  1081. "gcode_file": "/data/Metadata/test.gcode",
  1082. "subtask_name": "Test",
  1083. }
  1084. }
  1085. )
  1086. assert start_data.get("ams_mapping") == [0, 4, -1, -1]
  1087. def test_print_start_callback_ams_mapping_none_when_not_captured(self, mqtt_client):
  1088. """on_print_start callback has ams_mapping=None when no mapping captured."""
  1089. start_data = {}
  1090. def on_start(data):
  1091. start_data.update(data)
  1092. mqtt_client.on_print_start = on_start
  1093. mqtt_client._process_message(
  1094. {
  1095. "print": {
  1096. "gcode_state": "RUNNING",
  1097. "gcode_file": "/data/Metadata/test.gcode",
  1098. "subtask_name": "Test",
  1099. }
  1100. }
  1101. )
  1102. assert "ams_mapping" in start_data
  1103. assert start_data["ams_mapping"] is None
  1104. def test_print_complete_callback_includes_ams_mapping(self, mqtt_client):
  1105. """on_print_complete callback data includes captured ams_mapping."""
  1106. complete_data = {}
  1107. def on_complete(data):
  1108. complete_data.update(data)
  1109. mqtt_client.on_print_start = lambda d: None
  1110. mqtt_client.on_print_complete = on_complete
  1111. mqtt_client._captured_ams_mapping = [0, 9, -1, -1]
  1112. # Start print
  1113. mqtt_client._process_message(
  1114. {
  1115. "print": {
  1116. "gcode_state": "RUNNING",
  1117. "gcode_file": "/data/Metadata/test.gcode",
  1118. "subtask_name": "Test",
  1119. }
  1120. }
  1121. )
  1122. # Complete print
  1123. mqtt_client._process_message(
  1124. {
  1125. "print": {
  1126. "gcode_state": "FINISH",
  1127. "gcode_file": "/data/Metadata/test.gcode",
  1128. "subtask_name": "Test",
  1129. }
  1130. }
  1131. )
  1132. assert complete_data.get("ams_mapping") == [0, 9, -1, -1]
  1133. def test_captured_mapping_cleared_after_print_complete(self, mqtt_client):
  1134. """_captured_ams_mapping is reset to None after print completion."""
  1135. mqtt_client.on_print_start = lambda d: None
  1136. mqtt_client.on_print_complete = lambda d: None
  1137. mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
  1138. # Start print
  1139. mqtt_client._process_message(
  1140. {
  1141. "print": {
  1142. "gcode_state": "RUNNING",
  1143. "gcode_file": "/data/Metadata/test.gcode",
  1144. "subtask_name": "Test",
  1145. }
  1146. }
  1147. )
  1148. # Complete print
  1149. mqtt_client._process_message(
  1150. {
  1151. "print": {
  1152. "gcode_state": "FINISH",
  1153. "gcode_file": "/data/Metadata/test.gcode",
  1154. "subtask_name": "Test",
  1155. }
  1156. }
  1157. )
  1158. assert mqtt_client._captured_ams_mapping is None
  1159. def test_full_flow_capture_and_deliver(self, mqtt_client):
  1160. """Full flow: slicer sends print command → MQTT captures mapping → completion delivers it."""
  1161. complete_data = {}
  1162. def on_complete(data):
  1163. complete_data.update(data)
  1164. mqtt_client.on_print_start = lambda d: None
  1165. mqtt_client.on_print_complete = on_complete
  1166. # 1. Slicer sends print command (captured from request topic)
  1167. mqtt_client._handle_request_message(
  1168. {
  1169. "print": {
  1170. "command": "project_file",
  1171. "ams_mapping": [4, 9, -1, -1],
  1172. "url": "ftp://192.168.1.100/model.3mf",
  1173. }
  1174. }
  1175. )
  1176. assert mqtt_client._captured_ams_mapping == [4, 9, -1, -1]
  1177. # 2. Printer reports RUNNING
  1178. mqtt_client._process_message(
  1179. {
  1180. "print": {
  1181. "gcode_state": "RUNNING",
  1182. "gcode_file": "/data/Metadata/model.gcode",
  1183. "subtask_name": "Model",
  1184. }
  1185. }
  1186. )
  1187. # 3. Printer reports FINISH
  1188. mqtt_client._process_message(
  1189. {
  1190. "print": {
  1191. "gcode_state": "FINISH",
  1192. "gcode_file": "/data/Metadata/model.gcode",
  1193. "subtask_name": "Model",
  1194. }
  1195. }
  1196. )
  1197. assert complete_data["ams_mapping"] == [4, 9, -1, -1]
  1198. assert complete_data["status"] == "completed"
  1199. # Mapping cleared after completion
  1200. assert mqtt_client._captured_ams_mapping is None
  1201. # ---------------------------------------------------------------------------
  1202. # tray_now disambiguation helpers
  1203. # ---------------------------------------------------------------------------
  1204. def _ams_payload(tray_now, ams_units=None, tray_exist_bits=None, ams_exist_bits=None):
  1205. """Build minimal print.ams payload for tray_now disambiguation tests."""
  1206. ams = {"tray_now": str(tray_now)}
  1207. if ams_units is not None:
  1208. ams["ams"] = ams_units
  1209. if tray_exist_bits is not None:
  1210. ams["tray_exist_bits"] = tray_exist_bits
  1211. if ams_exist_bits is not None:
  1212. ams["ams_exist_bits"] = ams_exist_bits
  1213. return {"print": {"ams": ams}}
  1214. def _extruder_info_payload(extruders):
  1215. """Build device.extruder.info payload (dual-nozzle detection + snow).
  1216. Each entry in *extruders* is a dict with at least ``id`` and ``snow``.
  1217. """
  1218. return {
  1219. "print": {
  1220. "device": {
  1221. "extruder": {
  1222. "info": extruders,
  1223. }
  1224. }
  1225. }
  1226. }
  1227. def _extruder_state_payload(state_val):
  1228. """Build device.extruder.state payload (active extruder via bit 8)."""
  1229. return {
  1230. "print": {
  1231. "device": {
  1232. "extruder": {
  1233. "state": state_val,
  1234. }
  1235. }
  1236. }
  1237. }
  1238. # ---------------------------------------------------------------------------
  1239. # 1. Single-nozzle X1E — direct passthrough
  1240. # ---------------------------------------------------------------------------
  1241. class TestTrayNowSingleNozzleX1E:
  1242. """Single-nozzle, 1 AMS — tray_now is a direct passthrough."""
  1243. @pytest.fixture
  1244. def mqtt_client(self):
  1245. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1246. return BambuMQTTClient(
  1247. ip_address="192.168.1.100",
  1248. serial_number="TEST_X1E",
  1249. access_code="12345678",
  1250. )
  1251. def test_tray_now_direct_passthrough_slot_0_to_3(self, mqtt_client):
  1252. """Each tray_now 0-3 maps 1:1 on single-nozzle printers."""
  1253. for slot in range(4):
  1254. mqtt_client._process_message(_ams_payload(slot))
  1255. assert mqtt_client.state.tray_now == slot
  1256. def test_tray_now_255_means_unloaded(self, mqtt_client):
  1257. """tray_now=255 means no filament loaded."""
  1258. mqtt_client._process_message(_ams_payload(255))
  1259. assert mqtt_client.state.tray_now == 255
  1260. def test_single_extruder_does_not_trigger_dual_nozzle(self, mqtt_client):
  1261. """device.extruder.info with 1 entry must NOT set _is_dual_nozzle."""
  1262. mqtt_client._process_message(_extruder_info_payload([{"id": 0, "snow": 0xFF00FF}]))
  1263. assert mqtt_client._is_dual_nozzle is False
  1264. def test_last_loaded_tray_survives_unload(self, mqtt_client):
  1265. """Load tray 2, unload → last_loaded_tray stays 2."""
  1266. mqtt_client._process_message(_ams_payload(2))
  1267. assert mqtt_client.state.last_loaded_tray == 2
  1268. mqtt_client._process_message(_ams_payload(255))
  1269. assert mqtt_client.state.tray_now == 255
  1270. assert mqtt_client.state.last_loaded_tray == 2
  1271. # ---------------------------------------------------------------------------
  1272. # 2. Single-nozzle P2S — multiple AMS, global IDs pass through
  1273. # ---------------------------------------------------------------------------
  1274. class TestTrayNowSingleNozzleP2S:
  1275. """Single-nozzle, 2 AMS — tray_now > 3 passes through as global ID."""
  1276. @pytest.fixture
  1277. def mqtt_client(self):
  1278. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1279. return BambuMQTTClient(
  1280. ip_address="192.168.1.100",
  1281. serial_number="TEST_P2S",
  1282. access_code="12345678",
  1283. )
  1284. def test_tray_now_ams1_global_ids_4_to_7(self, mqtt_client):
  1285. """tray_now 4-7 are global IDs for AMS 1 on single-nozzle printers."""
  1286. for global_id in range(4, 8):
  1287. mqtt_client._process_message(_ams_payload(global_id))
  1288. assert mqtt_client.state.tray_now == global_id
  1289. def test_tray_change_across_ams_units(self, mqtt_client):
  1290. """Switch from AMS 0 slot 1 → AMS 1 slot 2 (global 6)."""
  1291. mqtt_client._process_message(_ams_payload(1))
  1292. assert mqtt_client.state.tray_now == 1
  1293. mqtt_client._process_message(_ams_payload(6))
  1294. assert mqtt_client.state.tray_now == 6
  1295. # ---------------------------------------------------------------------------
  1296. # 2b. Single-nozzle P2S — multi-AMS local slot disambiguation (#420)
  1297. # ---------------------------------------------------------------------------
  1298. class TestTrayNowP2SMultiAmsDisambiguation:
  1299. """P2S firmware sends local slot IDs (0-3) in tray_now even with dual AMS.
  1300. When ams_exist_bits indicates >1 AMS unit and tray_now is 0-3, the backend
  1301. should use the MQTT mapping field (snow-encoded) to resolve the correct
  1302. global tray ID.
  1303. """
  1304. @pytest.fixture
  1305. def mqtt_client(self):
  1306. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1307. client = BambuMQTTClient(
  1308. ip_address="192.168.1.100",
  1309. serial_number="TEST_P2S_DUAL",
  1310. access_code="12345678",
  1311. )
  1312. return client
  1313. def test_resolves_ams1_slot1_from_mapping(self, mqtt_client):
  1314. """tray_now=1 with mapping=[257] → global ID 5 (AMS1-T1).
  1315. 257 snow-decoded: ams_hw_id=1, slot=1 → global 1*4+1=5.
  1316. """
  1317. # Set mapping field in raw_data (as the MQTT handler would)
  1318. mqtt_client.state.raw_data["mapping"] = [257]
  1319. mqtt_client._process_message(
  1320. _ams_payload(1, ams_exist_bits="3") # '3' = 0b11 → AMS 0 and 1
  1321. )
  1322. assert mqtt_client.state.tray_now == 5
  1323. def test_resolves_ams1_slot0_from_mapping(self, mqtt_client):
  1324. """tray_now=0 with mapping=[256] → global ID 4 (AMS1-T0).
  1325. 256 snow-decoded: ams_hw_id=1, slot=0 → global 1*4+0=4.
  1326. """
  1327. mqtt_client.state.raw_data["mapping"] = [256]
  1328. mqtt_client._process_message(_ams_payload(0, ams_exist_bits="3"))
  1329. assert mqtt_client.state.tray_now == 4
  1330. def test_resolves_ams1_slot3_from_mapping(self, mqtt_client):
  1331. """tray_now=3 with mapping=[259] → global ID 7 (AMS1-T3).
  1332. 259 snow-decoded: ams_hw_id=1, slot=3 → global 1*4+3=7.
  1333. """
  1334. mqtt_client.state.raw_data["mapping"] = [259]
  1335. mqtt_client._process_message(_ams_payload(3, ams_exist_bits="3"))
  1336. assert mqtt_client.state.tray_now == 7
  1337. def test_ams0_slot_unchanged_when_mapping_confirms_ams0(self, mqtt_client):
  1338. """tray_now=1 with mapping=[1] → stays 1 (AMS0-T1).
  1339. 1 snow-decoded: ams_hw_id=0, slot=1 → global 0*4+1=1.
  1340. """
  1341. mqtt_client.state.raw_data["mapping"] = [1]
  1342. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1343. assert mqtt_client.state.tray_now == 1
  1344. def test_multicolor_resolves_ams1_from_multi_entry_mapping(self, mqtt_client):
  1345. """Multi-color print: mapping=[0, 257] → tray_now=1 resolves to AMS1-T1 (5).
  1346. Entry 0: ams_hw_id=0, slot=0 (local 0) — doesn't match tray_now=1.
  1347. Entry 257: ams_hw_id=1, slot=1 (local 1) — matches tray_now=1 → global 5.
  1348. """
  1349. mqtt_client.state.raw_data["mapping"] = [0, 257]
  1350. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1351. assert mqtt_client.state.tray_now == 5
  1352. def test_multicolor_four_slot_mapping(self, mqtt_client):
  1353. """mapping=[65535, 65535, 65535, 257] → tray_now=1 resolves to global 5.
  1354. Only entry 257 has local slot=1, other entries are unmapped (65535).
  1355. Reproduces exact data from issue #420 support package.
  1356. """
  1357. mqtt_client.state.raw_data["mapping"] = [65535, 65535, 65535, 257]
  1358. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1359. assert mqtt_client.state.tray_now == 5
  1360. def test_ambiguous_mapping_falls_back_to_local_slot(self, mqtt_client):
  1361. """Two AMS units with same local slot in mapping → ambiguous, keep local slot.
  1362. mapping=[1, 257]: both have local slot 1 (AMS0-T1 and AMS1-T1).
  1363. Cannot disambiguate → fall back to tray_now=1.
  1364. """
  1365. mqtt_client.state.raw_data["mapping"] = [1, 257]
  1366. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1367. assert mqtt_client.state.tray_now == 1
  1368. def test_no_mapping_falls_back_to_local_slot(self, mqtt_client):
  1369. """No mapping field available → fall back to raw tray_now."""
  1370. # No mapping in raw_data (e.g. manual filament load, not during print)
  1371. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1372. assert mqtt_client.state.tray_now == 1
  1373. def test_empty_mapping_falls_back_to_local_slot(self, mqtt_client):
  1374. """Empty mapping list → fall back to raw tray_now."""
  1375. mqtt_client.state.raw_data["mapping"] = []
  1376. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1377. assert mqtt_client.state.tray_now == 1
  1378. def test_single_ams_passthrough(self, mqtt_client):
  1379. """Single AMS (ams_exist_bits='1') → tray_now 0-3 is direct global ID."""
  1380. mqtt_client._process_message(_ams_payload(2, ams_exist_bits="1"))
  1381. assert mqtt_client.state.tray_now == 2
  1382. def test_no_ams_exist_bits_passthrough(self, mqtt_client):
  1383. """No ams_exist_bits in payload → fall back to raw tray_now."""
  1384. mqtt_client._process_message(_ams_payload(1))
  1385. assert mqtt_client.state.tray_now == 1
  1386. def test_tray_now_255_unaffected_by_multi_ams(self, mqtt_client):
  1387. """tray_now=255 (unloaded) passes through regardless of AMS count."""
  1388. mqtt_client.state.raw_data["mapping"] = [257]
  1389. mqtt_client._process_message(_ams_payload(255, ams_exist_bits="3"))
  1390. assert mqtt_client.state.tray_now == 255
  1391. def test_tray_now_above_3_unaffected(self, mqtt_client):
  1392. """tray_now > 3 is already a global ID and passes through directly."""
  1393. mqtt_client._process_message(_ams_payload(6, ams_exist_bits="3"))
  1394. assert mqtt_client.state.tray_now == 6
  1395. def test_last_loaded_tray_uses_resolved_global_id(self, mqtt_client):
  1396. """last_loaded_tray should reflect the resolved global ID, not local slot."""
  1397. mqtt_client.state.raw_data["mapping"] = [257]
  1398. mqtt_client.state.state = "RUNNING"
  1399. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1400. assert mqtt_client.state.tray_now == 5
  1401. assert mqtt_client.state.last_loaded_tray == 5
  1402. class TestResolveLocalSlotFromMapping:
  1403. """Unit tests for _resolve_local_slot_from_mapping static method."""
  1404. def test_single_match_ams0(self):
  1405. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1406. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [1]) == 1
  1407. def test_single_match_ams1(self):
  1408. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1409. # 257 = 1*256 + 1 → AMS1 slot1 → global 5
  1410. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [257]) == 5
  1411. def test_single_match_ams2(self):
  1412. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1413. # 514 = 2*256 + 2 → AMS2 slot2 → global 10
  1414. assert BambuMQTTClient._resolve_local_slot_from_mapping(2, [514]) == 10
  1415. def test_unmapped_entries_skipped(self):
  1416. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1417. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [65535, 65535, 65535, 257]) == 5
  1418. def test_no_match_returns_none(self):
  1419. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1420. # mapping has slot 0 only, looking for slot 2
  1421. assert BambuMQTTClient._resolve_local_slot_from_mapping(2, [0]) is None
  1422. def test_ambiguous_returns_none(self):
  1423. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1424. # Both AMS0 slot1 (1) and AMS1 slot1 (257) → ambiguous
  1425. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [1, 257]) is None
  1426. def test_none_mapping_returns_none(self):
  1427. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1428. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, None) is None
  1429. def test_empty_mapping_returns_none(self):
  1430. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1431. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, []) is None
  1432. def test_ams_ht_slot0_match(self):
  1433. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1434. # AMS-HT id=128: snow = 128*256 + 0 = 32768
  1435. assert BambuMQTTClient._resolve_local_slot_from_mapping(0, [32768]) == 128
  1436. # ---------------------------------------------------------------------------
  1437. # 3. H2D Pro — initial state detection
  1438. # ---------------------------------------------------------------------------
  1439. class TestTrayNowDualNozzleH2DSetup:
  1440. """H2D Pro initial state detection."""
  1441. @pytest.fixture
  1442. def mqtt_client(self):
  1443. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1444. return BambuMQTTClient(
  1445. ip_address="192.168.1.100",
  1446. serial_number="TEST_H2D",
  1447. access_code="12345678",
  1448. )
  1449. def test_dual_nozzle_detected_from_extruder_info(self, mqtt_client):
  1450. """2 entries in device.extruder.info → _is_dual_nozzle=True."""
  1451. mqtt_client._process_message(
  1452. _extruder_info_payload(
  1453. [
  1454. {"id": 0, "snow": 0xFF00FF},
  1455. {"id": 1, "snow": 0xFF00FF},
  1456. ]
  1457. )
  1458. )
  1459. assert mqtt_client._is_dual_nozzle is True
  1460. def test_ams_extruder_map_parsed_from_info_field(self, mqtt_client):
  1461. """AMS info field is hex: 0x2003 → ext 0 (right), 0x2104 → ext 1 (left)."""
  1462. # MQTT sends info as string; BambuStudio parses as hex via stoull(str, 16)
  1463. ams_units = [
  1464. {"id": 0, "info": "2003", "tray": [{"id": i} for i in range(4)]},
  1465. {"id": 128, "info": "2104", "tray": [{"id": 0}]},
  1466. ]
  1467. payload = {
  1468. "print": {
  1469. "ams": {
  1470. "ams": ams_units,
  1471. "tray_now": "255",
  1472. "tray_exist_bits": "1000f",
  1473. },
  1474. }
  1475. }
  1476. mqtt_client._process_message(payload)
  1477. # 0x2003: bits 8-11 = (0x2003 >> 8) & 0xF = 0x20 & 0xF = 0 → extruder 0 (right)
  1478. # 0x2104: bits 8-11 = (0x2104 >> 8) & 0xF = 0x21 & 0xF = 1 → extruder 1 (left)
  1479. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1480. def test_ams_extruder_map_real_h2d_values(self, mqtt_client):
  1481. """Real H2D MQTT values: AMS2 Pro on right, AMS-HT on left."""
  1482. ams_units = [
  1483. {"id": 0, "info": "10001003", "tray": [{"id": i} for i in range(4)]},
  1484. {"id": 128, "info": "10002104", "tray": [{"id": 0}]},
  1485. ]
  1486. payload = {
  1487. "print": {
  1488. "ams": {
  1489. "ams": ams_units,
  1490. "tray_now": "255",
  1491. "tray_exist_bits": "1000a",
  1492. },
  1493. }
  1494. }
  1495. mqtt_client._process_message(payload)
  1496. # 0x10001003: bits 8-11 = (0x10001003 >> 8) & 0xF = 0x10 & 0xF = 0 → right
  1497. # 0x10002104: bits 8-11 = (0x10002104 >> 8) & 0xF = 0x21 & 0xF = 1 → left
  1498. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1499. def test_ams_extruder_map_skips_uninitialized(self, mqtt_client):
  1500. """extruder_id 0xE means uninitialized AMS — should be skipped."""
  1501. ams_units = [
  1502. {"id": 0, "info": "e03", "tray": [{"id": i} for i in range(4)]},
  1503. ]
  1504. payload = {
  1505. "print": {
  1506. "ams": {
  1507. "ams": ams_units,
  1508. "tray_now": "255",
  1509. "tray_exist_bits": "f",
  1510. },
  1511. }
  1512. }
  1513. mqtt_client._process_message(payload)
  1514. assert mqtt_client.state.ams_extruder_map == {}
  1515. def test_ams_extruder_map_partial_update_preserves_entries(self, mqtt_client):
  1516. """Partial MQTT update with one AMS should not overwrite other entries."""
  1517. # First: full update with both AMS units
  1518. full_payload = {
  1519. "print": {
  1520. "ams": {
  1521. "ams": [
  1522. {"id": 0, "info": "2003", "tray": [{"id": i} for i in range(4)]},
  1523. {"id": 128, "info": "2104", "tray": [{"id": 0}]},
  1524. ],
  1525. "tray_now": "255",
  1526. "tray_exist_bits": "1000f",
  1527. },
  1528. }
  1529. }
  1530. mqtt_client._process_message(full_payload)
  1531. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1532. # Then: partial update with only AMS 0 (no info field this time)
  1533. partial_payload = {
  1534. "print": {
  1535. "ams": {
  1536. "ams": [
  1537. {"id": 0, "tray": [{"id": 0, "remain": 50}]},
  1538. ],
  1539. "tray_now": "0",
  1540. "tray_exist_bits": "1000f",
  1541. },
  1542. }
  1543. }
  1544. mqtt_client._process_message(partial_payload)
  1545. # Both entries should still be present
  1546. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1547. def test_dual_nozzle_detection_before_ams_in_same_message(self, mqtt_client):
  1548. """Dual-nozzle detection at line 538 happens before _handle_ams_data() at line 549.
  1549. If both arrive in the same message, tray_now disambiguation already uses dual-nozzle logic.
  1550. """
  1551. payload = {
  1552. "print": {
  1553. "device": {
  1554. "extruder": {
  1555. "info": [
  1556. {"id": 0, "snow": 0xFF00FF},
  1557. {"id": 1, "snow": 0xFF00FF},
  1558. ],
  1559. "state": 0x0001,
  1560. }
  1561. },
  1562. "ams": {
  1563. "ams": [
  1564. {"id": 0, "info": "2003", "tray": [{"id": i} for i in range(4)]},
  1565. ],
  1566. "tray_now": "2",
  1567. "tray_exist_bits": "f",
  1568. },
  1569. }
  1570. }
  1571. mqtt_client._process_message(payload)
  1572. # Dual-nozzle was detected; AMS 0 on right extruder (active by default);
  1573. # snow is 0xFF00FF (unloaded), so falls through to ams_extruder_map fallback.
  1574. # Single AMS on extruder 0 → global_id = 0*4+2 = 2
  1575. assert mqtt_client._is_dual_nozzle is True
  1576. assert mqtt_client.state.tray_now == 2
  1577. # ---------------------------------------------------------------------------
  1578. # Shared H2D fixture for classes 4-8
  1579. # ---------------------------------------------------------------------------
  1580. class _H2DFixtureMixin:
  1581. """Mixin providing a pre-configured H2D Pro client."""
  1582. @pytest.fixture
  1583. def mqtt_client(self):
  1584. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1585. return BambuMQTTClient(
  1586. ip_address="192.168.1.100",
  1587. serial_number="TEST_H2D",
  1588. access_code="12345678",
  1589. )
  1590. @pytest.fixture
  1591. def h2d_client(self, mqtt_client):
  1592. """Pre-configure as H2D Pro: dual-nozzle + ams_extruder_map."""
  1593. mqtt_client._process_message(
  1594. {
  1595. "print": {
  1596. "device": {
  1597. "extruder": {
  1598. "info": [
  1599. {"id": 0, "snow": 0xFF00FF},
  1600. {"id": 1, "snow": 0xFF00FF},
  1601. ],
  1602. "state": 0x0001, # right extruder active
  1603. }
  1604. },
  1605. "ams": {
  1606. "ams": [
  1607. {"id": 0, "info": "2003", "tray": [{"id": i} for i in range(4)]},
  1608. {"id": 128, "info": "2104", "tray": [{"id": 0}]},
  1609. ],
  1610. "tray_now": "255",
  1611. "tray_exist_bits": "1000f",
  1612. },
  1613. }
  1614. }
  1615. )
  1616. assert mqtt_client._is_dual_nozzle is True
  1617. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1618. return mqtt_client
  1619. # ---------------------------------------------------------------------------
  1620. # 4. H2D Snow field disambiguation
  1621. # ---------------------------------------------------------------------------
  1622. class TestTrayNowDualNozzleH2DSnow(_H2DFixtureMixin):
  1623. """Snow field disambiguation (primary path)."""
  1624. def test_snow_disambiguates_ams0_slot(self, h2d_client):
  1625. """snow ext[0]=AMS 0 slot 2, tray_now='2' → global 2."""
  1626. # Send snow update FIRST (snow is parsed AFTER tray_now in the same message,
  1627. # so we need it in a prior message).
  1628. snow_val = 0 << 8 | 2 # AMS 0 slot 2 = raw 2
  1629. h2d_client._process_message(
  1630. _extruder_info_payload(
  1631. [
  1632. {"id": 0, "snow": snow_val},
  1633. {"id": 1, "snow": 0xFF00FF},
  1634. ]
  1635. )
  1636. )
  1637. assert h2d_client.state.h2d_extruder_snow.get(0) == 2
  1638. # Now send tray_now=2
  1639. h2d_client._process_message(_ams_payload(2))
  1640. assert h2d_client.state.tray_now == 2
  1641. def test_snow_disambiguates_ams_ht_to_128(self, h2d_client):
  1642. """snow ext[1]=AMS HT (128), left active, tray_now='0' → global 128."""
  1643. # Snow: extruder 1 → AMS 128 slot 0
  1644. snow_val = 128 << 8 | 0 # = 32768
  1645. h2d_client._process_message(
  1646. _extruder_info_payload(
  1647. [
  1648. {"id": 0, "snow": 0xFF00FF},
  1649. {"id": 1, "snow": snow_val},
  1650. ]
  1651. )
  1652. )
  1653. assert h2d_client.state.h2d_extruder_snow.get(1) == 128
  1654. # Switch to left extruder
  1655. h2d_client._process_message(_extruder_state_payload(0x0100))
  1656. assert h2d_client.state.active_extruder == 1
  1657. # tray_now="0" with left extruder active, snow says AMS HT (128)
  1658. # AMS HT snow_slot = 0 (single slot), parsed_tray_now = 0 → match
  1659. h2d_client._process_message(_ams_payload(0))
  1660. assert h2d_client.state.tray_now == 128
  1661. def test_snow_updates_h2d_extruder_snow_state(self, h2d_client):
  1662. """Verify state.h2d_extruder_snow dict is populated correctly."""
  1663. snow_ext0 = 1 << 8 | 3 # AMS 1 slot 3 → global 7
  1664. snow_ext1 = 0 << 8 | 0 # AMS 0 slot 0 → global 0
  1665. h2d_client._process_message(
  1666. _extruder_info_payload(
  1667. [
  1668. {"id": 0, "snow": snow_ext0},
  1669. {"id": 1, "snow": snow_ext1},
  1670. ]
  1671. )
  1672. )
  1673. assert h2d_client.state.h2d_extruder_snow[0] == 7
  1674. assert h2d_client.state.h2d_extruder_snow[1] == 0
  1675. def test_snow_unloaded_value(self, h2d_client):
  1676. """snow=0xFFFF (ams_id=255, slot=255) → 255 (unloaded)."""
  1677. h2d_client._process_message(
  1678. _extruder_info_payload(
  1679. [
  1680. {"id": 0, "snow": 0xFFFF},
  1681. {"id": 1, "snow": 0xFFFF},
  1682. ]
  1683. )
  1684. )
  1685. assert h2d_client.state.h2d_extruder_snow[0] == 255
  1686. assert h2d_client.state.h2d_extruder_snow[1] == 255
  1687. def test_snow_initial_sentinel_not_stored(self, h2d_client):
  1688. """snow=0xFF00FF (firmware initial sentinel) is not parsed into h2d_extruder_snow."""
  1689. # 0xFF00FF has ams_id=0xFF00=65280 which doesn't match any branch
  1690. h2d_client._process_message(
  1691. _extruder_info_payload(
  1692. [
  1693. {"id": 0, "snow": 0xFF00FF},
  1694. {"id": 1, "snow": 0xFF00FF},
  1695. ]
  1696. )
  1697. )
  1698. # Snow dict should remain empty (no matching branch)
  1699. assert h2d_client.state.h2d_extruder_snow == {}
  1700. # ---------------------------------------------------------------------------
  1701. # 5. H2D Pending target disambiguation
  1702. # ---------------------------------------------------------------------------
  1703. class TestTrayNowDualNozzleH2DPendingTarget(_H2DFixtureMixin):
  1704. """Pending target disambiguation (when Bambuddy initiates load)."""
  1705. def test_pending_target_matches_slot(self, h2d_client):
  1706. """pending=5, tray_now='1' (5%4=1 matches) → tray_now=5."""
  1707. h2d_client.state.pending_tray_target = 5
  1708. h2d_client._process_message(_ams_payload(1))
  1709. assert h2d_client.state.tray_now == 5
  1710. assert h2d_client.state.pending_tray_target is None # cleared
  1711. def test_pending_target_slot_mismatch(self, h2d_client):
  1712. """pending=5, tray_now='2' → uses raw slot, clears pending."""
  1713. h2d_client.state.pending_tray_target = 5
  1714. h2d_client._process_message(_ams_payload(2))
  1715. # Slot 2 != 5%4=1 → mismatch, uses raw slot 2
  1716. assert h2d_client.state.tray_now == 2
  1717. assert h2d_client.state.pending_tray_target is None
  1718. def test_pending_target_takes_priority_over_snow(self, h2d_client):
  1719. """When both pending and snow are set, pending wins."""
  1720. # Set up snow for extruder 0 → AMS 0 slot 1 → global 1
  1721. snow_val = 0 << 8 | 1
  1722. h2d_client._process_message(
  1723. _extruder_info_payload(
  1724. [
  1725. {"id": 0, "snow": snow_val},
  1726. {"id": 1, "snow": 0xFF00FF},
  1727. ]
  1728. )
  1729. )
  1730. assert h2d_client.state.h2d_extruder_snow.get(0) == 1
  1731. # Set pending target to AMS 1 slot 1 (global 5)
  1732. h2d_client.state.pending_tray_target = 5
  1733. # tray_now="1" — matches pending (5%4=1), pending should win over snow
  1734. h2d_client._process_message(_ams_payload(1))
  1735. assert h2d_client.state.tray_now == 5
  1736. # ---------------------------------------------------------------------------
  1737. # 6. H2D ams_extruder_map fallback
  1738. # ---------------------------------------------------------------------------
  1739. class TestTrayNowDualNozzleH2DFallback(_H2DFixtureMixin):
  1740. """ams_extruder_map fallback (no pending, no snow)."""
  1741. def test_single_ams_on_extruder_computes_global_id(self, h2d_client):
  1742. """AMS 0 on right extruder, tray_now='2' → 0*4+2=2."""
  1743. # h2d_client has snow=0xFF00FF (unloaded) by default, so snow path skips
  1744. h2d_client._process_message(_ams_payload(2))
  1745. # AMS 0 is the only AMS on extruder 0 (right, active by default)
  1746. # Fallback: single AMS → global = 0*4+2 = 2
  1747. assert h2d_client.state.tray_now == 2
  1748. def test_multiple_ams_keeps_current_if_valid(self, h2d_client):
  1749. """Current tray matches slot → keeps it (multi-AMS on same extruder)."""
  1750. # Set up: two AMS units on the same extruder (right, ext 0)
  1751. h2d_client.state.ams_extruder_map = {"0": 0, "1": 0}
  1752. # Pre-set tray_now=5 (AMS 1 slot 1) — current_ams=1 which is in ams_on_extruder
  1753. h2d_client.state.tray_now = 5
  1754. # tray_now="1" → 5%4=1 matches → keep current=5
  1755. h2d_client._process_message(_ams_payload(1))
  1756. assert h2d_client.state.tray_now == 5
  1757. def test_no_ams_on_extruder_uses_raw_slot(self, h2d_client):
  1758. """No AMS mapped to the active extruder → raw slot as global ID."""
  1759. # All AMS on left extruder, but right is active
  1760. h2d_client.state.ams_extruder_map = {"0": 1, "128": 1}
  1761. h2d_client._process_message(_ams_payload(2))
  1762. assert h2d_client.state.tray_now == 2
  1763. def test_single_ams_ht_on_extruder_returns_unit_id(self, h2d_client):
  1764. """AMS-HT 128 alone on left extruder, slot 0 → global ID 128 (not 512)."""
  1765. # Switch to left extruder (where AMS-HT 128 is mapped)
  1766. h2d_client._process_message(_extruder_state_payload(0x0100))
  1767. # Only AMS-HT 128 on left extruder; no snow available
  1768. h2d_client._process_message(_ams_payload(0))
  1769. assert h2d_client.state.tray_now == 128
  1770. def test_single_ams_ht_ignores_nonzero_slot(self, h2d_client):
  1771. """AMS-HT has single slot; even if printer reports slot 1, global ID = unit ID."""
  1772. h2d_client.state.ams_extruder_map = {"129": 0}
  1773. h2d_client._process_message(_ams_payload(1))
  1774. # AMS-HT 129: global ID = 129, not 129*4+1=517
  1775. assert h2d_client.state.tray_now == 129
  1776. def test_multiple_ams_keeps_current_ams_ht(self, h2d_client):
  1777. """Current tray is AMS-HT 128, slot 0 reported → keeps 128."""
  1778. h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
  1779. h2d_client.state.tray_now = 128
  1780. h2d_client._process_message(_ams_payload(0))
  1781. assert h2d_client.state.tray_now == 128
  1782. def test_multiple_ams_slot_nonzero_excludes_ams_ht(self, h2d_client):
  1783. """Slot > 0 eliminates AMS-HT candidates; single regular AMS left → resolves."""
  1784. # AMS 0 + AMS-HT 128 both on right extruder
  1785. h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
  1786. h2d_client.state.tray_now = 255 # no current match
  1787. # Slot 2 → can't be AMS-HT → only AMS 0 → global = 0*4+2 = 2
  1788. h2d_client._process_message(_ams_payload(2))
  1789. assert h2d_client.state.tray_now == 2
  1790. def test_multiple_ams_slot_nonzero_narrows_to_single_ht_excluded(self, h2d_client):
  1791. """Two regular AMS + one AMS-HT, slot > 0 → AMS-HT excluded but still ambiguous."""
  1792. h2d_client.state.ams_extruder_map = {"0": 0, "1": 0, "128": 0}
  1793. h2d_client.state.tray_now = 255
  1794. # Slot 3 → excludes AMS-HT, but AMS 0 and AMS 1 both remain → ambiguous
  1795. h2d_client._process_message(_ams_payload(3))
  1796. assert h2d_client.state.tray_now == 3 # raw slot fallback
  1797. # ---------------------------------------------------------------------------
  1798. # 6b. H2D last_loaded_tray validation
  1799. # ---------------------------------------------------------------------------
  1800. class TestLastLoadedTrayValidation(_H2DFixtureMixin):
  1801. """last_loaded_tray only stores physically valid tray IDs."""
  1802. def test_regular_ams_tray_stored(self, h2d_client):
  1803. """Valid regular AMS tray (0-15) → stored in last_loaded_tray."""
  1804. h2d_client.state.tray_now = 7
  1805. # Trigger tray_now processing via AMS message
  1806. h2d_client._process_message(
  1807. _extruder_info_payload(
  1808. [
  1809. {"id": 0, "snow": 1 << 8 | 3}, # AMS 1 slot 3 → global 7
  1810. {"id": 1, "snow": 0xFF00FF},
  1811. ]
  1812. )
  1813. )
  1814. h2d_client._process_message(_ams_payload(3))
  1815. assert h2d_client.state.tray_now == 7
  1816. assert h2d_client.state.last_loaded_tray == 7
  1817. def test_ams_ht_tray_stored(self, h2d_client):
  1818. """Valid AMS-HT tray (128-135) → stored in last_loaded_tray."""
  1819. h2d_client._process_message(_extruder_state_payload(0x0100))
  1820. h2d_client._process_message(
  1821. _extruder_info_payload(
  1822. [
  1823. {"id": 0, "snow": 0xFF00FF},
  1824. {"id": 1, "snow": 128 << 8 | 0},
  1825. ]
  1826. )
  1827. )
  1828. h2d_client._process_message(_ams_payload(0))
  1829. assert h2d_client.state.tray_now == 128
  1830. assert h2d_client.state.last_loaded_tray == 128
  1831. def test_unloaded_not_stored(self, h2d_client):
  1832. """tray_now=255 (unloaded) → last_loaded_tray unchanged."""
  1833. h2d_client.state.last_loaded_tray = 5
  1834. h2d_client._process_message(_ams_payload(255))
  1835. assert h2d_client.state.tray_now == 255
  1836. assert h2d_client.state.last_loaded_tray == 5
  1837. # ---------------------------------------------------------------------------
  1838. # 7. H2D Active extruder switching
  1839. # ---------------------------------------------------------------------------
  1840. class TestTrayNowDualNozzleH2DActiveExtruder(_H2DFixtureMixin):
  1841. """Active extruder switching via device.extruder.state bit 8."""
  1842. def test_active_extruder_right_by_default(self, h2d_client):
  1843. """Initial state.active_extruder == 0 (right)."""
  1844. assert h2d_client.state.active_extruder == 0
  1845. def test_extruder_state_bit8_switches_to_left(self, h2d_client):
  1846. """state=0x100 → active_extruder=1 (left)."""
  1847. h2d_client._process_message(_extruder_state_payload(0x0100))
  1848. assert h2d_client.state.active_extruder == 1
  1849. def test_extruder_state_bit8_switches_back_to_right(self, h2d_client):
  1850. """Cycle 0 → 1 → 0."""
  1851. h2d_client._process_message(_extruder_state_payload(0x0100))
  1852. assert h2d_client.state.active_extruder == 1
  1853. h2d_client._process_message(_extruder_state_payload(0x0001))
  1854. assert h2d_client.state.active_extruder == 0
  1855. def test_extruder_switch_changes_tray_disambiguation(self, h2d_client):
  1856. """Snow on both extruders; switching active changes which snow is used."""
  1857. # Snow: ext 0 → AMS 0 slot 1 (global 1), ext 1 → AMS 128 slot 0 (global 128)
  1858. h2d_client._process_message(
  1859. _extruder_info_payload(
  1860. [
  1861. {"id": 0, "snow": 0 << 8 | 1}, # AMS 0 slot 1 → global 1
  1862. {"id": 1, "snow": 128 << 8 | 0}, # AMS HT → global 128
  1863. ]
  1864. )
  1865. )
  1866. # Right active (default) — tray_now="1" → snow ext[0] says global 1
  1867. h2d_client._process_message(_ams_payload(1))
  1868. assert h2d_client.state.tray_now == 1
  1869. # Switch to left
  1870. h2d_client._process_message(_extruder_state_payload(0x0100))
  1871. # Left active — tray_now="0" → snow ext[1] says AMS HT (128), slot 0 matches
  1872. h2d_client._process_message(_ams_payload(0))
  1873. assert h2d_client.state.tray_now == 128
  1874. # ---------------------------------------------------------------------------
  1875. # 8. H2D Full multi-message sequences
  1876. # ---------------------------------------------------------------------------
  1877. class TestTrayNowDualNozzleH2DFullSequence(_H2DFixtureMixin):
  1878. """Multi-message sequences simulating real H2D Pro prints."""
  1879. def test_h2d_right_nozzle_ams0_lifecycle(self, h2d_client):
  1880. """Setup → load AMS 0 slot 1 → verify tray_now=1."""
  1881. # Snow update: extruder 0 loading AMS 0 slot 1
  1882. h2d_client._process_message(
  1883. _extruder_info_payload(
  1884. [
  1885. {"id": 0, "snow": 0 << 8 | 1},
  1886. {"id": 1, "snow": 0xFF00FF},
  1887. ]
  1888. )
  1889. )
  1890. # Printer reports tray_now="1"
  1891. h2d_client._process_message(_ams_payload(1))
  1892. assert h2d_client.state.tray_now == 1
  1893. assert h2d_client.state.last_loaded_tray == 1
  1894. def test_h2d_left_nozzle_ams_ht_lifecycle(self, h2d_client):
  1895. """Setup → switch left → load AMS HT → verify tray_now=128."""
  1896. # Switch to left extruder
  1897. h2d_client._process_message(_extruder_state_payload(0x0100))
  1898. # Snow: ext 1 → AMS HT slot 0
  1899. h2d_client._process_message(
  1900. _extruder_info_payload(
  1901. [
  1902. {"id": 0, "snow": 0xFF00FF},
  1903. {"id": 1, "snow": 128 << 8 | 0},
  1904. ]
  1905. )
  1906. )
  1907. # Printer reports tray_now="0" (AMS HT single slot)
  1908. h2d_client._process_message(_ams_payload(0))
  1909. assert h2d_client.state.tray_now == 128
  1910. assert h2d_client.state.last_loaded_tray == 128
  1911. def test_h2d_multi_color_alternating_nozzles(self, h2d_client):
  1912. """Multi-color print alternating between right and left nozzles.
  1913. Sequence:
  1914. 1. Right loads AMS 0 slot 0 (tray=0)
  1915. 2. Switch left, load AMS HT (tray=128)
  1916. 3. Switch right, snow updates, load AMS 0 slot 2 (tray=2)
  1917. 4. Unload (255)
  1918. """
  1919. # Step 1: Right extruder loads AMS 0 slot 0
  1920. h2d_client._process_message(
  1921. _extruder_info_payload(
  1922. [
  1923. {"id": 0, "snow": 0 << 8 | 0},
  1924. {"id": 1, "snow": 0xFF00FF},
  1925. ]
  1926. )
  1927. )
  1928. h2d_client._process_message(_ams_payload(0))
  1929. assert h2d_client.state.tray_now == 0
  1930. # Step 2: Switch to left, load AMS HT
  1931. h2d_client._process_message(_extruder_state_payload(0x0100))
  1932. h2d_client._process_message(
  1933. _extruder_info_payload(
  1934. [
  1935. {"id": 0, "snow": 0 << 8 | 0},
  1936. {"id": 1, "snow": 128 << 8 | 0},
  1937. ]
  1938. )
  1939. )
  1940. h2d_client._process_message(_ams_payload(0))
  1941. assert h2d_client.state.tray_now == 128
  1942. # Step 3: Switch back to right, load AMS 0 slot 2
  1943. h2d_client._process_message(_extruder_state_payload(0x0001))
  1944. h2d_client._process_message(
  1945. _extruder_info_payload(
  1946. [
  1947. {"id": 0, "snow": 0 << 8 | 2},
  1948. {"id": 1, "snow": 128 << 8 | 0},
  1949. ]
  1950. )
  1951. )
  1952. h2d_client._process_message(_ams_payload(2))
  1953. assert h2d_client.state.tray_now == 2
  1954. # Step 4: Unload
  1955. h2d_client._process_message(_ams_payload(255))
  1956. assert h2d_client.state.tray_now == 255
  1957. assert h2d_client.state.last_loaded_tray == 2
  1958. class TestTrayChangeLog:
  1959. """Tests for tray_change_log tracking during prints (mid-print tray switch)."""
  1960. @pytest.fixture
  1961. def mqtt_client(self):
  1962. """Create a BambuMQTTClient instance for testing."""
  1963. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1964. client = BambuMQTTClient(
  1965. ip_address="192.168.1.100",
  1966. serial_number="TRAYLOG1",
  1967. access_code="12345678",
  1968. )
  1969. return client
  1970. def test_tray_change_log_defaults_empty(self, mqtt_client):
  1971. """tray_change_log starts as an empty list."""
  1972. assert mqtt_client.state.tray_change_log == []
  1973. def test_tray_change_log_seeded_on_print_start(self, mqtt_client):
  1974. """Print start clears log and seeds with initial tray at layer 0."""
  1975. mqtt_client.state.tray_now = 2
  1976. mqtt_client.state.last_loaded_tray = 2
  1977. mqtt_client._previous_gcode_state = "IDLE"
  1978. # Transition to RUNNING via _process_message
  1979. mqtt_client._process_message(
  1980. {
  1981. "print": {
  1982. "gcode_state": "RUNNING",
  1983. "gcode_file": "test.3mf",
  1984. }
  1985. }
  1986. )
  1987. assert mqtt_client.state.tray_change_log == [(2, 0)]
  1988. def test_tray_change_log_cleared_on_new_print(self, mqtt_client):
  1989. """Old log entries are cleared when a new print starts."""
  1990. mqtt_client.state.tray_change_log = [(5, 0), (3, 100)]
  1991. mqtt_client.state.tray_now = 1
  1992. mqtt_client.state.last_loaded_tray = 1
  1993. mqtt_client._previous_gcode_state = "IDLE"
  1994. mqtt_client._process_message(
  1995. {
  1996. "print": {
  1997. "gcode_state": "RUNNING",
  1998. "gcode_file": "new.3mf",
  1999. }
  2000. }
  2001. )
  2002. assert mqtt_client.state.tray_change_log == [(1, 0)]
  2003. def test_tray_change_recorded_during_running(self, mqtt_client):
  2004. """Tray change while RUNNING is appended to the log."""
  2005. mqtt_client.state.state = "RUNNING"
  2006. mqtt_client.state.layer_num = 50
  2007. mqtt_client.state.last_loaded_tray = 0
  2008. mqtt_client.state.tray_change_log = [(0, 0)]
  2009. # Simulate tray_now update via AMS data
  2010. mqtt_client.state.tray_now = 1
  2011. # Trigger the tracking code path
  2012. tn = mqtt_client.state.tray_now
  2013. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  2014. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  2015. mqtt_client.state.last_loaded_tray = tn
  2016. assert mqtt_client.state.tray_change_log == [(0, 0), (1, 50)]
  2017. def test_tray_change_not_recorded_when_idle(self, mqtt_client):
  2018. """Tray changes while IDLE are NOT logged."""
  2019. mqtt_client.state.state = "IDLE"
  2020. mqtt_client.state.layer_num = 0
  2021. mqtt_client.state.last_loaded_tray = 0
  2022. mqtt_client.state.tray_change_log = []
  2023. mqtt_client.state.tray_now = 3
  2024. tn = mqtt_client.state.tray_now
  2025. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  2026. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  2027. mqtt_client.state.last_loaded_tray = tn
  2028. assert mqtt_client.state.tray_change_log == []
  2029. def test_tray_change_recorded_during_pause(self, mqtt_client):
  2030. """Tray change while PAUSE is also logged (AMS can swap during pause)."""
  2031. mqtt_client.state.state = "PAUSE"
  2032. mqtt_client.state.layer_num = 75
  2033. mqtt_client.state.last_loaded_tray = 2
  2034. mqtt_client.state.tray_change_log = [(2, 0)]
  2035. mqtt_client.state.tray_now = 5
  2036. tn = mqtt_client.state.tray_now
  2037. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  2038. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  2039. mqtt_client.state.last_loaded_tray = tn
  2040. assert mqtt_client.state.tray_change_log == [(2, 0), (5, 75)]
  2041. def test_same_tray_not_logged_twice(self, mqtt_client):
  2042. """Same tray value doesn't create duplicate log entries."""
  2043. mqtt_client.state.state = "RUNNING"
  2044. mqtt_client.state.layer_num = 30
  2045. mqtt_client.state.last_loaded_tray = 2
  2046. mqtt_client.state.tray_change_log = [(2, 0)]
  2047. # Same tray again
  2048. mqtt_client.state.tray_now = 2
  2049. tn = mqtt_client.state.tray_now
  2050. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  2051. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  2052. mqtt_client.state.last_loaded_tray = tn
  2053. assert mqtt_client.state.tray_change_log == [(2, 0)]
  2054. def test_multiple_tray_changes(self, mqtt_client):
  2055. """Multiple tray changes create a full history."""
  2056. mqtt_client.state.state = "RUNNING"
  2057. mqtt_client.state.last_loaded_tray = 0
  2058. mqtt_client.state.tray_change_log = [(0, 0)]
  2059. changes = [(1, 50), (3, 120), (0, 200)]
  2060. for tray, layer in changes:
  2061. mqtt_client.state.tray_now = tray
  2062. mqtt_client.state.layer_num = layer
  2063. tn = mqtt_client.state.tray_now
  2064. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  2065. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  2066. mqtt_client.state.last_loaded_tray = tn
  2067. assert mqtt_client.state.tray_change_log == [(0, 0), (1, 50), (3, 120), (0, 200)]
  2068. class TestDeveloperModeDetection:
  2069. """Tests for developer LAN mode detection from MQTT 'fun' field."""
  2070. @pytest.fixture
  2071. def mqtt_client(self):
  2072. """Create a BambuMQTTClient instance for testing."""
  2073. from backend.app.services.bambu_mqtt import BambuMQTTClient
  2074. client = BambuMQTTClient(
  2075. ip_address="192.168.1.100",
  2076. serial_number="TEST123",
  2077. access_code="12345678",
  2078. )
  2079. return client
  2080. def test_developer_mode_initially_none(self, mqtt_client):
  2081. """Verify developer_mode starts as None (unknown)."""
  2082. assert mqtt_client.state.developer_mode is None
  2083. def test_developer_mode_on_when_bit_clear(self, mqtt_client):
  2084. """Verify developer_mode is True when bit 0x20000000 is clear."""
  2085. # Bit 29 clear in lower 32 bits = developer mode ON
  2086. payload = {
  2087. "print": {
  2088. "gcode_state": "IDLE",
  2089. "fun": "1C8187FF9CFF",
  2090. }
  2091. }
  2092. mqtt_client._process_message(payload)
  2093. assert mqtt_client.state.developer_mode is True
  2094. def test_developer_mode_off_when_bit_set(self, mqtt_client):
  2095. """Verify developer_mode is False when bit 0x20000000 is set."""
  2096. # Bit 29 set in lower 32 bits = developer mode OFF (encryption required)
  2097. payload = {
  2098. "print": {
  2099. "gcode_state": "IDLE",
  2100. "fun": "1C81A7FF9CFF",
  2101. }
  2102. }
  2103. mqtt_client._process_message(payload)
  2104. assert mqtt_client.state.developer_mode is False
  2105. def test_developer_mode_exact_bit_check(self, mqtt_client):
  2106. """Verify only bit 0x20000000 matters, not other bits."""
  2107. # 0x20000000 in hex = bit 29. Set ONLY that bit.
  2108. payload = {
  2109. "print": {
  2110. "gcode_state": "IDLE",
  2111. "fun": "000020000000",
  2112. }
  2113. }
  2114. mqtt_client._process_message(payload)
  2115. assert mqtt_client.state.developer_mode is False
  2116. # All zeros = all bits clear = developer mode ON
  2117. payload["print"]["fun"] = "000000000000"
  2118. mqtt_client._process_message(payload)
  2119. assert mqtt_client.state.developer_mode is True
  2120. def test_developer_mode_invalid_fun_ignored(self, mqtt_client):
  2121. """Verify invalid fun values don't crash or change state."""
  2122. mqtt_client.state.developer_mode = True
  2123. payload = {
  2124. "print": {
  2125. "gcode_state": "IDLE",
  2126. "fun": "not_a_hex_value",
  2127. }
  2128. }
  2129. mqtt_client._process_message(payload)
  2130. # Should remain unchanged
  2131. assert mqtt_client.state.developer_mode is True
  2132. def test_developer_mode_missing_fun_preserves_state(self, mqtt_client):
  2133. """Verify messages without fun field don't reset developer_mode."""
  2134. mqtt_client.state.developer_mode = False
  2135. payload = {
  2136. "print": {
  2137. "gcode_state": "RUNNING",
  2138. "mc_percent": 50,
  2139. }
  2140. }
  2141. mqtt_client._process_message(payload)
  2142. assert mqtt_client.state.developer_mode is False
  2143. def test_developer_mode_persists_across_messages(self, mqtt_client):
  2144. """Verify developer_mode set by fun persists across messages without fun."""
  2145. # First message sets developer_mode
  2146. mqtt_client._process_message(
  2147. {
  2148. "print": {
  2149. "gcode_state": "IDLE",
  2150. "fun": "3EC1AFFF9CFF",
  2151. }
  2152. }
  2153. )
  2154. assert mqtt_client.state.developer_mode is False
  2155. # Subsequent messages without fun don't change it
  2156. for _ in range(3):
  2157. mqtt_client._process_message(
  2158. {
  2159. "print": {
  2160. "gcode_state": "RUNNING",
  2161. "mc_percent": 50,
  2162. }
  2163. }
  2164. )
  2165. assert mqtt_client.state.developer_mode is False
  2166. class TestSendDryingCommand:
  2167. """Tests for send_drying_command MQTT payload construction."""
  2168. @pytest.fixture
  2169. def mqtt_client(self):
  2170. """Create a BambuMQTTClient with a mock MQTT client."""
  2171. from unittest.mock import MagicMock
  2172. from backend.app.services.bambu_mqtt import BambuMQTTClient
  2173. client = BambuMQTTClient(
  2174. ip_address="192.168.1.100",
  2175. serial_number="TEST123",
  2176. access_code="12345678",
  2177. )
  2178. client._client = MagicMock()
  2179. return client
  2180. def test_rotate_tray_false_by_default(self, mqtt_client):
  2181. """Verify rotate_tray defaults to False in the MQTT payload."""
  2182. mqtt_client.send_drying_command(ams_id=0, temp=55, duration=4, mode=1, filament="PLA")
  2183. call_args = mqtt_client._client.publish.call_args
  2184. payload = json.loads(call_args[0][1])
  2185. assert payload["print"]["rotate_tray"] is False
  2186. def test_rotate_tray_true_when_enabled(self, mqtt_client):
  2187. """Verify rotate_tray is True when explicitly enabled."""
  2188. mqtt_client.send_drying_command(ams_id=0, temp=55, duration=4, mode=1, filament="PLA", rotate_tray=True)
  2189. call_args = mqtt_client._client.publish.call_args
  2190. payload = json.loads(call_args[0][1])
  2191. assert payload["print"]["rotate_tray"] is True
  2192. def test_rotate_tray_false_on_stop(self, mqtt_client):
  2193. """Verify rotate_tray is False when stopping drying (mode=0)."""
  2194. mqtt_client.send_drying_command(ams_id=0, temp=0, duration=0, mode=0)
  2195. call_args = mqtt_client._client.publish.call_args
  2196. payload = json.loads(call_args[0][1])
  2197. assert payload["print"]["rotate_tray"] is False
  2198. def test_all_required_fields_present(self, mqtt_client):
  2199. """Verify all required MQTT fields are present in the drying command."""
  2200. mqtt_client.send_drying_command(ams_id=128, temp=75, duration=8, mode=1, filament="ABS", rotate_tray=True)
  2201. call_args = mqtt_client._client.publish.call_args
  2202. payload = json.loads(call_args[0][1])
  2203. cmd = payload["print"]
  2204. assert cmd["command"] == "ams_filament_drying"
  2205. assert cmd["ams_id"] == 128
  2206. assert cmd["temp"] == 75
  2207. assert cmd["duration"] == 8
  2208. assert cmd["mode"] == 1
  2209. assert cmd["rotate_tray"] is True
  2210. assert cmd["filament"] == "ABS"
  2211. assert cmd["cooling_temp"] == 20
  2212. assert cmd["humidity"] == 0
  2213. assert cmd["close_power_conflict"] is False
  2214. assert "sequence_id" in cmd
  2215. def test_publishes_with_qos_1(self, mqtt_client):
  2216. """Verify drying commands are published with QoS 1."""
  2217. mqtt_client.send_drying_command(ams_id=0, temp=55, duration=4)
  2218. call_args = mqtt_client._client.publish.call_args
  2219. # qos may be positional arg [2] or keyword
  2220. qos = call_args.kwargs.get("qos", call_args[0][2] if len(call_args[0]) > 2 else None)
  2221. assert qos == 1