test_bambu_mqtt.py 87 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305
  1. """
  2. Tests for the BambuMQTTClient service.
  3. These tests focus on timelapse tracking during prints.
  4. """
  5. import pytest
  6. class TestTimelapseTracking:
  7. """Tests for timelapse state tracking during prints."""
  8. @pytest.fixture
  9. def mqtt_client(self):
  10. """Create a BambuMQTTClient instance for testing."""
  11. from backend.app.services.bambu_mqtt import BambuMQTTClient
  12. client = BambuMQTTClient(
  13. ip_address="192.168.1.100",
  14. serial_number="TEST123",
  15. access_code="12345678",
  16. )
  17. return client
  18. def test_timelapse_flag_initializes_to_false(self, mqtt_client):
  19. """Verify _timelapse_during_print starts as False."""
  20. assert mqtt_client._timelapse_during_print is False
  21. def test_timelapse_flag_set_when_timelapse_active_during_running(self, mqtt_client):
  22. """Verify timelapse flag is set when timelapse is active while printing."""
  23. # Simulate print running
  24. mqtt_client._was_running = True
  25. mqtt_client.state.timelapse = False
  26. # Simulate xcam data showing timelapse is enabled
  27. xcam_data = {"timelapse": "enable"}
  28. mqtt_client._parse_xcam_data(xcam_data)
  29. assert mqtt_client.state.timelapse is True
  30. assert mqtt_client._timelapse_during_print is True
  31. def test_timelapse_flag_not_set_when_not_running(self, mqtt_client):
  32. """Verify timelapse flag is NOT set when printer not running."""
  33. # Printer is idle (not running)
  34. mqtt_client._was_running = False
  35. mqtt_client.state.timelapse = False
  36. # Timelapse is enabled but we're not printing
  37. xcam_data = {"timelapse": "enable"}
  38. mqtt_client._parse_xcam_data(xcam_data)
  39. assert mqtt_client.state.timelapse is True
  40. # Flag should NOT be set since we're not printing
  41. assert mqtt_client._timelapse_during_print is False
  42. def test_timelapse_flag_persists_after_timelapse_stops(self, mqtt_client):
  43. """Verify timelapse flag stays True even after recording stops."""
  44. # Simulate print running with timelapse
  45. mqtt_client._was_running = True
  46. # Enable timelapse during print
  47. xcam_data = {"timelapse": "enable"}
  48. mqtt_client._parse_xcam_data(xcam_data)
  49. assert mqtt_client._timelapse_during_print is True
  50. # Disable timelapse (recording stops at end of print)
  51. xcam_data = {"timelapse": "disable"}
  52. mqtt_client._parse_xcam_data(xcam_data)
  53. # Flag should still be True (persists until reset)
  54. assert mqtt_client.state.timelapse is False
  55. assert mqtt_client._timelapse_during_print is True
  56. def test_timelapse_flag_from_print_data(self, mqtt_client):
  57. """Verify timelapse flag is set from print data (not just xcam)."""
  58. # Simulate print running
  59. mqtt_client._was_running = True
  60. mqtt_client.state.timelapse = False
  61. mqtt_client._timelapse_during_print = False
  62. # Manually test the timelapse parsing logic from _parse_print_data
  63. # This tests the "timelapse" field in the main print data
  64. data = {"timelapse": True}
  65. mqtt_client.state.timelapse = data["timelapse"] is True
  66. if mqtt_client.state.timelapse and mqtt_client._was_running:
  67. mqtt_client._timelapse_during_print = True
  68. assert mqtt_client._timelapse_during_print is True
  69. class TestPrintCompletionWithTimelapse:
  70. """Tests for print completion including timelapse flag."""
  71. @pytest.fixture
  72. def mqtt_client(self):
  73. """Create a BambuMQTTClient instance for testing."""
  74. from backend.app.services.bambu_mqtt import BambuMQTTClient
  75. client = BambuMQTTClient(
  76. ip_address="192.168.1.100",
  77. serial_number="TEST123",
  78. access_code="12345678",
  79. )
  80. return client
  81. def test_print_complete_includes_timelapse_flag(self, mqtt_client):
  82. """Verify print complete callback includes timelapse_was_active."""
  83. # Set up completion callback
  84. callback_data = {}
  85. def on_complete(data):
  86. callback_data.update(data)
  87. mqtt_client.on_print_complete = on_complete
  88. # Simulate a print that had timelapse active
  89. mqtt_client._was_running = True
  90. mqtt_client._completion_triggered = False
  91. mqtt_client._timelapse_during_print = True
  92. mqtt_client._previous_gcode_state = "RUNNING"
  93. mqtt_client._previous_gcode_file = "test.gcode"
  94. mqtt_client.state.subtask_name = "Test Print"
  95. # Simulate print finish
  96. mqtt_client.state.state = "FINISH"
  97. # Manually trigger the completion logic (simplified)
  98. # In real code this happens in _parse_print_data
  99. should_trigger = (
  100. mqtt_client.state.state in ("FINISH", "FAILED")
  101. and not mqtt_client._completion_triggered
  102. and mqtt_client.on_print_complete
  103. and mqtt_client._previous_gcode_state == "RUNNING"
  104. )
  105. if should_trigger:
  106. status = "completed" if mqtt_client.state.state == "FINISH" else "failed"
  107. timelapse_was_active = mqtt_client._timelapse_during_print
  108. mqtt_client._completion_triggered = True
  109. mqtt_client._was_running = False
  110. mqtt_client._timelapse_during_print = False
  111. mqtt_client.on_print_complete(
  112. {
  113. "status": status,
  114. "filename": mqtt_client._previous_gcode_file,
  115. "subtask_name": mqtt_client.state.subtask_name,
  116. "timelapse_was_active": timelapse_was_active,
  117. }
  118. )
  119. assert "timelapse_was_active" in callback_data
  120. assert callback_data["timelapse_was_active"] is True
  121. def test_print_complete_timelapse_flag_false_when_no_timelapse(self, mqtt_client):
  122. """Verify timelapse_was_active is False when no timelapse during print."""
  123. callback_data = {}
  124. def on_complete(data):
  125. callback_data.update(data)
  126. mqtt_client.on_print_complete = on_complete
  127. # Print without timelapse
  128. mqtt_client._was_running = True
  129. mqtt_client._completion_triggered = False
  130. mqtt_client._timelapse_during_print = False # No timelapse
  131. mqtt_client._previous_gcode_state = "RUNNING"
  132. mqtt_client._previous_gcode_file = "test.gcode"
  133. mqtt_client.state.subtask_name = "Test Print"
  134. mqtt_client.state.state = "FINISH"
  135. # Trigger completion
  136. timelapse_was_active = mqtt_client._timelapse_during_print
  137. mqtt_client.on_print_complete(
  138. {
  139. "status": "completed",
  140. "filename": mqtt_client._previous_gcode_file,
  141. "subtask_name": mqtt_client.state.subtask_name,
  142. "timelapse_was_active": timelapse_was_active,
  143. }
  144. )
  145. assert callback_data["timelapse_was_active"] is False
  146. def test_timelapse_flag_reset_after_completion(self, mqtt_client):
  147. """Verify _timelapse_during_print is reset after print completion."""
  148. mqtt_client._timelapse_during_print = True
  149. mqtt_client._was_running = True
  150. mqtt_client._completion_triggered = False
  151. # Simulate completion reset
  152. mqtt_client._completion_triggered = True
  153. mqtt_client._was_running = False
  154. mqtt_client._timelapse_during_print = False
  155. assert mqtt_client._timelapse_during_print is False
  156. class TestRealisticMessageFlow:
  157. """Tests that simulate realistic MQTT message sequences.
  158. These tests process messages through _process_message to test the full flow,
  159. including the order of xcam parsing vs state detection.
  160. """
  161. @pytest.fixture
  162. def mqtt_client(self):
  163. """Create a BambuMQTTClient instance for testing."""
  164. from backend.app.services.bambu_mqtt import BambuMQTTClient
  165. client = BambuMQTTClient(
  166. ip_address="192.168.1.100",
  167. serial_number="TEST123",
  168. access_code="12345678",
  169. )
  170. return client
  171. def test_timelapse_detected_at_print_start_in_same_message(self, mqtt_client):
  172. """Test that timelapse is detected when xcam and state come in same message.
  173. This is the critical race condition test - xcam data is parsed BEFORE
  174. state detection, so the timelapse flag must be set AFTER _was_running is True.
  175. """
  176. # Callbacks to track events
  177. start_callback_data = {}
  178. def on_start(data):
  179. start_callback_data.update(data)
  180. mqtt_client.on_print_start = on_start
  181. # Initial state - idle
  182. mqtt_client._was_running = False
  183. mqtt_client._timelapse_during_print = False
  184. mqtt_client._previous_gcode_state = None
  185. # Simulate first message when print starts - contains both xcam and gcode_state
  186. # This is the realistic scenario from the printer
  187. # NOTE: Real MQTT messages wrap print data inside a "print" key
  188. payload = {
  189. "print": {
  190. "gcode_state": "RUNNING",
  191. "gcode_file": "/data/Metadata/test_print.gcode",
  192. "subtask_name": "Test_Print",
  193. "xcam": {
  194. "timelapse": "enable", # Timelapse is enabled in this print
  195. "printing_monitor": True,
  196. },
  197. "mc_percent": 0,
  198. "mc_remaining_time": 3600,
  199. }
  200. }
  201. # Process the message (this is what happens in real MQTT flow)
  202. mqtt_client._process_message(payload)
  203. # Verify timelapse was detected even though xcam is parsed before state
  204. assert mqtt_client._was_running is True, "_was_running should be True after RUNNING state"
  205. assert mqtt_client.state.timelapse is True, "state.timelapse should be True"
  206. assert mqtt_client._timelapse_during_print is True, (
  207. "timelapse_during_print should be True when timelapse is in the same message as RUNNING state"
  208. )
  209. def test_timelapse_not_detected_when_disabled(self, mqtt_client):
  210. """Test that timelapse is NOT detected when disabled in xcam data."""
  211. mqtt_client.on_print_start = lambda data: None
  212. # Initial state - idle
  213. mqtt_client._was_running = False
  214. mqtt_client._timelapse_during_print = False
  215. mqtt_client._previous_gcode_state = None
  216. # Print starts without timelapse
  217. payload = {
  218. "print": {
  219. "gcode_state": "RUNNING",
  220. "gcode_file": "/data/Metadata/test_print.gcode",
  221. "subtask_name": "Test_Print",
  222. "xcam": {
  223. "timelapse": "disable", # Timelapse is disabled
  224. "printing_monitor": True,
  225. },
  226. }
  227. }
  228. mqtt_client._process_message(payload)
  229. assert mqtt_client._was_running is True
  230. assert mqtt_client.state.timelapse is False
  231. assert mqtt_client._timelapse_during_print is False
  232. def test_timelapse_detected_when_enabled_after_print_start(self, mqtt_client):
  233. """Test timelapse detected when enabled in a message after print starts."""
  234. mqtt_client.on_print_start = lambda data: None
  235. # First message - print starts without timelapse info
  236. payload_start = {
  237. "print": {
  238. "gcode_state": "RUNNING",
  239. "gcode_file": "/data/Metadata/test_print.gcode",
  240. "subtask_name": "Test_Print",
  241. }
  242. }
  243. mqtt_client._process_message(payload_start)
  244. assert mqtt_client._was_running is True
  245. assert mqtt_client._timelapse_during_print is False # Not detected yet
  246. # Second message - xcam data arrives with timelapse enabled
  247. payload_xcam = {
  248. "print": {
  249. "gcode_state": "RUNNING",
  250. "gcode_file": "/data/Metadata/test_print.gcode",
  251. "subtask_name": "Test_Print",
  252. "xcam": {
  253. "timelapse": "enable",
  254. },
  255. }
  256. }
  257. mqtt_client._process_message(payload_xcam)
  258. # Now timelapse should be detected because _was_running is already True
  259. assert mqtt_client._timelapse_during_print is True
  260. def test_print_complete_includes_timelapse_flag_full_flow(self, mqtt_client):
  261. """Test full print lifecycle with timelapse - from start to completion."""
  262. start_data = {}
  263. complete_data = {}
  264. def on_start(data):
  265. start_data.update(data)
  266. def on_complete(data):
  267. complete_data.update(data)
  268. mqtt_client.on_print_start = on_start
  269. mqtt_client.on_print_complete = on_complete
  270. # 1. Print starts with timelapse
  271. mqtt_client._process_message(
  272. {
  273. "print": {
  274. "gcode_state": "RUNNING",
  275. "gcode_file": "/data/Metadata/test.gcode",
  276. "subtask_name": "Test",
  277. "xcam": {"timelapse": "enable"},
  278. }
  279. }
  280. )
  281. assert mqtt_client._timelapse_during_print is True
  282. assert "subtask_name" in start_data
  283. # 2. Print continues (multiple messages)
  284. for _ in range(3):
  285. mqtt_client._process_message(
  286. {
  287. "print": {
  288. "gcode_state": "RUNNING",
  289. "gcode_file": "/data/Metadata/test.gcode",
  290. "subtask_name": "Test",
  291. "mc_percent": 50,
  292. }
  293. }
  294. )
  295. # Timelapse flag should still be True
  296. assert mqtt_client._timelapse_during_print is True
  297. # 3. Print completes
  298. mqtt_client._process_message(
  299. {
  300. "print": {
  301. "gcode_state": "FINISH",
  302. "gcode_file": "/data/Metadata/test.gcode",
  303. "subtask_name": "Test",
  304. }
  305. }
  306. )
  307. # Verify completion callback received timelapse flag
  308. assert "timelapse_was_active" in complete_data
  309. assert complete_data["timelapse_was_active"] is True
  310. assert complete_data["status"] == "completed"
  311. # Flags should be reset after completion
  312. assert mqtt_client._timelapse_during_print is False
  313. assert mqtt_client._was_running is False
  314. def test_print_failed_includes_timelapse_flag(self, mqtt_client):
  315. """Test that failed print also includes timelapse flag."""
  316. complete_data = {}
  317. def on_complete(data):
  318. complete_data.update(data)
  319. mqtt_client.on_print_start = lambda data: None
  320. mqtt_client.on_print_complete = on_complete
  321. # Start with timelapse
  322. mqtt_client._process_message(
  323. {
  324. "print": {
  325. "gcode_state": "RUNNING",
  326. "gcode_file": "/data/Metadata/test.gcode",
  327. "subtask_name": "Test",
  328. "xcam": {"timelapse": "enable"},
  329. }
  330. }
  331. )
  332. # Print fails
  333. mqtt_client._process_message(
  334. {
  335. "print": {
  336. "gcode_state": "FAILED",
  337. "gcode_file": "/data/Metadata/test.gcode",
  338. "subtask_name": "Test",
  339. }
  340. }
  341. )
  342. assert complete_data["timelapse_was_active"] is True
  343. assert complete_data["status"] == "failed"
  344. class TestAMSDataMerging:
  345. """Tests for AMS data merging, particularly handling empty slots."""
  346. @pytest.fixture
  347. def mqtt_client(self):
  348. """Create a BambuMQTTClient instance for testing."""
  349. from backend.app.services.bambu_mqtt import BambuMQTTClient
  350. client = BambuMQTTClient(
  351. ip_address="192.168.1.100",
  352. serial_number="TEST123",
  353. access_code="12345678",
  354. )
  355. return client
  356. def test_empty_slot_clears_tray_type(self, mqtt_client):
  357. """Test that empty slot update clears tray_type (Issue #147).
  358. When a spool is removed from an old AMS, the printer sends empty values.
  359. These must overwrite the previous values to show the slot as empty.
  360. """
  361. # Initial state: AMS unit with a loaded spool
  362. initial_ams = {
  363. "ams": [
  364. {
  365. "id": 0,
  366. "tray": [
  367. {
  368. "id": 0,
  369. "tray_type": "PLA",
  370. "tray_sub_brands": "Bambu PLA Basic",
  371. "tray_color": "FF0000",
  372. "tag_uid": "1234567890ABCDEF",
  373. "remain": 80,
  374. }
  375. ],
  376. }
  377. ]
  378. }
  379. mqtt_client._handle_ams_data(initial_ams)
  380. # Verify initial state
  381. ams_data = mqtt_client.state.raw_data.get("ams", [])
  382. assert len(ams_data) == 1
  383. tray = ams_data[0]["tray"][0]
  384. assert tray["tray_type"] == "PLA"
  385. assert tray["tray_color"] == "FF0000"
  386. # Now simulate spool removal - printer sends empty values
  387. empty_update = {
  388. "ams": [
  389. {
  390. "id": 0,
  391. "tray": [
  392. {
  393. "id": 0,
  394. "tray_type": "", # Empty = slot is empty
  395. "tray_sub_brands": "",
  396. "tray_color": "",
  397. "tag_uid": "0000000000000000", # Zero UID
  398. "remain": 0,
  399. }
  400. ],
  401. }
  402. ]
  403. }
  404. mqtt_client._handle_ams_data(empty_update)
  405. # Verify empty values were applied (not ignored by merge logic)
  406. ams_data = mqtt_client.state.raw_data.get("ams", [])
  407. tray = ams_data[0]["tray"][0]
  408. assert tray["tray_type"] == "", "tray_type should be cleared when slot is empty"
  409. assert tray["tray_color"] == "", "tray_color should be cleared when slot is empty"
  410. assert tray["tray_sub_brands"] == "", "tray_sub_brands should be cleared"
  411. assert tray["tag_uid"] == "0000000000000000", "tag_uid should be cleared"
  412. def test_partial_update_preserves_other_fields(self, mqtt_client):
  413. """Test that partial updates still preserve non-slot-status fields."""
  414. # Initial state with full data
  415. initial_ams = {
  416. "ams": [
  417. {
  418. "id": 0,
  419. "humidity": "3",
  420. "temp": "25.5",
  421. "tray": [
  422. {
  423. "id": 0,
  424. "tray_type": "PLA",
  425. "tray_color": "00FF00",
  426. "remain": 90,
  427. "k": 0.02,
  428. }
  429. ],
  430. }
  431. ]
  432. }
  433. mqtt_client._handle_ams_data(initial_ams)
  434. # Partial update - only remain changes
  435. partial_update = {
  436. "ams": [
  437. {
  438. "id": 0,
  439. "tray": [
  440. {
  441. "id": 0,
  442. "remain": 85, # Only this changed
  443. }
  444. ],
  445. }
  446. ]
  447. }
  448. mqtt_client._handle_ams_data(partial_update)
  449. # Verify remain was updated but other fields preserved
  450. ams_data = mqtt_client.state.raw_data.get("ams", [])
  451. tray = ams_data[0]["tray"][0]
  452. assert tray["remain"] == 85, "remain should be updated"
  453. assert tray["tray_type"] == "PLA", "tray_type should be preserved"
  454. assert tray["tray_color"] == "00FF00", "tray_color should be preserved"
  455. assert tray["k"] == 0.02, "k should be preserved"
  456. def test_tray_exist_bits_clears_empty_slots(self, mqtt_client):
  457. """Test that tray_exist_bits clears slots marked as empty (Issue #147).
  458. New AMS models (AMS 2 Pro) don't send empty tray data when a spool is removed.
  459. Instead, they update tray_exist_bits to indicate which slots have spools.
  460. """
  461. # Initial state: AMS 0 and AMS 1 with loaded spools
  462. initial_ams = {
  463. "ams": [
  464. {
  465. "id": 0,
  466. "tray": [
  467. {"id": 0, "tray_type": "PLA", "tray_color": "FF0000", "remain": 80},
  468. {"id": 1, "tray_type": "PETG", "tray_color": "00FF00", "remain": 60},
  469. {"id": 2, "tray_type": "ABS", "tray_color": "0000FF", "remain": 40},
  470. {"id": 3, "tray_type": "TPU", "tray_color": "FFFF00", "remain": 20},
  471. ],
  472. },
  473. {
  474. "id": 1,
  475. "tray": [
  476. {"id": 0, "tray_type": "PLA", "tray_color": "FFFFFF", "remain": 90},
  477. {"id": 1, "tray_type": "PLA", "tray_color": "000000", "remain": 70},
  478. {"id": 2, "tray_type": "PLA", "tray_color": "FF00FF", "remain": 50},
  479. {"id": 3, "tray_type": "PLA", "tray_color": "00FFFF", "remain": 30},
  480. ],
  481. },
  482. ],
  483. "tray_exist_bits": "ff", # All 8 slots have spools (0xFF = 11111111)
  484. }
  485. mqtt_client._handle_ams_data(initial_ams)
  486. # Verify initial state
  487. ams_data = mqtt_client.state.raw_data.get("ams", [])
  488. assert ams_data[1]["tray"][3]["tray_type"] == "PLA" # AMS 1 slot 3 (B4) has spool
  489. # Now simulate spool removal from AMS 1 slot 3 (B4)
  490. # tray_exist_bits: 0x7f = 01111111 (bit 7 = 0 means AMS 1 slot 3 is empty)
  491. update_ams = {
  492. "ams": [
  493. {"id": 0, "tray": [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]},
  494. {"id": 1, "tray": [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]},
  495. ],
  496. "tray_exist_bits": "7f", # Bit 7 = 0 -> AMS 1 slot 3 is empty
  497. }
  498. mqtt_client._handle_ams_data(update_ams)
  499. # Verify AMS 1 slot 3 was cleared
  500. ams_data = mqtt_client.state.raw_data.get("ams", [])
  501. b4_tray = ams_data[1]["tray"][3]
  502. assert b4_tray["tray_type"] == "", "tray_type should be cleared for empty slot"
  503. assert b4_tray["remain"] == 0, "remain should be 0 for empty slot"
  504. # Verify other slots are preserved
  505. assert ams_data[0]["tray"][0]["tray_type"] == "PLA", "A1 should still have PLA"
  506. assert ams_data[1]["tray"][0]["tray_type"] == "PLA", "B1 should still have PLA"
  507. class TestNozzleRackData:
  508. """Tests for nozzle rack data parsing from H2 series device.nozzle.info."""
  509. @pytest.fixture
  510. def mqtt_client(self):
  511. """Create a BambuMQTTClient instance for testing."""
  512. from backend.app.services.bambu_mqtt import BambuMQTTClient
  513. client = BambuMQTTClient(
  514. ip_address="192.168.1.100",
  515. serial_number="TEST123",
  516. access_code="12345678",
  517. )
  518. return client
  519. def test_h2c_nozzle_rack_populated_with_8_entries(self, mqtt_client):
  520. """H2C provides 8 nozzle entries: IDs 0,1 (L/R hotend) + 16-21 (rack)."""
  521. payload = {
  522. "print": {
  523. "device": {
  524. "nozzle": {
  525. "info": [
  526. {
  527. "id": 0,
  528. "type": "HS",
  529. "diameter": "0.4",
  530. "wear": 5,
  531. "stat": 1,
  532. "max_temp": 300,
  533. "serial_number": "SN-L",
  534. },
  535. {
  536. "id": 1,
  537. "type": "HS",
  538. "diameter": "0.4",
  539. "wear": 3,
  540. "stat": 0,
  541. "max_temp": 300,
  542. "serial_number": "SN-R",
  543. },
  544. {
  545. "id": 16,
  546. "type": "HS",
  547. "diameter": "0.4",
  548. "wear": 10,
  549. "stat": 0,
  550. "max_temp": 300,
  551. "serial_number": "SN-16",
  552. },
  553. {
  554. "id": 17,
  555. "type": "HH01",
  556. "diameter": "0.6",
  557. "wear": 0,
  558. "stat": 0,
  559. "max_temp": 300,
  560. "serial_number": "SN-17",
  561. },
  562. {
  563. "id": 18,
  564. "type": "HS",
  565. "diameter": "0.4",
  566. "wear": 2,
  567. "stat": 0,
  568. "max_temp": 300,
  569. "serial_number": "SN-18",
  570. },
  571. {
  572. "id": 19,
  573. "type": "",
  574. "diameter": "",
  575. "wear": None,
  576. "stat": None,
  577. "max_temp": 0,
  578. "serial_number": "",
  579. },
  580. {
  581. "id": 20,
  582. "type": "",
  583. "diameter": "",
  584. "wear": None,
  585. "stat": None,
  586. "max_temp": 0,
  587. "serial_number": "",
  588. },
  589. {
  590. "id": 21,
  591. "type": "",
  592. "diameter": "",
  593. "wear": None,
  594. "stat": None,
  595. "max_temp": 0,
  596. "serial_number": "",
  597. },
  598. ]
  599. }
  600. }
  601. }
  602. }
  603. mqtt_client._process_message(payload)
  604. assert len(mqtt_client.state.nozzle_rack) == 8
  605. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  606. assert ids == [0, 1, 16, 17, 18, 19, 20, 21]
  607. def test_h2d_nozzle_rack_populated_with_2_entries(self, mqtt_client):
  608. """H2D provides 2 nozzle entries: IDs 0,1 (L/R hotend) — no rack slots."""
  609. payload = {
  610. "print": {
  611. "device": {
  612. "nozzle": {
  613. "info": [
  614. {
  615. "id": 0,
  616. "type": "HS",
  617. "diameter": "0.4",
  618. "wear": 5,
  619. "stat": 1,
  620. "max_temp": 300,
  621. "serial_number": "SN-L",
  622. },
  623. {
  624. "id": 1,
  625. "type": "HS",
  626. "diameter": "0.4",
  627. "wear": 3,
  628. "stat": 1,
  629. "max_temp": 300,
  630. "serial_number": "SN-R",
  631. },
  632. ]
  633. }
  634. }
  635. }
  636. }
  637. mqtt_client._process_message(payload)
  638. assert len(mqtt_client.state.nozzle_rack) == 2
  639. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  640. assert ids == [0, 1]
  641. def test_single_nozzle_h2s_populated(self, mqtt_client):
  642. """H2S provides 1 nozzle entry: ID 0 only — single nozzle printer."""
  643. payload = {
  644. "print": {
  645. "device": {
  646. "nozzle": {
  647. "info": [
  648. {
  649. "id": 0,
  650. "type": "HS",
  651. "diameter": "0.4",
  652. "wear": 2,
  653. "stat": 1,
  654. "max_temp": 300,
  655. "serial_number": "SN-0",
  656. },
  657. ]
  658. }
  659. }
  660. }
  661. }
  662. mqtt_client._process_message(payload)
  663. assert len(mqtt_client.state.nozzle_rack) == 1
  664. assert mqtt_client.state.nozzle_rack[0]["id"] == 0
  665. def test_empty_nozzle_info_does_not_populate_rack(self, mqtt_client):
  666. """Empty nozzle info list should not populate nozzle_rack."""
  667. payload = {"print": {"device": {"nozzle": {"info": []}}}}
  668. mqtt_client._process_message(payload)
  669. assert mqtt_client.state.nozzle_rack == []
  670. def test_nozzle_rack_sorted_by_id(self, mqtt_client):
  671. """Nozzle rack entries should be sorted by ID regardless of input order."""
  672. payload = {
  673. "print": {
  674. "device": {
  675. "nozzle": {
  676. "info": [
  677. {"id": 17, "type": "HS", "diameter": "0.6"},
  678. {"id": 0, "type": "HS", "diameter": "0.4"},
  679. {"id": 16, "type": "HS", "diameter": "0.4"},
  680. {"id": 1, "type": "HS", "diameter": "0.4"},
  681. ]
  682. }
  683. }
  684. }
  685. }
  686. mqtt_client._process_message(payload)
  687. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  688. assert ids == [0, 1, 16, 17]
  689. def test_nozzle_rack_field_mapping(self, mqtt_client):
  690. """Verify field mapping from MQTT nozzle_info to nozzle_rack dict keys."""
  691. payload = {
  692. "print": {
  693. "device": {
  694. "nozzle": {
  695. "info": [
  696. {
  697. "id": 16,
  698. "type": "HH01",
  699. "diameter": "0.6",
  700. "wear": 15,
  701. "stat": 0,
  702. "max_temp": 320,
  703. "serial_number": "SN-ABC123",
  704. "filament_colour": "FF8800",
  705. "filament_id": "F42",
  706. "tray_type": "ABS",
  707. }
  708. ]
  709. }
  710. }
  711. }
  712. }
  713. mqtt_client._process_message(payload)
  714. slot = mqtt_client.state.nozzle_rack[0]
  715. assert slot["id"] == 16
  716. assert slot["type"] == "HH01"
  717. assert slot["diameter"] == "0.6"
  718. assert slot["wear"] == 15
  719. assert slot["stat"] == 0
  720. assert slot["max_temp"] == 320
  721. assert slot["serial_number"] == "SN-ABC123"
  722. assert slot["filament_color"] == "FF8800"
  723. assert slot["filament_id"] == "F42"
  724. assert slot["filament_type"] == "ABS"
  725. def test_nozzle_info_updates_nozzle_state(self, mqtt_client):
  726. """Nozzle info for IDs 0,1 should also update nozzle state (type/diameter)."""
  727. payload = {
  728. "print": {
  729. "device": {
  730. "nozzle": {
  731. "info": [
  732. {"id": 0, "type": "HS", "diameter": "0.4"},
  733. {"id": 1, "type": "HH01", "diameter": "0.6"},
  734. ]
  735. }
  736. }
  737. }
  738. }
  739. mqtt_client._process_message(payload)
  740. assert mqtt_client.state.nozzles[0].nozzle_type == "HS"
  741. assert mqtt_client.state.nozzles[0].nozzle_diameter == "0.4"
  742. assert mqtt_client.state.nozzles[1].nozzle_type == "HH01"
  743. assert mqtt_client.state.nozzles[1].nozzle_diameter == "0.6"
  744. class TestRequestTopicFailSafe:
  745. """Tests for graceful degradation when broker rejects request topic subscription."""
  746. @pytest.fixture
  747. def mqtt_client(self):
  748. from backend.app.services.bambu_mqtt import BambuMQTTClient
  749. client = BambuMQTTClient(
  750. ip_address="192.168.1.100",
  751. serial_number="TEST123",
  752. access_code="12345678",
  753. )
  754. return client
  755. def test_request_topic_supported_by_default(self, mqtt_client):
  756. """Request topic subscription is attempted by default."""
  757. assert mqtt_client._request_topic_supported is True
  758. assert mqtt_client._request_topic_confirmed is False
  759. def test_on_subscribe_confirms_success(self, mqtt_client):
  760. """Successful SUBACK marks request topic as confirmed."""
  761. from paho.mqtt.reasoncodes import ReasonCode
  762. mqtt_client._request_topic_sub_mid = 42
  763. rc = ReasonCode(9, identifier=0) # SUBACK packetType=9, QoS 0 = success
  764. mqtt_client._on_subscribe(None, None, 42, [rc], None)
  765. assert mqtt_client._request_topic_confirmed is True
  766. assert mqtt_client._request_topic_supported is True
  767. assert mqtt_client._request_topic_sub_mid is None
  768. assert mqtt_client._request_topic_sub_time == 0.0
  769. def test_on_subscribe_detects_rejection(self, mqtt_client):
  770. """SUBACK with failure code disables request topic."""
  771. from paho.mqtt.reasoncodes import ReasonCode
  772. mqtt_client._request_topic_sub_mid = 42
  773. rc = ReasonCode(9, identifier=0x80) # SUBACK packetType=9, 0x80 = failure
  774. mqtt_client._on_subscribe(None, None, 42, [rc], None)
  775. assert mqtt_client._request_topic_supported is False
  776. assert mqtt_client._request_topic_confirmed is False
  777. def test_on_subscribe_ignores_other_mids(self, mqtt_client):
  778. """SUBACK for other subscriptions (e.g. report topic) is ignored."""
  779. from paho.mqtt.reasoncodes import ReasonCode
  780. mqtt_client._request_topic_sub_mid = 42
  781. rc = ReasonCode(9, identifier=0x80)
  782. mqtt_client._on_subscribe(None, None, 99, [rc], None)
  783. # Not affected — mid doesn't match
  784. assert mqtt_client._request_topic_supported is True
  785. def test_disconnect_after_subscription_disables_topic(self, mqtt_client):
  786. """Disconnect within 10s of subscription attempt disables request topic."""
  787. import time
  788. mqtt_client._request_topic_sub_time = time.time()
  789. mqtt_client._request_topic_confirmed = False
  790. mqtt_client._last_message_time = 0.0
  791. mqtt_client._on_disconnect(None, None)
  792. assert mqtt_client._request_topic_supported is False
  793. assert mqtt_client._request_topic_sub_time == 0.0
  794. def test_disconnect_after_confirmation_does_not_disable(self, mqtt_client):
  795. """Disconnect after SUBACK confirmation keeps request topic enabled."""
  796. import time
  797. mqtt_client._request_topic_sub_time = time.time()
  798. mqtt_client._request_topic_confirmed = True
  799. mqtt_client._last_message_time = 0.0
  800. mqtt_client._on_disconnect(None, None)
  801. assert mqtt_client._request_topic_supported is True
  802. def test_late_disconnect_does_not_disable(self, mqtt_client):
  803. """Disconnect long after subscription (>10s) doesn't blame request topic."""
  804. import time
  805. mqtt_client._request_topic_sub_time = time.time() - 30.0
  806. mqtt_client._request_topic_confirmed = False
  807. mqtt_client._last_message_time = 0.0
  808. mqtt_client._on_disconnect(None, None)
  809. assert mqtt_client._request_topic_supported is True
  810. def test_on_connect_skips_request_topic_when_unsupported(self, mqtt_client):
  811. """After marking unsupported, reconnect skips request topic subscription."""
  812. mqtt_client._request_topic_supported = False
  813. subscribe_calls = []
  814. mock_client = type(
  815. "MockClient",
  816. (),
  817. {
  818. "subscribe": lambda self, topic: subscribe_calls.append(topic) or (0, 1),
  819. },
  820. )()
  821. mqtt_client._on_connect(mock_client, None, None, 0)
  822. # Only report topic subscribed, not request topic
  823. assert len(subscribe_calls) == 1
  824. assert subscribe_calls[0] == mqtt_client.topic_subscribe
  825. class TestRequestTopicAmsMapping:
  826. """Tests for capturing ams_mapping from the MQTT request topic."""
  827. @pytest.fixture
  828. def mqtt_client(self):
  829. """Create a BambuMQTTClient instance for testing."""
  830. from backend.app.services.bambu_mqtt import BambuMQTTClient
  831. client = BambuMQTTClient(
  832. ip_address="192.168.1.100",
  833. serial_number="TEST123",
  834. access_code="12345678",
  835. )
  836. return client
  837. def test_captured_ams_mapping_initializes_to_none(self, mqtt_client):
  838. """Verify _captured_ams_mapping starts as None."""
  839. assert mqtt_client._captured_ams_mapping is None
  840. def test_handle_request_message_captures_ams_mapping(self, mqtt_client):
  841. """project_file command with ams_mapping stores the mapping."""
  842. data = {
  843. "print": {
  844. "command": "project_file",
  845. "ams_mapping": [0, 4, -1, -1],
  846. "url": "ftp://192.168.1.100/test.3mf",
  847. }
  848. }
  849. mqtt_client._handle_request_message(data)
  850. assert mqtt_client._captured_ams_mapping == [0, 4, -1, -1]
  851. def test_handle_request_message_ignores_non_print_commands(self, mqtt_client):
  852. """Non-project_file commands don't store ams_mapping."""
  853. data = {
  854. "print": {
  855. "command": "pause",
  856. }
  857. }
  858. mqtt_client._handle_request_message(data)
  859. assert mqtt_client._captured_ams_mapping is None
  860. def test_handle_request_message_ignores_missing_ams_mapping(self, mqtt_client):
  861. """project_file command without ams_mapping doesn't store anything."""
  862. data = {
  863. "print": {
  864. "command": "project_file",
  865. "url": "ftp://192.168.1.100/test.3mf",
  866. }
  867. }
  868. mqtt_client._handle_request_message(data)
  869. assert mqtt_client._captured_ams_mapping is None
  870. def test_handle_request_message_ignores_non_dict_print(self, mqtt_client):
  871. """Non-dict print value is safely ignored."""
  872. data = {"print": "not_a_dict"}
  873. mqtt_client._handle_request_message(data)
  874. assert mqtt_client._captured_ams_mapping is None
  875. def test_handle_request_message_ignores_missing_print(self, mqtt_client):
  876. """Message without print key is safely ignored."""
  877. data = {"pushing": {"command": "pushall"}}
  878. mqtt_client._handle_request_message(data)
  879. assert mqtt_client._captured_ams_mapping is None
  880. def test_captured_mapping_overwrites_previous(self, mqtt_client):
  881. """A new print command overwrites a previously captured mapping."""
  882. mqtt_client._captured_ams_mapping = [0, -1, -1, -1]
  883. data = {
  884. "print": {
  885. "command": "project_file",
  886. "ams_mapping": [4, 8, -1, -1],
  887. }
  888. }
  889. mqtt_client._handle_request_message(data)
  890. assert mqtt_client._captured_ams_mapping == [4, 8, -1, -1]
  891. def test_print_start_callback_includes_ams_mapping(self, mqtt_client):
  892. """on_print_start callback data includes captured ams_mapping."""
  893. start_data = {}
  894. def on_start(data):
  895. start_data.update(data)
  896. mqtt_client.on_print_start = on_start
  897. mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
  898. # Trigger print start
  899. mqtt_client._process_message(
  900. {
  901. "print": {
  902. "gcode_state": "RUNNING",
  903. "gcode_file": "/data/Metadata/test.gcode",
  904. "subtask_name": "Test",
  905. }
  906. }
  907. )
  908. assert start_data.get("ams_mapping") == [0, 4, -1, -1]
  909. def test_print_start_callback_ams_mapping_none_when_not_captured(self, mqtt_client):
  910. """on_print_start callback has ams_mapping=None when no mapping captured."""
  911. start_data = {}
  912. def on_start(data):
  913. start_data.update(data)
  914. mqtt_client.on_print_start = on_start
  915. mqtt_client._process_message(
  916. {
  917. "print": {
  918. "gcode_state": "RUNNING",
  919. "gcode_file": "/data/Metadata/test.gcode",
  920. "subtask_name": "Test",
  921. }
  922. }
  923. )
  924. assert "ams_mapping" in start_data
  925. assert start_data["ams_mapping"] is None
  926. def test_print_complete_callback_includes_ams_mapping(self, mqtt_client):
  927. """on_print_complete callback data includes captured ams_mapping."""
  928. complete_data = {}
  929. def on_complete(data):
  930. complete_data.update(data)
  931. mqtt_client.on_print_start = lambda d: None
  932. mqtt_client.on_print_complete = on_complete
  933. mqtt_client._captured_ams_mapping = [0, 9, -1, -1]
  934. # Start print
  935. mqtt_client._process_message(
  936. {
  937. "print": {
  938. "gcode_state": "RUNNING",
  939. "gcode_file": "/data/Metadata/test.gcode",
  940. "subtask_name": "Test",
  941. }
  942. }
  943. )
  944. # Complete print
  945. mqtt_client._process_message(
  946. {
  947. "print": {
  948. "gcode_state": "FINISH",
  949. "gcode_file": "/data/Metadata/test.gcode",
  950. "subtask_name": "Test",
  951. }
  952. }
  953. )
  954. assert complete_data.get("ams_mapping") == [0, 9, -1, -1]
  955. def test_captured_mapping_cleared_after_print_complete(self, mqtt_client):
  956. """_captured_ams_mapping is reset to None after print completion."""
  957. mqtt_client.on_print_start = lambda d: None
  958. mqtt_client.on_print_complete = lambda d: None
  959. mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
  960. # Start print
  961. mqtt_client._process_message(
  962. {
  963. "print": {
  964. "gcode_state": "RUNNING",
  965. "gcode_file": "/data/Metadata/test.gcode",
  966. "subtask_name": "Test",
  967. }
  968. }
  969. )
  970. # Complete print
  971. mqtt_client._process_message(
  972. {
  973. "print": {
  974. "gcode_state": "FINISH",
  975. "gcode_file": "/data/Metadata/test.gcode",
  976. "subtask_name": "Test",
  977. }
  978. }
  979. )
  980. assert mqtt_client._captured_ams_mapping is None
  981. def test_full_flow_capture_and_deliver(self, mqtt_client):
  982. """Full flow: slicer sends print command → MQTT captures mapping → completion delivers it."""
  983. complete_data = {}
  984. def on_complete(data):
  985. complete_data.update(data)
  986. mqtt_client.on_print_start = lambda d: None
  987. mqtt_client.on_print_complete = on_complete
  988. # 1. Slicer sends print command (captured from request topic)
  989. mqtt_client._handle_request_message(
  990. {
  991. "print": {
  992. "command": "project_file",
  993. "ams_mapping": [4, 9, -1, -1],
  994. "url": "ftp://192.168.1.100/model.3mf",
  995. }
  996. }
  997. )
  998. assert mqtt_client._captured_ams_mapping == [4, 9, -1, -1]
  999. # 2. Printer reports RUNNING
  1000. mqtt_client._process_message(
  1001. {
  1002. "print": {
  1003. "gcode_state": "RUNNING",
  1004. "gcode_file": "/data/Metadata/model.gcode",
  1005. "subtask_name": "Model",
  1006. }
  1007. }
  1008. )
  1009. # 3. Printer reports FINISH
  1010. mqtt_client._process_message(
  1011. {
  1012. "print": {
  1013. "gcode_state": "FINISH",
  1014. "gcode_file": "/data/Metadata/model.gcode",
  1015. "subtask_name": "Model",
  1016. }
  1017. }
  1018. )
  1019. assert complete_data["ams_mapping"] == [4, 9, -1, -1]
  1020. assert complete_data["status"] == "completed"
  1021. # Mapping cleared after completion
  1022. assert mqtt_client._captured_ams_mapping is None
  1023. # ---------------------------------------------------------------------------
  1024. # tray_now disambiguation helpers
  1025. # ---------------------------------------------------------------------------
  1026. def _ams_payload(tray_now, ams_units=None, tray_exist_bits=None, ams_exist_bits=None):
  1027. """Build minimal print.ams payload for tray_now disambiguation tests."""
  1028. ams = {"tray_now": str(tray_now)}
  1029. if ams_units is not None:
  1030. ams["ams"] = ams_units
  1031. if tray_exist_bits is not None:
  1032. ams["tray_exist_bits"] = tray_exist_bits
  1033. if ams_exist_bits is not None:
  1034. ams["ams_exist_bits"] = ams_exist_bits
  1035. return {"print": {"ams": ams}}
  1036. def _extruder_info_payload(extruders):
  1037. """Build device.extruder.info payload (dual-nozzle detection + snow).
  1038. Each entry in *extruders* is a dict with at least ``id`` and ``snow``.
  1039. """
  1040. return {
  1041. "print": {
  1042. "device": {
  1043. "extruder": {
  1044. "info": extruders,
  1045. }
  1046. }
  1047. }
  1048. }
  1049. def _extruder_state_payload(state_val):
  1050. """Build device.extruder.state payload (active extruder via bit 8)."""
  1051. return {
  1052. "print": {
  1053. "device": {
  1054. "extruder": {
  1055. "state": state_val,
  1056. }
  1057. }
  1058. }
  1059. }
  1060. # ---------------------------------------------------------------------------
  1061. # 1. Single-nozzle X1E — direct passthrough
  1062. # ---------------------------------------------------------------------------
  1063. class TestTrayNowSingleNozzleX1E:
  1064. """Single-nozzle, 1 AMS — tray_now is a direct passthrough."""
  1065. @pytest.fixture
  1066. def mqtt_client(self):
  1067. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1068. return BambuMQTTClient(
  1069. ip_address="192.168.1.100",
  1070. serial_number="TEST_X1E",
  1071. access_code="12345678",
  1072. )
  1073. def test_tray_now_direct_passthrough_slot_0_to_3(self, mqtt_client):
  1074. """Each tray_now 0-3 maps 1:1 on single-nozzle printers."""
  1075. for slot in range(4):
  1076. mqtt_client._process_message(_ams_payload(slot))
  1077. assert mqtt_client.state.tray_now == slot
  1078. def test_tray_now_255_means_unloaded(self, mqtt_client):
  1079. """tray_now=255 means no filament loaded."""
  1080. mqtt_client._process_message(_ams_payload(255))
  1081. assert mqtt_client.state.tray_now == 255
  1082. def test_single_extruder_does_not_trigger_dual_nozzle(self, mqtt_client):
  1083. """device.extruder.info with 1 entry must NOT set _is_dual_nozzle."""
  1084. mqtt_client._process_message(_extruder_info_payload([{"id": 0, "snow": 0xFF00FF}]))
  1085. assert mqtt_client._is_dual_nozzle is False
  1086. def test_last_loaded_tray_survives_unload(self, mqtt_client):
  1087. """Load tray 2, unload → last_loaded_tray stays 2."""
  1088. mqtt_client._process_message(_ams_payload(2))
  1089. assert mqtt_client.state.last_loaded_tray == 2
  1090. mqtt_client._process_message(_ams_payload(255))
  1091. assert mqtt_client.state.tray_now == 255
  1092. assert mqtt_client.state.last_loaded_tray == 2
  1093. # ---------------------------------------------------------------------------
  1094. # 2. Single-nozzle P2S — multiple AMS, global IDs pass through
  1095. # ---------------------------------------------------------------------------
  1096. class TestTrayNowSingleNozzleP2S:
  1097. """Single-nozzle, 2 AMS — tray_now > 3 passes through as global ID."""
  1098. @pytest.fixture
  1099. def mqtt_client(self):
  1100. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1101. return BambuMQTTClient(
  1102. ip_address="192.168.1.100",
  1103. serial_number="TEST_P2S",
  1104. access_code="12345678",
  1105. )
  1106. def test_tray_now_ams1_global_ids_4_to_7(self, mqtt_client):
  1107. """tray_now 4-7 are global IDs for AMS 1 on single-nozzle printers."""
  1108. for global_id in range(4, 8):
  1109. mqtt_client._process_message(_ams_payload(global_id))
  1110. assert mqtt_client.state.tray_now == global_id
  1111. def test_tray_change_across_ams_units(self, mqtt_client):
  1112. """Switch from AMS 0 slot 1 → AMS 1 slot 2 (global 6)."""
  1113. mqtt_client._process_message(_ams_payload(1))
  1114. assert mqtt_client.state.tray_now == 1
  1115. mqtt_client._process_message(_ams_payload(6))
  1116. assert mqtt_client.state.tray_now == 6
  1117. # ---------------------------------------------------------------------------
  1118. # 2b. Single-nozzle P2S — multi-AMS local slot disambiguation (#420)
  1119. # ---------------------------------------------------------------------------
  1120. class TestTrayNowP2SMultiAmsDisambiguation:
  1121. """P2S firmware sends local slot IDs (0-3) in tray_now even with dual AMS.
  1122. When ams_exist_bits indicates >1 AMS unit and tray_now is 0-3, the backend
  1123. should use the MQTT mapping field (snow-encoded) to resolve the correct
  1124. global tray ID.
  1125. """
  1126. @pytest.fixture
  1127. def mqtt_client(self):
  1128. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1129. client = BambuMQTTClient(
  1130. ip_address="192.168.1.100",
  1131. serial_number="TEST_P2S_DUAL",
  1132. access_code="12345678",
  1133. )
  1134. return client
  1135. def test_resolves_ams1_slot1_from_mapping(self, mqtt_client):
  1136. """tray_now=1 with mapping=[257] → global ID 5 (AMS1-T1).
  1137. 257 snow-decoded: ams_hw_id=1, slot=1 → global 1*4+1=5.
  1138. """
  1139. # Set mapping field in raw_data (as the MQTT handler would)
  1140. mqtt_client.state.raw_data["mapping"] = [257]
  1141. mqtt_client._process_message(
  1142. _ams_payload(1, ams_exist_bits="3") # '3' = 0b11 → AMS 0 and 1
  1143. )
  1144. assert mqtt_client.state.tray_now == 5
  1145. def test_resolves_ams1_slot0_from_mapping(self, mqtt_client):
  1146. """tray_now=0 with mapping=[256] → global ID 4 (AMS1-T0).
  1147. 256 snow-decoded: ams_hw_id=1, slot=0 → global 1*4+0=4.
  1148. """
  1149. mqtt_client.state.raw_data["mapping"] = [256]
  1150. mqtt_client._process_message(_ams_payload(0, ams_exist_bits="3"))
  1151. assert mqtt_client.state.tray_now == 4
  1152. def test_resolves_ams1_slot3_from_mapping(self, mqtt_client):
  1153. """tray_now=3 with mapping=[259] → global ID 7 (AMS1-T3).
  1154. 259 snow-decoded: ams_hw_id=1, slot=3 → global 1*4+3=7.
  1155. """
  1156. mqtt_client.state.raw_data["mapping"] = [259]
  1157. mqtt_client._process_message(_ams_payload(3, ams_exist_bits="3"))
  1158. assert mqtt_client.state.tray_now == 7
  1159. def test_ams0_slot_unchanged_when_mapping_confirms_ams0(self, mqtt_client):
  1160. """tray_now=1 with mapping=[1] → stays 1 (AMS0-T1).
  1161. 1 snow-decoded: ams_hw_id=0, slot=1 → global 0*4+1=1.
  1162. """
  1163. mqtt_client.state.raw_data["mapping"] = [1]
  1164. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1165. assert mqtt_client.state.tray_now == 1
  1166. def test_multicolor_resolves_ams1_from_multi_entry_mapping(self, mqtt_client):
  1167. """Multi-color print: mapping=[0, 257] → tray_now=1 resolves to AMS1-T1 (5).
  1168. Entry 0: ams_hw_id=0, slot=0 (local 0) — doesn't match tray_now=1.
  1169. Entry 257: ams_hw_id=1, slot=1 (local 1) — matches tray_now=1 → global 5.
  1170. """
  1171. mqtt_client.state.raw_data["mapping"] = [0, 257]
  1172. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1173. assert mqtt_client.state.tray_now == 5
  1174. def test_multicolor_four_slot_mapping(self, mqtt_client):
  1175. """mapping=[65535, 65535, 65535, 257] → tray_now=1 resolves to global 5.
  1176. Only entry 257 has local slot=1, other entries are unmapped (65535).
  1177. Reproduces exact data from issue #420 support package.
  1178. """
  1179. mqtt_client.state.raw_data["mapping"] = [65535, 65535, 65535, 257]
  1180. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1181. assert mqtt_client.state.tray_now == 5
  1182. def test_ambiguous_mapping_falls_back_to_local_slot(self, mqtt_client):
  1183. """Two AMS units with same local slot in mapping → ambiguous, keep local slot.
  1184. mapping=[1, 257]: both have local slot 1 (AMS0-T1 and AMS1-T1).
  1185. Cannot disambiguate → fall back to tray_now=1.
  1186. """
  1187. mqtt_client.state.raw_data["mapping"] = [1, 257]
  1188. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1189. assert mqtt_client.state.tray_now == 1
  1190. def test_no_mapping_falls_back_to_local_slot(self, mqtt_client):
  1191. """No mapping field available → fall back to raw tray_now."""
  1192. # No mapping in raw_data (e.g. manual filament load, not during print)
  1193. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1194. assert mqtt_client.state.tray_now == 1
  1195. def test_empty_mapping_falls_back_to_local_slot(self, mqtt_client):
  1196. """Empty mapping list → fall back to raw tray_now."""
  1197. mqtt_client.state.raw_data["mapping"] = []
  1198. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1199. assert mqtt_client.state.tray_now == 1
  1200. def test_single_ams_passthrough(self, mqtt_client):
  1201. """Single AMS (ams_exist_bits='1') → tray_now 0-3 is direct global ID."""
  1202. mqtt_client._process_message(_ams_payload(2, ams_exist_bits="1"))
  1203. assert mqtt_client.state.tray_now == 2
  1204. def test_no_ams_exist_bits_passthrough(self, mqtt_client):
  1205. """No ams_exist_bits in payload → fall back to raw tray_now."""
  1206. mqtt_client._process_message(_ams_payload(1))
  1207. assert mqtt_client.state.tray_now == 1
  1208. def test_tray_now_255_unaffected_by_multi_ams(self, mqtt_client):
  1209. """tray_now=255 (unloaded) passes through regardless of AMS count."""
  1210. mqtt_client.state.raw_data["mapping"] = [257]
  1211. mqtt_client._process_message(_ams_payload(255, ams_exist_bits="3"))
  1212. assert mqtt_client.state.tray_now == 255
  1213. def test_tray_now_above_3_unaffected(self, mqtt_client):
  1214. """tray_now > 3 is already a global ID and passes through directly."""
  1215. mqtt_client._process_message(_ams_payload(6, ams_exist_bits="3"))
  1216. assert mqtt_client.state.tray_now == 6
  1217. def test_last_loaded_tray_uses_resolved_global_id(self, mqtt_client):
  1218. """last_loaded_tray should reflect the resolved global ID, not local slot."""
  1219. mqtt_client.state.raw_data["mapping"] = [257]
  1220. mqtt_client.state.state = "RUNNING"
  1221. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1222. assert mqtt_client.state.tray_now == 5
  1223. assert mqtt_client.state.last_loaded_tray == 5
  1224. class TestResolveLocalSlotFromMapping:
  1225. """Unit tests for _resolve_local_slot_from_mapping static method."""
  1226. def test_single_match_ams0(self):
  1227. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1228. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [1]) == 1
  1229. def test_single_match_ams1(self):
  1230. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1231. # 257 = 1*256 + 1 → AMS1 slot1 → global 5
  1232. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [257]) == 5
  1233. def test_single_match_ams2(self):
  1234. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1235. # 514 = 2*256 + 2 → AMS2 slot2 → global 10
  1236. assert BambuMQTTClient._resolve_local_slot_from_mapping(2, [514]) == 10
  1237. def test_unmapped_entries_skipped(self):
  1238. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1239. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [65535, 65535, 65535, 257]) == 5
  1240. def test_no_match_returns_none(self):
  1241. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1242. # mapping has slot 0 only, looking for slot 2
  1243. assert BambuMQTTClient._resolve_local_slot_from_mapping(2, [0]) is None
  1244. def test_ambiguous_returns_none(self):
  1245. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1246. # Both AMS0 slot1 (1) and AMS1 slot1 (257) → ambiguous
  1247. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [1, 257]) is None
  1248. def test_none_mapping_returns_none(self):
  1249. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1250. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, None) is None
  1251. def test_empty_mapping_returns_none(self):
  1252. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1253. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, []) is None
  1254. def test_ams_ht_slot0_match(self):
  1255. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1256. # AMS-HT id=128: snow = 128*256 + 0 = 32768
  1257. assert BambuMQTTClient._resolve_local_slot_from_mapping(0, [32768]) == 128
  1258. # ---------------------------------------------------------------------------
  1259. # 3. H2D Pro — initial state detection
  1260. # ---------------------------------------------------------------------------
  1261. class TestTrayNowDualNozzleH2DSetup:
  1262. """H2D Pro initial state detection."""
  1263. @pytest.fixture
  1264. def mqtt_client(self):
  1265. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1266. return BambuMQTTClient(
  1267. ip_address="192.168.1.100",
  1268. serial_number="TEST_H2D",
  1269. access_code="12345678",
  1270. )
  1271. def test_dual_nozzle_detected_from_extruder_info(self, mqtt_client):
  1272. """2 entries in device.extruder.info → _is_dual_nozzle=True."""
  1273. mqtt_client._process_message(
  1274. _extruder_info_payload(
  1275. [
  1276. {"id": 0, "snow": 0xFF00FF},
  1277. {"id": 1, "snow": 0xFF00FF},
  1278. ]
  1279. )
  1280. )
  1281. assert mqtt_client._is_dual_nozzle is True
  1282. def test_ams_extruder_map_parsed_from_info_field(self, mqtt_client):
  1283. """AMS 0 info=2003 → right (ext 0), AMS 128 info=2104 → left (ext 1)."""
  1284. ams_units = [
  1285. {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
  1286. {"id": 128, "info": 2104, "tray": [{"id": 0}]},
  1287. ]
  1288. payload = {
  1289. "print": {
  1290. "ams": {
  1291. "ams": ams_units,
  1292. "tray_now": "255",
  1293. "tray_exist_bits": "1000f",
  1294. },
  1295. }
  1296. }
  1297. mqtt_client._process_message(payload)
  1298. # info=2003: bit8 = (2003>>8)&1 = 7&1 = 1 → extruder = 1-1 = 0 (right)
  1299. # info=2104: bit8 = (2104>>8)&1 = 8&1 = 0 → extruder = 1-0 = 1 (left)
  1300. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1301. def test_dual_nozzle_detection_before_ams_in_same_message(self, mqtt_client):
  1302. """Dual-nozzle detection at line 538 happens before _handle_ams_data() at line 549.
  1303. If both arrive in the same message, tray_now disambiguation already uses dual-nozzle logic.
  1304. """
  1305. payload = {
  1306. "print": {
  1307. "device": {
  1308. "extruder": {
  1309. "info": [
  1310. {"id": 0, "snow": 0xFF00FF},
  1311. {"id": 1, "snow": 0xFF00FF},
  1312. ],
  1313. "state": 0x0001,
  1314. }
  1315. },
  1316. "ams": {
  1317. "ams": [
  1318. {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
  1319. ],
  1320. "tray_now": "2",
  1321. "tray_exist_bits": "f",
  1322. },
  1323. }
  1324. }
  1325. mqtt_client._process_message(payload)
  1326. # Dual-nozzle was detected; AMS 0 on right extruder (active by default);
  1327. # snow is 0xFF00FF (unloaded), so falls through to ams_extruder_map fallback.
  1328. # Single AMS on extruder 0 → global_id = 0*4+2 = 2
  1329. assert mqtt_client._is_dual_nozzle is True
  1330. assert mqtt_client.state.tray_now == 2
  1331. # ---------------------------------------------------------------------------
  1332. # Shared H2D fixture for classes 4-8
  1333. # ---------------------------------------------------------------------------
  1334. class _H2DFixtureMixin:
  1335. """Mixin providing a pre-configured H2D Pro client."""
  1336. @pytest.fixture
  1337. def mqtt_client(self):
  1338. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1339. return BambuMQTTClient(
  1340. ip_address="192.168.1.100",
  1341. serial_number="TEST_H2D",
  1342. access_code="12345678",
  1343. )
  1344. @pytest.fixture
  1345. def h2d_client(self, mqtt_client):
  1346. """Pre-configure as H2D Pro: dual-nozzle + ams_extruder_map."""
  1347. mqtt_client._process_message(
  1348. {
  1349. "print": {
  1350. "device": {
  1351. "extruder": {
  1352. "info": [
  1353. {"id": 0, "snow": 0xFF00FF},
  1354. {"id": 1, "snow": 0xFF00FF},
  1355. ],
  1356. "state": 0x0001, # right extruder active
  1357. }
  1358. },
  1359. "ams": {
  1360. "ams": [
  1361. {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
  1362. {"id": 128, "info": 2104, "tray": [{"id": 0}]},
  1363. ],
  1364. "tray_now": "255",
  1365. "tray_exist_bits": "1000f",
  1366. },
  1367. }
  1368. }
  1369. )
  1370. assert mqtt_client._is_dual_nozzle is True
  1371. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1372. return mqtt_client
  1373. # ---------------------------------------------------------------------------
  1374. # 4. H2D Snow field disambiguation
  1375. # ---------------------------------------------------------------------------
  class TestTrayNowDualNozzleH2DSnow(_H2DFixtureMixin):
      """Tray resolution driven by the per-extruder snow field (primary path)."""

      def test_snow_disambiguates_ams0_slot(self, h2d_client):
          """snow ext[0]=AMS 0 slot 2 followed by tray_now='2' yields global 2."""
          # Snow has to arrive in an EARLIER message: inside a single message it
          # is parsed after tray_now, so it could not influence that tray_now.
          ams0_slot2 = (0 << 8) | 2  # AMS 0 slot 2 = raw 2
          extruders = [
              {"id": 0, "snow": ams0_slot2},
              {"id": 1, "snow": 0xFF00FF},
          ]
          h2d_client._process_message(_extruder_info_payload(extruders))
          assert h2d_client.state.h2d_extruder_snow.get(0) == 2

          # With snow in place, report tray_now=2.
          h2d_client._process_message(_ams_payload(2))
          assert h2d_client.state.tray_now == 2

      def test_snow_disambiguates_ams_ht_to_128(self, h2d_client):
          """snow ext[1]=AMS HT (128), left active, tray_now='0' yields global 128."""
          # Extruder 1 is fed from AMS 128, slot 0.
          ams_ht_slot0 = (128 << 8) | 0  # = 32768
          extruders = [
              {"id": 0, "snow": 0xFF00FF},
              {"id": 1, "snow": ams_ht_slot0},
          ]
          h2d_client._process_message(_extruder_info_payload(extruders))
          assert h2d_client.state.h2d_extruder_snow.get(1) == 128

          # Make the left extruder the active one.
          h2d_client._process_message(_extruder_state_payload(0x0100))
          assert h2d_client.state.active_extruder == 1

          # tray_now="0" while left is active and snow points at AMS HT (128):
          # the AMS HT snow slot is 0 (single slot) and the parsed tray_now is 0,
          # so the two match.
          h2d_client._process_message(_ams_payload(0))
          assert h2d_client.state.tray_now == 128

      def test_snow_updates_h2d_extruder_snow_state(self, h2d_client):
          """state.h2d_extruder_snow ends up holding the expected global IDs."""
          right_snow = (1 << 8) | 3  # AMS 1 slot 3 -> global 7
          left_snow = (0 << 8) | 0  # AMS 0 slot 0 -> global 0
          extruders = [
              {"id": 0, "snow": right_snow},
              {"id": 1, "snow": left_snow},
          ]
          h2d_client._process_message(_extruder_info_payload(extruders))

          assert h2d_client.state.h2d_extruder_snow[0] == 7
          assert h2d_client.state.h2d_extruder_snow[1] == 0

      def test_snow_unloaded_value(self, h2d_client):
          """snow=0xFFFF (ams_id=255, slot=255) is stored as 255 (unloaded)."""
          extruders = [{"id": ext_id, "snow": 0xFFFF} for ext_id in (0, 1)]
          h2d_client._process_message(_extruder_info_payload(extruders))

          assert h2d_client.state.h2d_extruder_snow[0] == 255
          assert h2d_client.state.h2d_extruder_snow[1] == 255

      def test_snow_initial_sentinel_not_stored(self, h2d_client):
          """The firmware's initial snow=0xFF00FF never lands in h2d_extruder_snow."""
          # 0xFF00FF carries ams_id=0xFF00=65280, which matches no parsing branch.
          extruders = [{"id": ext_id, "snow": 0xFF00FF} for ext_id in (0, 1)]
          h2d_client._process_message(_extruder_info_payload(extruders))

          # No branch matched, so the snow dict is still empty.
          assert h2d_client.state.h2d_extruder_snow == {}
  1454. # ---------------------------------------------------------------------------
  1455. # 5. H2D Pending target disambiguation
  1456. # ---------------------------------------------------------------------------
  class TestTrayNowDualNozzleH2DPendingTarget(_H2DFixtureMixin):
      """Disambiguation via a pending target (set when Bambuddy initiates the load)."""

      def test_pending_target_matches_slot(self, h2d_client):
          """pending=5 and tray_now='1' agree (5 % 4 == 1), so tray_now becomes 5."""
          h2d_client.state.pending_tray_target = 5
          reported = _ams_payload(1)
          h2d_client._process_message(reported)

          assert h2d_client.state.tray_now == 5
          # The pending target has been consumed and reset.
          assert h2d_client.state.pending_tray_target is None

      def test_pending_target_slot_mismatch(self, h2d_client):
          """pending=5 but tray_now='2': the raw slot is used and pending is cleared."""
          h2d_client.state.pending_tray_target = 5
          reported = _ams_payload(2)
          h2d_client._process_message(reported)

          # Slot 2 differs from 5 % 4 == 1, so the raw slot 2 is what sticks.
          assert h2d_client.state.tray_now == 2
          assert h2d_client.state.pending_tray_target is None

      def test_pending_target_takes_priority_over_snow(self, h2d_client):
          """With both a pending target and snow available, pending is preferred."""
          # Snow says extruder 0 is on AMS 0 slot 1, i.e. global 1.
          ams0_slot1 = (0 << 8) | 1
          extruders = [
              {"id": 0, "snow": ams0_slot1},
              {"id": 1, "snow": 0xFF00FF},
          ]
          h2d_client._process_message(_extruder_info_payload(extruders))
          assert h2d_client.state.h2d_extruder_snow.get(0) == 1

          # Pending target points at AMS 1 slot 1 (global 5) instead.
          h2d_client.state.pending_tray_target = 5

          # tray_now="1" fits the pending target (5 % 4 == 1); pending must beat snow.
          h2d_client._process_message(_ams_payload(1))
          assert h2d_client.state.tray_now == 5
  1490. # ---------------------------------------------------------------------------
  1491. # 6. H2D ams_extruder_map fallback
  1492. # ---------------------------------------------------------------------------
  class TestTrayNowDualNozzleH2DFallback(_H2DFixtureMixin):
      """Last-resort resolution through ams_extruder_map (no pending target, no snow)."""

      def test_single_ams_on_extruder_computes_global_id(self, h2d_client):
          """AMS 0 on the right extruder with tray_now='2' gives 0*4+2 = 2."""
          # The fixture leaves snow at 0xFF00FF (unloaded), so the snow path is skipped.
          reported = _ams_payload(2)
          h2d_client._process_message(reported)

          # AMS 0 is the only unit on extruder 0 (right, active by default), so
          # the fallback computes global = 0*4+2 = 2.
          assert h2d_client.state.tray_now == 2

      def test_multiple_ams_keeps_current_if_valid(self, h2d_client):
          """With several AMS on one extruder, a current tray matching the slot is kept."""
          # Two AMS units share the right extruder (ext 0).
          h2d_client.state.ams_extruder_map = {"0": 0, "1": 0}
          # Current tray is 5 (AMS 1 slot 1); AMS 1 is among the units on this extruder.
          h2d_client.state.tray_now = 5

          # tray_now="1" and 5 % 4 == 1 agree, so 5 stays.
          reported = _ams_payload(1)
          h2d_client._process_message(reported)
          assert h2d_client.state.tray_now == 5

      def test_no_ams_on_extruder_uses_raw_slot(self, h2d_client):
          """When nothing is mapped to the active extruder the raw slot is the global ID."""
          # Every AMS sits on the left extruder while the right one is active.
          h2d_client.state.ams_extruder_map = {"0": 1, "128": 1}

          reported = _ams_payload(2)
          h2d_client._process_message(reported)
          assert h2d_client.state.tray_now == 2

      def test_single_ams_ht_on_extruder_returns_unit_id(self, h2d_client):
          """AMS-HT 128 alone on the left extruder, slot 0, resolves to 128 (not 512)."""
          # Activate the left extruder, which is where AMS-HT 128 is mapped.
          h2d_client._process_message(_extruder_state_payload(0x0100))

          # AMS-HT 128 is the only unit there and no snow is available.
          h2d_client._process_message(_ams_payload(0))
          assert h2d_client.state.tray_now == 128

      def test_single_ams_ht_ignores_nonzero_slot(self, h2d_client):
          """An AMS-HT has one slot: even a reported slot 1 resolves to the unit ID."""
          h2d_client.state.ams_extruder_map = {"129": 0}

          reported = _ams_payload(1)
          h2d_client._process_message(reported)

          # AMS-HT 129 maps to global 129, not 129*4+1 = 517.
          assert h2d_client.state.tray_now == 129

      def test_multiple_ams_keeps_current_ams_ht(self, h2d_client):
          """Current tray AMS-HT 128 plus a reported slot 0 leaves 128 in place."""
          h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
          h2d_client.state.tray_now = 128

          reported = _ams_payload(0)
          h2d_client._process_message(reported)
          assert h2d_client.state.tray_now == 128

      def test_multiple_ams_slot_nonzero_excludes_ams_ht(self, h2d_client):
          """A slot above 0 rules out AMS-HT; one regular AMS remains and resolves."""
          # AMS 0 and AMS-HT 128 are both on the right extruder.
          h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
          h2d_client.state.tray_now = 255  # no current match

          # Slot 2 cannot belong to an AMS-HT, leaving AMS 0: global = 0*4+2 = 2.
          reported = _ams_payload(2)
          h2d_client._process_message(reported)
          assert h2d_client.state.tray_now == 2

      def test_multiple_ams_slot_nonzero_narrows_to_single_ht_excluded(self, h2d_client):
          """Two regular AMS plus an AMS-HT with slot > 0: HT is out, yet still ambiguous."""
          h2d_client.state.ams_extruder_map = {"0": 0, "1": 0, "128": 0}
          h2d_client.state.tray_now = 255

          # Slot 3 excludes the AMS-HT, but AMS 0 and AMS 1 are both still candidates.
          reported = _ams_payload(3)
          h2d_client._process_message(reported)
          assert h2d_client.state.tray_now == 3  # raw slot fallback
  1551. # ---------------------------------------------------------------------------
  1552. # 6b. H2D last_loaded_tray validation
  1553. # ---------------------------------------------------------------------------
  class TestLastLoadedTrayValidation(_H2DFixtureMixin):
      """last_loaded_tray must only ever hold physically valid tray IDs."""

      def test_regular_ams_tray_stored(self, h2d_client):
          """A valid regular AMS tray (0-15) is written to last_loaded_tray."""
          h2d_client.state.tray_now = 7

          # Prime snow first, then let an AMS message drive the tray_now processing.
          extruders = [
              {"id": 0, "snow": (1 << 8) | 3},  # AMS 1 slot 3 -> global 7
              {"id": 1, "snow": 0xFF00FF},
          ]
          h2d_client._process_message(_extruder_info_payload(extruders))
          h2d_client._process_message(_ams_payload(3))

          assert h2d_client.state.tray_now == 7
          assert h2d_client.state.last_loaded_tray == 7

      def test_ams_ht_tray_stored(self, h2d_client):
          """A valid AMS-HT tray (128-135) is written to last_loaded_tray."""
          h2d_client._process_message(_extruder_state_payload(0x0100))

          extruders = [
              {"id": 0, "snow": 0xFF00FF},
              {"id": 1, "snow": (128 << 8) | 0},
          ]
          h2d_client._process_message(_extruder_info_payload(extruders))
          h2d_client._process_message(_ams_payload(0))

          assert h2d_client.state.tray_now == 128
          assert h2d_client.state.last_loaded_tray == 128

      def test_unloaded_not_stored(self, h2d_client):
          """tray_now=255 (unloaded) leaves last_loaded_tray as it was."""
          h2d_client.state.last_loaded_tray = 5

          h2d_client._process_message(_ams_payload(255))

          assert h2d_client.state.tray_now == 255
          assert h2d_client.state.last_loaded_tray == 5
  1591. # ---------------------------------------------------------------------------
  1592. # 7. H2D Active extruder switching
  1593. # ---------------------------------------------------------------------------
  class TestTrayNowDualNozzleH2DActiveExtruder(_H2DFixtureMixin):
      """Switching the active extruder through bit 8 of device.extruder.state."""

      def test_active_extruder_right_by_default(self, h2d_client):
          """Straight out of the fixture, state.active_extruder is 0 (right)."""
          assert h2d_client.state.active_extruder == 0

      def test_extruder_state_bit8_switches_to_left(self, h2d_client):
          """state=0x100 makes active_extruder 1 (left)."""
          left_active = _extruder_state_payload(0x0100)
          h2d_client._process_message(left_active)
          assert h2d_client.state.active_extruder == 1

      def test_extruder_state_bit8_switches_back_to_right(self, h2d_client):
          """Round trip 0 -> 1 -> 0."""
          # Each step sends an extruder state word and checks the resulting extruder.
          for state_word, expected_extruder in ((0x0100, 1), (0x0001, 0)):
              h2d_client._process_message(_extruder_state_payload(state_word))
              assert h2d_client.state.active_extruder == expected_extruder

      def test_extruder_switch_changes_tray_disambiguation(self, h2d_client):
          """With snow on both extruders, the active one decides which snow applies."""
          # ext 0 -> AMS 0 slot 1 (global 1); ext 1 -> AMS 128 slot 0 (global 128).
          extruders = [
              {"id": 0, "snow": (0 << 8) | 1},  # AMS 0 slot 1 -> global 1
              {"id": 1, "snow": (128 << 8) | 0},  # AMS HT -> global 128
          ]
          h2d_client._process_message(_extruder_info_payload(extruders))

          # Right is active by default: tray_now="1" and snow ext[0] says global 1.
          h2d_client._process_message(_ams_payload(1))
          assert h2d_client.state.tray_now == 1

          # Hand over to the left extruder.
          h2d_client._process_message(_extruder_state_payload(0x0100))

          # Left active: tray_now="0", snow ext[1] says AMS HT (128) and slot 0 matches.
          h2d_client._process_message(_ams_payload(0))
          assert h2d_client.state.tray_now == 128
  1628. # ---------------------------------------------------------------------------
  1629. # 8. H2D Full multi-message sequences
  1630. # ---------------------------------------------------------------------------
  class TestTrayNowDualNozzleH2DFullSequence(_H2DFixtureMixin):
      """End-to-end message sequences modelled on real H2D Pro prints."""

      def test_h2d_right_nozzle_ams0_lifecycle(self, h2d_client):
          """Setup, load AMS 0 slot 1, then expect tray_now=1."""
          # Snow first: extruder 0 is loading AMS 0 slot 1.
          extruders = [
              {"id": 0, "snow": (0 << 8) | 1},
              {"id": 1, "snow": 0xFF00FF},
          ]
          h2d_client._process_message(_extruder_info_payload(extruders))

          # The printer then reports tray_now="1".
          h2d_client._process_message(_ams_payload(1))

          assert h2d_client.state.tray_now == 1
          assert h2d_client.state.last_loaded_tray == 1

      def test_h2d_left_nozzle_ams_ht_lifecycle(self, h2d_client):
          """Setup, switch to the left nozzle, load AMS HT, then expect tray_now=128."""
          # Activate the left extruder.
          h2d_client._process_message(_extruder_state_payload(0x0100))

          # Snow: ext 1 is on AMS HT slot 0.
          extruders = [
              {"id": 0, "snow": 0xFF00FF},
              {"id": 1, "snow": (128 << 8) | 0},
          ]
          h2d_client._process_message(_extruder_info_payload(extruders))

          # The printer reports tray_now="0" (the AMS HT's single slot).
          h2d_client._process_message(_ams_payload(0))

          assert h2d_client.state.tray_now == 128
          assert h2d_client.state.last_loaded_tray == 128

      def test_h2d_multi_color_alternating_nozzles(self, h2d_client):
          """Multi-colour print that alternates between the right and left nozzles.

          Sequence:
          1. Right loads AMS 0 slot 0 (tray=0)
          2. Switch left, load AMS HT (tray=128)
          3. Switch right, snow updates, load AMS 0 slot 2 (tray=2)
          4. Unload (255)
          """
          ams_ht_snow = (128 << 8) | 0

          # Step 1: the right extruder loads AMS 0 slot 0.
          step1_extruders = [
              {"id": 0, "snow": (0 << 8) | 0},
              {"id": 1, "snow": 0xFF00FF},
          ]
          h2d_client._process_message(_extruder_info_payload(step1_extruders))
          h2d_client._process_message(_ams_payload(0))
          assert h2d_client.state.tray_now == 0

          # Step 2: switch to the left extruder and load the AMS HT.
          h2d_client._process_message(_extruder_state_payload(0x0100))
          step2_extruders = [
              {"id": 0, "snow": (0 << 8) | 0},
              {"id": 1, "snow": ams_ht_snow},
          ]
          h2d_client._process_message(_extruder_info_payload(step2_extruders))
          h2d_client._process_message(_ams_payload(0))
          assert h2d_client.state.tray_now == 128

          # Step 3: back to the right extruder, which now loads AMS 0 slot 2.
          h2d_client._process_message(_extruder_state_payload(0x0001))
          step3_extruders = [
              {"id": 0, "snow": (0 << 8) | 2},
              {"id": 1, "snow": ams_ht_snow},
          ]
          h2d_client._process_message(_extruder_info_payload(step3_extruders))
          h2d_client._process_message(_ams_payload(2))
          assert h2d_client.state.tray_now == 2

          # Step 4: unload; the last real tray must survive.
          h2d_client._process_message(_ams_payload(255))
          assert h2d_client.state.tray_now == 255
          assert h2d_client.state.last_loaded_tray == 2
  class TestTrayChangeLog:
      """Tests for tray_change_log tracking during prints (mid-print tray switch).

      NOTE(review): apart from the two print-start tests, these tests do not drive
      the client. They replay the tracking condition locally through
      ``_apply_tray_tracking`` against ``mqtt_client.state``, so they pin the
      intended bookkeeping rule rather than the client's own code path. Consider
      routing them through ``_process_message`` once a suitable payload is known.
      """

      @staticmethod
      def _apply_tray_tracking(state):
          """Apply the tray-change bookkeeping rule once to ``state``.

          When ``state.tray_now`` differs from ``state.last_loaded_tray`` and
          ``state.state`` is ``"RUNNING"`` or ``"PAUSE"``, the pair
          ``(tray_now, layer_num)`` is appended to ``state.tray_change_log``.
          ``state.last_loaded_tray`` is then set to ``tray_now``.

          The name has no ``test`` prefix, so pytest's default discovery does not
          collect it as a test.
          """
          tn = state.tray_now
          if tn != state.last_loaded_tray and state.state in ("RUNNING", "PAUSE"):
              state.tray_change_log.append((tn, state.layer_num))
          state.last_loaded_tray = tn

      @pytest.fixture
      def mqtt_client(self):
          """Create a BambuMQTTClient instance for testing."""
          from backend.app.services.bambu_mqtt import BambuMQTTClient

          return BambuMQTTClient(
              ip_address="192.168.1.100",
              serial_number="TRAYLOG1",
              access_code="12345678",
          )

      def test_tray_change_log_defaults_empty(self, mqtt_client):
          """tray_change_log starts as an empty list."""
          assert mqtt_client.state.tray_change_log == []

      def test_tray_change_log_seeded_on_print_start(self, mqtt_client):
          """Print start clears the log and seeds it with the initial tray at layer 0."""
          mqtt_client.state.tray_now = 2
          mqtt_client.state.last_loaded_tray = 2
          mqtt_client._previous_gcode_state = "IDLE"

          # Transition to RUNNING via _process_message.
          mqtt_client._process_message(
              {
                  "print": {
                      "gcode_state": "RUNNING",
                      "gcode_file": "test.3mf",
                  }
              }
          )

          assert mqtt_client.state.tray_change_log == [(2, 0)]

      def test_tray_change_log_cleared_on_new_print(self, mqtt_client):
          """Old log entries are cleared when a new print starts."""
          mqtt_client.state.tray_change_log = [(5, 0), (3, 100)]
          mqtt_client.state.tray_now = 1
          mqtt_client.state.last_loaded_tray = 1
          mqtt_client._previous_gcode_state = "IDLE"

          mqtt_client._process_message(
              {
                  "print": {
                      "gcode_state": "RUNNING",
                      "gcode_file": "new.3mf",
                  }
              }
          )

          assert mqtt_client.state.tray_change_log == [(1, 0)]

      def test_tray_change_recorded_during_running(self, mqtt_client):
          """Tray change while RUNNING is appended to the log."""
          mqtt_client.state.state = "RUNNING"
          mqtt_client.state.layer_num = 50
          mqtt_client.state.last_loaded_tray = 0
          mqtt_client.state.tray_change_log = [(0, 0)]

          # Simulate a tray_now update, then run the tracking rule.
          mqtt_client.state.tray_now = 1
          self._apply_tray_tracking(mqtt_client.state)

          assert mqtt_client.state.tray_change_log == [(0, 0), (1, 50)]

      def test_tray_change_not_recorded_when_idle(self, mqtt_client):
          """Tray changes while IDLE are NOT logged."""
          mqtt_client.state.state = "IDLE"
          mqtt_client.state.layer_num = 0
          mqtt_client.state.last_loaded_tray = 0
          mqtt_client.state.tray_change_log = []

          mqtt_client.state.tray_now = 3
          self._apply_tray_tracking(mqtt_client.state)

          assert mqtt_client.state.tray_change_log == []

      def test_tray_change_recorded_during_pause(self, mqtt_client):
          """Tray change while PAUSE is also logged (AMS can swap during pause)."""
          mqtt_client.state.state = "PAUSE"
          mqtt_client.state.layer_num = 75
          mqtt_client.state.last_loaded_tray = 2
          mqtt_client.state.tray_change_log = [(2, 0)]

          mqtt_client.state.tray_now = 5
          self._apply_tray_tracking(mqtt_client.state)

          assert mqtt_client.state.tray_change_log == [(2, 0), (5, 75)]

      def test_same_tray_not_logged_twice(self, mqtt_client):
          """Same tray value doesn't create duplicate log entries."""
          mqtt_client.state.state = "RUNNING"
          mqtt_client.state.layer_num = 30
          mqtt_client.state.last_loaded_tray = 2
          mqtt_client.state.tray_change_log = [(2, 0)]

          # Same tray again.
          mqtt_client.state.tray_now = 2
          self._apply_tray_tracking(mqtt_client.state)

          assert mqtt_client.state.tray_change_log == [(2, 0)]

      def test_multiple_tray_changes(self, mqtt_client):
          """Multiple tray changes create a full history."""
          mqtt_client.state.state = "RUNNING"
          mqtt_client.state.last_loaded_tray = 0
          mqtt_client.state.tray_change_log = [(0, 0)]

          changes = [(1, 50), (3, 120), (0, 200)]
          for tray, layer in changes:
              mqtt_client.state.tray_now = tray
              mqtt_client.state.layer_num = layer
              self._apply_tray_tracking(mqtt_client.state)

          assert mqtt_client.state.tray_change_log == [(0, 0), (1, 50), (3, 120), (0, 200)]
  class TestDeveloperModeDetection:
      """Developer LAN mode detection based on the MQTT 'fun' field."""

      @staticmethod
      def _idle_report(fun_hex):
          """Return an IDLE print report carrying ``fun_hex`` as its 'fun' value."""
          return {"print": {"gcode_state": "IDLE", "fun": fun_hex}}

      @staticmethod
      def _running_report_without_fun():
          """Return a fresh RUNNING progress report that has no 'fun' field."""
          return {"print": {"gcode_state": "RUNNING", "mc_percent": 50}}

      @pytest.fixture
      def mqtt_client(self):
          """Provide a BambuMQTTClient instance to each test."""
          from backend.app.services.bambu_mqtt import BambuMQTTClient

          return BambuMQTTClient(
              ip_address="192.168.1.100",
              serial_number="TEST123",
              access_code="12345678",
          )

      def test_developer_mode_initially_none(self, mqtt_client):
          """developer_mode is None (unknown) before any report is processed."""
          assert mqtt_client.state.developer_mode is None

      def test_developer_mode_on_when_bit_clear(self, mqtt_client):
          """developer_mode becomes True when bit 0x20000000 is clear."""
          # Bit 29 clear in the lower 32 bits means developer mode ON.
          report = self._idle_report("1C8187FF9CFF")
          mqtt_client._process_message(report)
          assert mqtt_client.state.developer_mode is True

      def test_developer_mode_off_when_bit_set(self, mqtt_client):
          """developer_mode becomes False when bit 0x20000000 is set."""
          # Bit 29 set in the lower 32 bits means developer mode OFF (encryption required).
          report = self._idle_report("1C81A7FF9CFF")
          mqtt_client._process_message(report)
          assert mqtt_client.state.developer_mode is False

      def test_developer_mode_exact_bit_check(self, mqtt_client):
          """Only bit 0x20000000 decides the outcome, no other bit does."""
          # 0x20000000 is bit 29; this value sets ONLY that bit.
          report = self._idle_report("000020000000")
          mqtt_client._process_message(report)
          assert mqtt_client.state.developer_mode is False

          # All zeros: every bit is clear, so developer mode is ON. The same
          # dict is updated and re-sent.
          report["print"]["fun"] = "000000000000"
          mqtt_client._process_message(report)
          assert mqtt_client.state.developer_mode is True

      def test_developer_mode_invalid_fun_ignored(self, mqtt_client):
          """A non-hex 'fun' value neither crashes nor alters developer_mode."""
          mqtt_client.state.developer_mode = True

          report = self._idle_report("not_a_hex_value")
          mqtt_client._process_message(report)

          # The earlier value must still be there.
          assert mqtt_client.state.developer_mode is True

      def test_developer_mode_missing_fun_preserves_state(self, mqtt_client):
          """A report without a 'fun' field does not reset developer_mode."""
          mqtt_client.state.developer_mode = False

          mqtt_client._process_message(self._running_report_without_fun())

          assert mqtt_client.state.developer_mode is False

      def test_developer_mode_persists_across_messages(self, mqtt_client):
          """A developer_mode derived from 'fun' survives later reports lacking 'fun'."""
          # The first report establishes developer_mode.
          mqtt_client._process_message(self._idle_report("3EC1AFFF9CFF"))
          assert mqtt_client.state.developer_mode is False

          # Later reports without 'fun' must leave it alone; checked after each one.
          for _ in range(3):
              mqtt_client._process_message(self._running_report_without_fun())
              assert mqtt_client.state.developer_mode is False
  1919. assert mqtt_client.state.developer_mode is False