test_bambu_mqtt.py 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865
  1. """
  2. Tests for the BambuMQTTClient service.
  3. These tests focus on timelapse tracking during prints.
  4. """
  5. import pytest
  6. class TestTimelapseTracking:
  7. """Tests for timelapse state tracking during prints."""
  8. @pytest.fixture
  9. def mqtt_client(self):
  10. """Create a BambuMQTTClient instance for testing."""
  11. from backend.app.services.bambu_mqtt import BambuMQTTClient
  12. client = BambuMQTTClient(
  13. ip_address="192.168.1.100",
  14. serial_number="TEST123",
  15. access_code="12345678",
  16. )
  17. return client
  18. def test_timelapse_flag_initializes_to_false(self, mqtt_client):
  19. """Verify _timelapse_during_print starts as False."""
  20. assert mqtt_client._timelapse_during_print is False
  21. def test_timelapse_flag_set_when_timelapse_active_during_running(self, mqtt_client):
  22. """Verify timelapse flag is set when timelapse is active while printing."""
  23. # Simulate print running
  24. mqtt_client._was_running = True
  25. mqtt_client.state.timelapse = False
  26. # Simulate xcam data showing timelapse is enabled
  27. xcam_data = {"timelapse": "enable"}
  28. mqtt_client._parse_xcam_data(xcam_data)
  29. assert mqtt_client.state.timelapse is True
  30. assert mqtt_client._timelapse_during_print is True
  31. def test_timelapse_flag_not_set_when_not_running(self, mqtt_client):
  32. """Verify timelapse flag is NOT set when printer not running."""
  33. # Printer is idle (not running)
  34. mqtt_client._was_running = False
  35. mqtt_client.state.timelapse = False
  36. # Timelapse is enabled but we're not printing
  37. xcam_data = {"timelapse": "enable"}
  38. mqtt_client._parse_xcam_data(xcam_data)
  39. assert mqtt_client.state.timelapse is True
  40. # Flag should NOT be set since we're not printing
  41. assert mqtt_client._timelapse_during_print is False
  42. def test_timelapse_flag_persists_after_timelapse_stops(self, mqtt_client):
  43. """Verify timelapse flag stays True even after recording stops."""
  44. # Simulate print running with timelapse
  45. mqtt_client._was_running = True
  46. # Enable timelapse during print
  47. xcam_data = {"timelapse": "enable"}
  48. mqtt_client._parse_xcam_data(xcam_data)
  49. assert mqtt_client._timelapse_during_print is True
  50. # Disable timelapse (recording stops at end of print)
  51. xcam_data = {"timelapse": "disable"}
  52. mqtt_client._parse_xcam_data(xcam_data)
  53. # Flag should still be True (persists until reset)
  54. assert mqtt_client.state.timelapse is False
  55. assert mqtt_client._timelapse_during_print is True
  56. def test_timelapse_flag_from_print_data(self, mqtt_client):
  57. """Verify timelapse flag is set from print data (not just xcam)."""
  58. # Simulate print running
  59. mqtt_client._was_running = True
  60. mqtt_client.state.timelapse = False
  61. mqtt_client._timelapse_during_print = False
  62. # Manually test the timelapse parsing logic from _parse_print_data
  63. # This tests the "timelapse" field in the main print data
  64. data = {"timelapse": True}
  65. mqtt_client.state.timelapse = data["timelapse"] is True
  66. if mqtt_client.state.timelapse and mqtt_client._was_running:
  67. mqtt_client._timelapse_during_print = True
  68. assert mqtt_client._timelapse_during_print is True
  69. class TestPrintCompletionWithTimelapse:
  70. """Tests for print completion including timelapse flag."""
  71. @pytest.fixture
  72. def mqtt_client(self):
  73. """Create a BambuMQTTClient instance for testing."""
  74. from backend.app.services.bambu_mqtt import BambuMQTTClient
  75. client = BambuMQTTClient(
  76. ip_address="192.168.1.100",
  77. serial_number="TEST123",
  78. access_code="12345678",
  79. )
  80. return client
  81. def test_print_complete_includes_timelapse_flag(self, mqtt_client):
  82. """Verify print complete callback includes timelapse_was_active."""
  83. # Set up completion callback
  84. callback_data = {}
  85. def on_complete(data):
  86. callback_data.update(data)
  87. mqtt_client.on_print_complete = on_complete
  88. # Simulate a print that had timelapse active
  89. mqtt_client._was_running = True
  90. mqtt_client._completion_triggered = False
  91. mqtt_client._timelapse_during_print = True
  92. mqtt_client._previous_gcode_state = "RUNNING"
  93. mqtt_client._previous_gcode_file = "test.gcode"
  94. mqtt_client.state.subtask_name = "Test Print"
  95. # Simulate print finish
  96. mqtt_client.state.state = "FINISH"
  97. # Manually trigger the completion logic (simplified)
  98. # In real code this happens in _parse_print_data
  99. should_trigger = (
  100. mqtt_client.state.state in ("FINISH", "FAILED")
  101. and not mqtt_client._completion_triggered
  102. and mqtt_client.on_print_complete
  103. and mqtt_client._previous_gcode_state == "RUNNING"
  104. )
  105. if should_trigger:
  106. status = "completed" if mqtt_client.state.state == "FINISH" else "failed"
  107. timelapse_was_active = mqtt_client._timelapse_during_print
  108. mqtt_client._completion_triggered = True
  109. mqtt_client._was_running = False
  110. mqtt_client._timelapse_during_print = False
  111. mqtt_client.on_print_complete(
  112. {
  113. "status": status,
  114. "filename": mqtt_client._previous_gcode_file,
  115. "subtask_name": mqtt_client.state.subtask_name,
  116. "timelapse_was_active": timelapse_was_active,
  117. }
  118. )
  119. assert "timelapse_was_active" in callback_data
  120. assert callback_data["timelapse_was_active"] is True
  121. def test_print_complete_timelapse_flag_false_when_no_timelapse(self, mqtt_client):
  122. """Verify timelapse_was_active is False when no timelapse during print."""
  123. callback_data = {}
  124. def on_complete(data):
  125. callback_data.update(data)
  126. mqtt_client.on_print_complete = on_complete
  127. # Print without timelapse
  128. mqtt_client._was_running = True
  129. mqtt_client._completion_triggered = False
  130. mqtt_client._timelapse_during_print = False # No timelapse
  131. mqtt_client._previous_gcode_state = "RUNNING"
  132. mqtt_client._previous_gcode_file = "test.gcode"
  133. mqtt_client.state.subtask_name = "Test Print"
  134. mqtt_client.state.state = "FINISH"
  135. # Trigger completion
  136. timelapse_was_active = mqtt_client._timelapse_during_print
  137. mqtt_client.on_print_complete(
  138. {
  139. "status": "completed",
  140. "filename": mqtt_client._previous_gcode_file,
  141. "subtask_name": mqtt_client.state.subtask_name,
  142. "timelapse_was_active": timelapse_was_active,
  143. }
  144. )
  145. assert callback_data["timelapse_was_active"] is False
  146. def test_timelapse_flag_reset_after_completion(self, mqtt_client):
  147. """Verify _timelapse_during_print is reset after print completion."""
  148. mqtt_client._timelapse_during_print = True
  149. mqtt_client._was_running = True
  150. mqtt_client._completion_triggered = False
  151. # Simulate completion reset
  152. mqtt_client._completion_triggered = True
  153. mqtt_client._was_running = False
  154. mqtt_client._timelapse_during_print = False
  155. assert mqtt_client._timelapse_during_print is False
  156. class TestRealisticMessageFlow:
  157. """Tests that simulate realistic MQTT message sequences.
  158. These tests process messages through _process_message to test the full flow,
  159. including the order of xcam parsing vs state detection.
  160. """
  161. @pytest.fixture
  162. def mqtt_client(self):
  163. """Create a BambuMQTTClient instance for testing."""
  164. from backend.app.services.bambu_mqtt import BambuMQTTClient
  165. client = BambuMQTTClient(
  166. ip_address="192.168.1.100",
  167. serial_number="TEST123",
  168. access_code="12345678",
  169. )
  170. return client
  171. def test_timelapse_detected_at_print_start_in_same_message(self, mqtt_client):
  172. """Test that timelapse is detected when xcam and state come in same message.
  173. This is the critical race condition test - xcam data is parsed BEFORE
  174. state detection, so the timelapse flag must be set AFTER _was_running is True.
  175. """
  176. # Callbacks to track events
  177. start_callback_data = {}
  178. def on_start(data):
  179. start_callback_data.update(data)
  180. mqtt_client.on_print_start = on_start
  181. # Initial state - idle
  182. mqtt_client._was_running = False
  183. mqtt_client._timelapse_during_print = False
  184. mqtt_client._previous_gcode_state = None
  185. # Simulate first message when print starts - contains both xcam and gcode_state
  186. # This is the realistic scenario from the printer
  187. # NOTE: Real MQTT messages wrap print data inside a "print" key
  188. payload = {
  189. "print": {
  190. "gcode_state": "RUNNING",
  191. "gcode_file": "/data/Metadata/test_print.gcode",
  192. "subtask_name": "Test_Print",
  193. "xcam": {
  194. "timelapse": "enable", # Timelapse is enabled in this print
  195. "printing_monitor": True,
  196. },
  197. "mc_percent": 0,
  198. "mc_remaining_time": 3600,
  199. }
  200. }
  201. # Process the message (this is what happens in real MQTT flow)
  202. mqtt_client._process_message(payload)
  203. # Verify timelapse was detected even though xcam is parsed before state
  204. assert mqtt_client._was_running is True, "_was_running should be True after RUNNING state"
  205. assert mqtt_client.state.timelapse is True, "state.timelapse should be True"
  206. assert mqtt_client._timelapse_during_print is True, (
  207. "timelapse_during_print should be True when timelapse is in the same message as RUNNING state"
  208. )
  209. def test_timelapse_not_detected_when_disabled(self, mqtt_client):
  210. """Test that timelapse is NOT detected when disabled in xcam data."""
  211. mqtt_client.on_print_start = lambda data: None
  212. # Initial state - idle
  213. mqtt_client._was_running = False
  214. mqtt_client._timelapse_during_print = False
  215. mqtt_client._previous_gcode_state = None
  216. # Print starts without timelapse
  217. payload = {
  218. "print": {
  219. "gcode_state": "RUNNING",
  220. "gcode_file": "/data/Metadata/test_print.gcode",
  221. "subtask_name": "Test_Print",
  222. "xcam": {
  223. "timelapse": "disable", # Timelapse is disabled
  224. "printing_monitor": True,
  225. },
  226. }
  227. }
  228. mqtt_client._process_message(payload)
  229. assert mqtt_client._was_running is True
  230. assert mqtt_client.state.timelapse is False
  231. assert mqtt_client._timelapse_during_print is False
  232. def test_timelapse_detected_when_enabled_after_print_start(self, mqtt_client):
  233. """Test timelapse detected when enabled in a message after print starts."""
  234. mqtt_client.on_print_start = lambda data: None
  235. # First message - print starts without timelapse info
  236. payload_start = {
  237. "print": {
  238. "gcode_state": "RUNNING",
  239. "gcode_file": "/data/Metadata/test_print.gcode",
  240. "subtask_name": "Test_Print",
  241. }
  242. }
  243. mqtt_client._process_message(payload_start)
  244. assert mqtt_client._was_running is True
  245. assert mqtt_client._timelapse_during_print is False # Not detected yet
  246. # Second message - xcam data arrives with timelapse enabled
  247. payload_xcam = {
  248. "print": {
  249. "gcode_state": "RUNNING",
  250. "gcode_file": "/data/Metadata/test_print.gcode",
  251. "subtask_name": "Test_Print",
  252. "xcam": {
  253. "timelapse": "enable",
  254. },
  255. }
  256. }
  257. mqtt_client._process_message(payload_xcam)
  258. # Now timelapse should be detected because _was_running is already True
  259. assert mqtt_client._timelapse_during_print is True
  260. def test_print_complete_includes_timelapse_flag_full_flow(self, mqtt_client):
  261. """Test full print lifecycle with timelapse - from start to completion."""
  262. start_data = {}
  263. complete_data = {}
  264. def on_start(data):
  265. start_data.update(data)
  266. def on_complete(data):
  267. complete_data.update(data)
  268. mqtt_client.on_print_start = on_start
  269. mqtt_client.on_print_complete = on_complete
  270. # 1. Print starts with timelapse
  271. mqtt_client._process_message(
  272. {
  273. "print": {
  274. "gcode_state": "RUNNING",
  275. "gcode_file": "/data/Metadata/test.gcode",
  276. "subtask_name": "Test",
  277. "xcam": {"timelapse": "enable"},
  278. }
  279. }
  280. )
  281. assert mqtt_client._timelapse_during_print is True
  282. assert "subtask_name" in start_data
  283. # 2. Print continues (multiple messages)
  284. for _ in range(3):
  285. mqtt_client._process_message(
  286. {
  287. "print": {
  288. "gcode_state": "RUNNING",
  289. "gcode_file": "/data/Metadata/test.gcode",
  290. "subtask_name": "Test",
  291. "mc_percent": 50,
  292. }
  293. }
  294. )
  295. # Timelapse flag should still be True
  296. assert mqtt_client._timelapse_during_print is True
  297. # 3. Print completes
  298. mqtt_client._process_message(
  299. {
  300. "print": {
  301. "gcode_state": "FINISH",
  302. "gcode_file": "/data/Metadata/test.gcode",
  303. "subtask_name": "Test",
  304. }
  305. }
  306. )
  307. # Verify completion callback received timelapse flag
  308. assert "timelapse_was_active" in complete_data
  309. assert complete_data["timelapse_was_active"] is True
  310. assert complete_data["status"] == "completed"
  311. # Flags should be reset after completion
  312. assert mqtt_client._timelapse_during_print is False
  313. assert mqtt_client._was_running is False
  314. def test_print_failed_includes_timelapse_flag(self, mqtt_client):
  315. """Test that failed print also includes timelapse flag."""
  316. complete_data = {}
  317. def on_complete(data):
  318. complete_data.update(data)
  319. mqtt_client.on_print_start = lambda data: None
  320. mqtt_client.on_print_complete = on_complete
  321. # Start with timelapse
  322. mqtt_client._process_message(
  323. {
  324. "print": {
  325. "gcode_state": "RUNNING",
  326. "gcode_file": "/data/Metadata/test.gcode",
  327. "subtask_name": "Test",
  328. "xcam": {"timelapse": "enable"},
  329. }
  330. }
  331. )
  332. # Print fails
  333. mqtt_client._process_message(
  334. {
  335. "print": {
  336. "gcode_state": "FAILED",
  337. "gcode_file": "/data/Metadata/test.gcode",
  338. "subtask_name": "Test",
  339. }
  340. }
  341. )
  342. assert complete_data["timelapse_was_active"] is True
  343. assert complete_data["status"] == "failed"
  344. class TestAMSDataMerging:
  345. """Tests for AMS data merging, particularly handling empty slots."""
  346. @pytest.fixture
  347. def mqtt_client(self):
  348. """Create a BambuMQTTClient instance for testing."""
  349. from backend.app.services.bambu_mqtt import BambuMQTTClient
  350. client = BambuMQTTClient(
  351. ip_address="192.168.1.100",
  352. serial_number="TEST123",
  353. access_code="12345678",
  354. )
  355. return client
  356. def test_empty_slot_clears_tray_type(self, mqtt_client):
  357. """Test that empty slot update clears tray_type (Issue #147).
  358. When a spool is removed from an old AMS, the printer sends empty values.
  359. These must overwrite the previous values to show the slot as empty.
  360. """
  361. # Initial state: AMS unit with a loaded spool
  362. initial_ams = {
  363. "ams": [
  364. {
  365. "id": 0,
  366. "tray": [
  367. {
  368. "id": 0,
  369. "tray_type": "PLA",
  370. "tray_sub_brands": "Bambu PLA Basic",
  371. "tray_color": "FF0000",
  372. "tag_uid": "1234567890ABCDEF",
  373. "remain": 80,
  374. }
  375. ],
  376. }
  377. ]
  378. }
  379. mqtt_client._handle_ams_data(initial_ams)
  380. # Verify initial state
  381. ams_data = mqtt_client.state.raw_data.get("ams", [])
  382. assert len(ams_data) == 1
  383. tray = ams_data[0]["tray"][0]
  384. assert tray["tray_type"] == "PLA"
  385. assert tray["tray_color"] == "FF0000"
  386. # Now simulate spool removal - printer sends empty values
  387. empty_update = {
  388. "ams": [
  389. {
  390. "id": 0,
  391. "tray": [
  392. {
  393. "id": 0,
  394. "tray_type": "", # Empty = slot is empty
  395. "tray_sub_brands": "",
  396. "tray_color": "",
  397. "tag_uid": "0000000000000000", # Zero UID
  398. "remain": 0,
  399. }
  400. ],
  401. }
  402. ]
  403. }
  404. mqtt_client._handle_ams_data(empty_update)
  405. # Verify empty values were applied (not ignored by merge logic)
  406. ams_data = mqtt_client.state.raw_data.get("ams", [])
  407. tray = ams_data[0]["tray"][0]
  408. assert tray["tray_type"] == "", "tray_type should be cleared when slot is empty"
  409. assert tray["tray_color"] == "", "tray_color should be cleared when slot is empty"
  410. assert tray["tray_sub_brands"] == "", "tray_sub_brands should be cleared"
  411. assert tray["tag_uid"] == "0000000000000000", "tag_uid should be cleared"
  412. def test_partial_update_preserves_other_fields(self, mqtt_client):
  413. """Test that partial updates still preserve non-slot-status fields."""
  414. # Initial state with full data
  415. initial_ams = {
  416. "ams": [
  417. {
  418. "id": 0,
  419. "humidity": "3",
  420. "temp": "25.5",
  421. "tray": [
  422. {
  423. "id": 0,
  424. "tray_type": "PLA",
  425. "tray_color": "00FF00",
  426. "remain": 90,
  427. "k": 0.02,
  428. }
  429. ],
  430. }
  431. ]
  432. }
  433. mqtt_client._handle_ams_data(initial_ams)
  434. # Partial update - only remain changes
  435. partial_update = {
  436. "ams": [
  437. {
  438. "id": 0,
  439. "tray": [
  440. {
  441. "id": 0,
  442. "remain": 85, # Only this changed
  443. }
  444. ],
  445. }
  446. ]
  447. }
  448. mqtt_client._handle_ams_data(partial_update)
  449. # Verify remain was updated but other fields preserved
  450. ams_data = mqtt_client.state.raw_data.get("ams", [])
  451. tray = ams_data[0]["tray"][0]
  452. assert tray["remain"] == 85, "remain should be updated"
  453. assert tray["tray_type"] == "PLA", "tray_type should be preserved"
  454. assert tray["tray_color"] == "00FF00", "tray_color should be preserved"
  455. assert tray["k"] == 0.02, "k should be preserved"
  456. def test_tray_exist_bits_clears_empty_slots(self, mqtt_client):
  457. """Test that tray_exist_bits clears slots marked as empty (Issue #147).
  458. New AMS models (AMS 2 Pro) don't send empty tray data when a spool is removed.
  459. Instead, they update tray_exist_bits to indicate which slots have spools.
  460. """
  461. # Initial state: AMS 0 and AMS 1 with loaded spools
  462. initial_ams = {
  463. "ams": [
  464. {
  465. "id": 0,
  466. "tray": [
  467. {"id": 0, "tray_type": "PLA", "tray_color": "FF0000", "remain": 80},
  468. {"id": 1, "tray_type": "PETG", "tray_color": "00FF00", "remain": 60},
  469. {"id": 2, "tray_type": "ABS", "tray_color": "0000FF", "remain": 40},
  470. {"id": 3, "tray_type": "TPU", "tray_color": "FFFF00", "remain": 20},
  471. ],
  472. },
  473. {
  474. "id": 1,
  475. "tray": [
  476. {"id": 0, "tray_type": "PLA", "tray_color": "FFFFFF", "remain": 90},
  477. {"id": 1, "tray_type": "PLA", "tray_color": "000000", "remain": 70},
  478. {"id": 2, "tray_type": "PLA", "tray_color": "FF00FF", "remain": 50},
  479. {"id": 3, "tray_type": "PLA", "tray_color": "00FFFF", "remain": 30},
  480. ],
  481. },
  482. ],
  483. "tray_exist_bits": "ff", # All 8 slots have spools (0xFF = 11111111)
  484. }
  485. mqtt_client._handle_ams_data(initial_ams)
  486. # Verify initial state
  487. ams_data = mqtt_client.state.raw_data.get("ams", [])
  488. assert ams_data[1]["tray"][3]["tray_type"] == "PLA" # AMS 1 slot 3 (B4) has spool
  489. # Now simulate spool removal from AMS 1 slot 3 (B4)
  490. # tray_exist_bits: 0x7f = 01111111 (bit 7 = 0 means AMS 1 slot 3 is empty)
  491. update_ams = {
  492. "ams": [
  493. {"id": 0, "tray": [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]},
  494. {"id": 1, "tray": [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]},
  495. ],
  496. "tray_exist_bits": "7f", # Bit 7 = 0 -> AMS 1 slot 3 is empty
  497. }
  498. mqtt_client._handle_ams_data(update_ams)
  499. # Verify AMS 1 slot 3 was cleared
  500. ams_data = mqtt_client.state.raw_data.get("ams", [])
  501. b4_tray = ams_data[1]["tray"][3]
  502. assert b4_tray["tray_type"] == "", "tray_type should be cleared for empty slot"
  503. assert b4_tray["remain"] == 0, "remain should be 0 for empty slot"
  504. # Verify other slots are preserved
  505. assert ams_data[0]["tray"][0]["tray_type"] == "PLA", "A1 should still have PLA"
  506. assert ams_data[1]["tray"][0]["tray_type"] == "PLA", "B1 should still have PLA"
  507. class TestNozzleRackData:
  508. """Tests for nozzle rack data parsing from H2 series device.nozzle.info."""
  509. @pytest.fixture
  510. def mqtt_client(self):
  511. """Create a BambuMQTTClient instance for testing."""
  512. from backend.app.services.bambu_mqtt import BambuMQTTClient
  513. client = BambuMQTTClient(
  514. ip_address="192.168.1.100",
  515. serial_number="TEST123",
  516. access_code="12345678",
  517. )
  518. return client
  519. def test_h2c_nozzle_rack_populated_with_8_entries(self, mqtt_client):
  520. """H2C provides 8 nozzle entries: IDs 0,1 (L/R hotend) + 16-21 (rack)."""
  521. payload = {
  522. "print": {
  523. "device": {
  524. "nozzle": {
  525. "info": [
  526. {
  527. "id": 0,
  528. "type": "HS",
  529. "diameter": "0.4",
  530. "wear": 5,
  531. "stat": 1,
  532. "max_temp": 300,
  533. "serial_number": "SN-L",
  534. },
  535. {
  536. "id": 1,
  537. "type": "HS",
  538. "diameter": "0.4",
  539. "wear": 3,
  540. "stat": 0,
  541. "max_temp": 300,
  542. "serial_number": "SN-R",
  543. },
  544. {
  545. "id": 16,
  546. "type": "HS",
  547. "diameter": "0.4",
  548. "wear": 10,
  549. "stat": 0,
  550. "max_temp": 300,
  551. "serial_number": "SN-16",
  552. },
  553. {
  554. "id": 17,
  555. "type": "HH01",
  556. "diameter": "0.6",
  557. "wear": 0,
  558. "stat": 0,
  559. "max_temp": 300,
  560. "serial_number": "SN-17",
  561. },
  562. {
  563. "id": 18,
  564. "type": "HS",
  565. "diameter": "0.4",
  566. "wear": 2,
  567. "stat": 0,
  568. "max_temp": 300,
  569. "serial_number": "SN-18",
  570. },
  571. {
  572. "id": 19,
  573. "type": "",
  574. "diameter": "",
  575. "wear": None,
  576. "stat": None,
  577. "max_temp": 0,
  578. "serial_number": "",
  579. },
  580. {
  581. "id": 20,
  582. "type": "",
  583. "diameter": "",
  584. "wear": None,
  585. "stat": None,
  586. "max_temp": 0,
  587. "serial_number": "",
  588. },
  589. {
  590. "id": 21,
  591. "type": "",
  592. "diameter": "",
  593. "wear": None,
  594. "stat": None,
  595. "max_temp": 0,
  596. "serial_number": "",
  597. },
  598. ]
  599. }
  600. }
  601. }
  602. }
  603. mqtt_client._process_message(payload)
  604. assert len(mqtt_client.state.nozzle_rack) == 8
  605. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  606. assert ids == [0, 1, 16, 17, 18, 19, 20, 21]
  607. def test_h2d_nozzle_rack_populated_with_2_entries(self, mqtt_client):
  608. """H2D provides 2 nozzle entries: IDs 0,1 (L/R hotend) — no rack slots."""
  609. payload = {
  610. "print": {
  611. "device": {
  612. "nozzle": {
  613. "info": [
  614. {
  615. "id": 0,
  616. "type": "HS",
  617. "diameter": "0.4",
  618. "wear": 5,
  619. "stat": 1,
  620. "max_temp": 300,
  621. "serial_number": "SN-L",
  622. },
  623. {
  624. "id": 1,
  625. "type": "HS",
  626. "diameter": "0.4",
  627. "wear": 3,
  628. "stat": 1,
  629. "max_temp": 300,
  630. "serial_number": "SN-R",
  631. },
  632. ]
  633. }
  634. }
  635. }
  636. }
  637. mqtt_client._process_message(payload)
  638. assert len(mqtt_client.state.nozzle_rack) == 2
  639. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  640. assert ids == [0, 1]
  641. def test_single_nozzle_h2s_populated(self, mqtt_client):
  642. """H2S provides 1 nozzle entry: ID 0 only — single nozzle printer."""
  643. payload = {
  644. "print": {
  645. "device": {
  646. "nozzle": {
  647. "info": [
  648. {
  649. "id": 0,
  650. "type": "HS",
  651. "diameter": "0.4",
  652. "wear": 2,
  653. "stat": 1,
  654. "max_temp": 300,
  655. "serial_number": "SN-0",
  656. },
  657. ]
  658. }
  659. }
  660. }
  661. }
  662. mqtt_client._process_message(payload)
  663. assert len(mqtt_client.state.nozzle_rack) == 1
  664. assert mqtt_client.state.nozzle_rack[0]["id"] == 0
  665. def test_empty_nozzle_info_does_not_populate_rack(self, mqtt_client):
  666. """Empty nozzle info list should not populate nozzle_rack."""
  667. payload = {"print": {"device": {"nozzle": {"info": []}}}}
  668. mqtt_client._process_message(payload)
  669. assert mqtt_client.state.nozzle_rack == []
  670. def test_nozzle_rack_sorted_by_id(self, mqtt_client):
  671. """Nozzle rack entries should be sorted by ID regardless of input order."""
  672. payload = {
  673. "print": {
  674. "device": {
  675. "nozzle": {
  676. "info": [
  677. {"id": 17, "type": "HS", "diameter": "0.6"},
  678. {"id": 0, "type": "HS", "diameter": "0.4"},
  679. {"id": 16, "type": "HS", "diameter": "0.4"},
  680. {"id": 1, "type": "HS", "diameter": "0.4"},
  681. ]
  682. }
  683. }
  684. }
  685. }
  686. mqtt_client._process_message(payload)
  687. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  688. assert ids == [0, 1, 16, 17]
  689. def test_nozzle_rack_field_mapping(self, mqtt_client):
  690. """Verify field mapping from MQTT nozzle_info to nozzle_rack dict keys."""
  691. payload = {
  692. "print": {
  693. "device": {
  694. "nozzle": {
  695. "info": [
  696. {
  697. "id": 16,
  698. "type": "HH01",
  699. "diameter": "0.6",
  700. "wear": 15,
  701. "stat": 0,
  702. "max_temp": 320,
  703. "serial_number": "SN-ABC123",
  704. "filament_colour": "FF8800",
  705. "filament_id": "F42",
  706. "tray_type": "ABS",
  707. }
  708. ]
  709. }
  710. }
  711. }
  712. }
  713. mqtt_client._process_message(payload)
  714. slot = mqtt_client.state.nozzle_rack[0]
  715. assert slot["id"] == 16
  716. assert slot["type"] == "HH01"
  717. assert slot["diameter"] == "0.6"
  718. assert slot["wear"] == 15
  719. assert slot["stat"] == 0
  720. assert slot["max_temp"] == 320
  721. assert slot["serial_number"] == "SN-ABC123"
  722. assert slot["filament_color"] == "FF8800"
  723. assert slot["filament_id"] == "F42"
  724. assert slot["filament_type"] == "ABS"
  725. def test_nozzle_info_updates_nozzle_state(self, mqtt_client):
  726. """Nozzle info for IDs 0,1 should also update nozzle state (type/diameter)."""
  727. payload = {
  728. "print": {
  729. "device": {
  730. "nozzle": {
  731. "info": [
  732. {"id": 0, "type": "HS", "diameter": "0.4"},
  733. {"id": 1, "type": "HH01", "diameter": "0.6"},
  734. ]
  735. }
  736. }
  737. }
  738. }
  739. mqtt_client._process_message(payload)
  740. assert mqtt_client.state.nozzles[0].nozzle_type == "HS"
  741. assert mqtt_client.state.nozzles[0].nozzle_diameter == "0.4"
  742. assert mqtt_client.state.nozzles[1].nozzle_type == "HH01"
  743. assert mqtt_client.state.nozzles[1].nozzle_diameter == "0.6"
  744. class TestRequestTopicFailSafe:
  745. """Tests for graceful degradation when broker rejects request topic subscription."""
  746. @pytest.fixture
  747. def mqtt_client(self):
  748. from backend.app.services.bambu_mqtt import BambuMQTTClient
  749. client = BambuMQTTClient(
  750. ip_address="192.168.1.100",
  751. serial_number="TEST123",
  752. access_code="12345678",
  753. )
  754. return client
  755. def test_request_topic_supported_by_default(self, mqtt_client):
  756. """Request topic subscription is attempted by default."""
  757. assert mqtt_client._request_topic_supported is True
  758. assert mqtt_client._request_topic_confirmed is False
  759. def test_on_subscribe_confirms_success(self, mqtt_client):
  760. """Successful SUBACK marks request topic as confirmed."""
  761. from paho.mqtt.reasoncodes import ReasonCode
  762. mqtt_client._request_topic_sub_mid = 42
  763. rc = ReasonCode(9, identifier=0) # SUBACK packetType=9, QoS 0 = success
  764. mqtt_client._on_subscribe(None, None, 42, [rc], None)
  765. assert mqtt_client._request_topic_confirmed is True
  766. assert mqtt_client._request_topic_supported is True
  767. assert mqtt_client._request_topic_sub_mid is None
  768. assert mqtt_client._request_topic_sub_time == 0.0
  769. def test_on_subscribe_detects_rejection(self, mqtt_client):
  770. """SUBACK with failure code disables request topic."""
  771. from paho.mqtt.reasoncodes import ReasonCode
  772. mqtt_client._request_topic_sub_mid = 42
  773. rc = ReasonCode(9, identifier=0x80) # SUBACK packetType=9, 0x80 = failure
  774. mqtt_client._on_subscribe(None, None, 42, [rc], None)
  775. assert mqtt_client._request_topic_supported is False
  776. assert mqtt_client._request_topic_confirmed is False
  777. def test_on_subscribe_ignores_other_mids(self, mqtt_client):
  778. """SUBACK for other subscriptions (e.g. report topic) is ignored."""
  779. from paho.mqtt.reasoncodes import ReasonCode
  780. mqtt_client._request_topic_sub_mid = 42
  781. rc = ReasonCode(9, identifier=0x80)
  782. mqtt_client._on_subscribe(None, None, 99, [rc], None)
  783. # Not affected — mid doesn't match
  784. assert mqtt_client._request_topic_supported is True
  785. def test_disconnect_after_subscription_disables_topic(self, mqtt_client):
  786. """Disconnect within 10s of subscription attempt disables request topic."""
  787. import time
  788. mqtt_client._request_topic_sub_time = time.time()
  789. mqtt_client._request_topic_confirmed = False
  790. mqtt_client._last_message_time = 0.0
  791. mqtt_client._on_disconnect(None, None)
  792. assert mqtt_client._request_topic_supported is False
  793. assert mqtt_client._request_topic_sub_time == 0.0
  794. def test_disconnect_after_confirmation_does_not_disable(self, mqtt_client):
  795. """Disconnect after SUBACK confirmation keeps request topic enabled."""
  796. import time
  797. mqtt_client._request_topic_sub_time = time.time()
  798. mqtt_client._request_topic_confirmed = True
  799. mqtt_client._last_message_time = 0.0
  800. mqtt_client._on_disconnect(None, None)
  801. assert mqtt_client._request_topic_supported is True
  802. def test_late_disconnect_does_not_disable(self, mqtt_client):
  803. """Disconnect long after subscription (>10s) doesn't blame request topic."""
  804. import time
  805. mqtt_client._request_topic_sub_time = time.time() - 30.0
  806. mqtt_client._request_topic_confirmed = False
  807. mqtt_client._last_message_time = 0.0
  808. mqtt_client._on_disconnect(None, None)
  809. assert mqtt_client._request_topic_supported is True
  810. def test_on_connect_skips_request_topic_when_unsupported(self, mqtt_client):
  811. """After marking unsupported, reconnect skips request topic subscription."""
  812. mqtt_client._request_topic_supported = False
  813. subscribe_calls = []
  814. mock_client = type(
  815. "MockClient",
  816. (),
  817. {
  818. "subscribe": lambda self, topic: subscribe_calls.append(topic) or (0, 1),
  819. },
  820. )()
  821. mqtt_client._on_connect(mock_client, None, None, 0)
  822. # Only report topic subscribed, not request topic
  823. assert len(subscribe_calls) == 1
  824. assert subscribe_calls[0] == mqtt_client.topic_subscribe
  825. class TestRequestTopicAmsMapping:
  826. """Tests for capturing ams_mapping from the MQTT request topic."""
  827. @pytest.fixture
  828. def mqtt_client(self):
  829. """Create a BambuMQTTClient instance for testing."""
  830. from backend.app.services.bambu_mqtt import BambuMQTTClient
  831. client = BambuMQTTClient(
  832. ip_address="192.168.1.100",
  833. serial_number="TEST123",
  834. access_code="12345678",
  835. )
  836. return client
  837. def test_captured_ams_mapping_initializes_to_none(self, mqtt_client):
  838. """Verify _captured_ams_mapping starts as None."""
  839. assert mqtt_client._captured_ams_mapping is None
  840. def test_handle_request_message_captures_ams_mapping(self, mqtt_client):
  841. """project_file command with ams_mapping stores the mapping."""
  842. data = {
  843. "print": {
  844. "command": "project_file",
  845. "ams_mapping": [0, 4, -1, -1],
  846. "url": "ftp://192.168.1.100/test.3mf",
  847. }
  848. }
  849. mqtt_client._handle_request_message(data)
  850. assert mqtt_client._captured_ams_mapping == [0, 4, -1, -1]
  851. def test_handle_request_message_ignores_non_print_commands(self, mqtt_client):
  852. """Non-project_file commands don't store ams_mapping."""
  853. data = {
  854. "print": {
  855. "command": "pause",
  856. }
  857. }
  858. mqtt_client._handle_request_message(data)
  859. assert mqtt_client._captured_ams_mapping is None
  860. def test_handle_request_message_ignores_missing_ams_mapping(self, mqtt_client):
  861. """project_file command without ams_mapping doesn't store anything."""
  862. data = {
  863. "print": {
  864. "command": "project_file",
  865. "url": "ftp://192.168.1.100/test.3mf",
  866. }
  867. }
  868. mqtt_client._handle_request_message(data)
  869. assert mqtt_client._captured_ams_mapping is None
  870. def test_handle_request_message_ignores_non_dict_print(self, mqtt_client):
  871. """Non-dict print value is safely ignored."""
  872. data = {"print": "not_a_dict"}
  873. mqtt_client._handle_request_message(data)
  874. assert mqtt_client._captured_ams_mapping is None
  875. def test_handle_request_message_ignores_missing_print(self, mqtt_client):
  876. """Message without print key is safely ignored."""
  877. data = {"pushing": {"command": "pushall"}}
  878. mqtt_client._handle_request_message(data)
  879. assert mqtt_client._captured_ams_mapping is None
  880. def test_captured_mapping_overwrites_previous(self, mqtt_client):
  881. """A new print command overwrites a previously captured mapping."""
  882. mqtt_client._captured_ams_mapping = [0, -1, -1, -1]
  883. data = {
  884. "print": {
  885. "command": "project_file",
  886. "ams_mapping": [4, 8, -1, -1],
  887. }
  888. }
  889. mqtt_client._handle_request_message(data)
  890. assert mqtt_client._captured_ams_mapping == [4, 8, -1, -1]
  891. def test_print_start_callback_includes_ams_mapping(self, mqtt_client):
  892. """on_print_start callback data includes captured ams_mapping."""
  893. start_data = {}
  894. def on_start(data):
  895. start_data.update(data)
  896. mqtt_client.on_print_start = on_start
  897. mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
  898. # Trigger print start
  899. mqtt_client._process_message(
  900. {
  901. "print": {
  902. "gcode_state": "RUNNING",
  903. "gcode_file": "/data/Metadata/test.gcode",
  904. "subtask_name": "Test",
  905. }
  906. }
  907. )
  908. assert start_data.get("ams_mapping") == [0, 4, -1, -1]
  909. def test_print_start_callback_ams_mapping_none_when_not_captured(self, mqtt_client):
  910. """on_print_start callback has ams_mapping=None when no mapping captured."""
  911. start_data = {}
  912. def on_start(data):
  913. start_data.update(data)
  914. mqtt_client.on_print_start = on_start
  915. mqtt_client._process_message(
  916. {
  917. "print": {
  918. "gcode_state": "RUNNING",
  919. "gcode_file": "/data/Metadata/test.gcode",
  920. "subtask_name": "Test",
  921. }
  922. }
  923. )
  924. assert "ams_mapping" in start_data
  925. assert start_data["ams_mapping"] is None
  926. def test_print_complete_callback_includes_ams_mapping(self, mqtt_client):
  927. """on_print_complete callback data includes captured ams_mapping."""
  928. complete_data = {}
  929. def on_complete(data):
  930. complete_data.update(data)
  931. mqtt_client.on_print_start = lambda d: None
  932. mqtt_client.on_print_complete = on_complete
  933. mqtt_client._captured_ams_mapping = [0, 9, -1, -1]
  934. # Start print
  935. mqtt_client._process_message(
  936. {
  937. "print": {
  938. "gcode_state": "RUNNING",
  939. "gcode_file": "/data/Metadata/test.gcode",
  940. "subtask_name": "Test",
  941. }
  942. }
  943. )
  944. # Complete print
  945. mqtt_client._process_message(
  946. {
  947. "print": {
  948. "gcode_state": "FINISH",
  949. "gcode_file": "/data/Metadata/test.gcode",
  950. "subtask_name": "Test",
  951. }
  952. }
  953. )
  954. assert complete_data.get("ams_mapping") == [0, 9, -1, -1]
  955. def test_captured_mapping_cleared_after_print_complete(self, mqtt_client):
  956. """_captured_ams_mapping is reset to None after print completion."""
  957. mqtt_client.on_print_start = lambda d: None
  958. mqtt_client.on_print_complete = lambda d: None
  959. mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
  960. # Start print
  961. mqtt_client._process_message(
  962. {
  963. "print": {
  964. "gcode_state": "RUNNING",
  965. "gcode_file": "/data/Metadata/test.gcode",
  966. "subtask_name": "Test",
  967. }
  968. }
  969. )
  970. # Complete print
  971. mqtt_client._process_message(
  972. {
  973. "print": {
  974. "gcode_state": "FINISH",
  975. "gcode_file": "/data/Metadata/test.gcode",
  976. "subtask_name": "Test",
  977. }
  978. }
  979. )
  980. assert mqtt_client._captured_ams_mapping is None
  981. def test_full_flow_capture_and_deliver(self, mqtt_client):
  982. """Full flow: slicer sends print command → MQTT captures mapping → completion delivers it."""
  983. complete_data = {}
  984. def on_complete(data):
  985. complete_data.update(data)
  986. mqtt_client.on_print_start = lambda d: None
  987. mqtt_client.on_print_complete = on_complete
  988. # 1. Slicer sends print command (captured from request topic)
  989. mqtt_client._handle_request_message(
  990. {
  991. "print": {
  992. "command": "project_file",
  993. "ams_mapping": [4, 9, -1, -1],
  994. "url": "ftp://192.168.1.100/model.3mf",
  995. }
  996. }
  997. )
  998. assert mqtt_client._captured_ams_mapping == [4, 9, -1, -1]
  999. # 2. Printer reports RUNNING
  1000. mqtt_client._process_message(
  1001. {
  1002. "print": {
  1003. "gcode_state": "RUNNING",
  1004. "gcode_file": "/data/Metadata/model.gcode",
  1005. "subtask_name": "Model",
  1006. }
  1007. }
  1008. )
  1009. # 3. Printer reports FINISH
  1010. mqtt_client._process_message(
  1011. {
  1012. "print": {
  1013. "gcode_state": "FINISH",
  1014. "gcode_file": "/data/Metadata/model.gcode",
  1015. "subtask_name": "Model",
  1016. }
  1017. }
  1018. )
  1019. assert complete_data["ams_mapping"] == [4, 9, -1, -1]
  1020. assert complete_data["status"] == "completed"
  1021. # Mapping cleared after completion
  1022. assert mqtt_client._captured_ams_mapping is None
# ---------------------------------------------------------------------------
# tray_now disambiguation helpers
# ---------------------------------------------------------------------------
  1026. def _ams_payload(tray_now, ams_units=None, tray_exist_bits=None):
  1027. """Build minimal print.ams payload for tray_now disambiguation tests."""
  1028. ams = {"tray_now": str(tray_now)}
  1029. if ams_units is not None:
  1030. ams["ams"] = ams_units
  1031. if tray_exist_bits is not None:
  1032. ams["tray_exist_bits"] = tray_exist_bits
  1033. return {"print": {"ams": ams}}
  1034. def _extruder_info_payload(extruders):
  1035. """Build device.extruder.info payload (dual-nozzle detection + snow).
  1036. Each entry in *extruders* is a dict with at least ``id`` and ``snow``.
  1037. """
  1038. return {
  1039. "print": {
  1040. "device": {
  1041. "extruder": {
  1042. "info": extruders,
  1043. }
  1044. }
  1045. }
  1046. }
  1047. def _extruder_state_payload(state_val):
  1048. """Build device.extruder.state payload (active extruder via bit 8)."""
  1049. return {
  1050. "print": {
  1051. "device": {
  1052. "extruder": {
  1053. "state": state_val,
  1054. }
  1055. }
  1056. }
  1057. }
# ---------------------------------------------------------------------------
# 1. Single-nozzle X1E — direct passthrough
# ---------------------------------------------------------------------------
  1061. class TestTrayNowSingleNozzleX1E:
  1062. """Single-nozzle, 1 AMS — tray_now is a direct passthrough."""
  1063. @pytest.fixture
  1064. def mqtt_client(self):
  1065. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1066. return BambuMQTTClient(
  1067. ip_address="192.168.1.100",
  1068. serial_number="TEST_X1E",
  1069. access_code="12345678",
  1070. )
  1071. def test_tray_now_direct_passthrough_slot_0_to_3(self, mqtt_client):
  1072. """Each tray_now 0-3 maps 1:1 on single-nozzle printers."""
  1073. for slot in range(4):
  1074. mqtt_client._process_message(_ams_payload(slot))
  1075. assert mqtt_client.state.tray_now == slot
  1076. def test_tray_now_255_means_unloaded(self, mqtt_client):
  1077. """tray_now=255 means no filament loaded."""
  1078. mqtt_client._process_message(_ams_payload(255))
  1079. assert mqtt_client.state.tray_now == 255
  1080. def test_single_extruder_does_not_trigger_dual_nozzle(self, mqtt_client):
  1081. """device.extruder.info with 1 entry must NOT set _is_dual_nozzle."""
  1082. mqtt_client._process_message(_extruder_info_payload([{"id": 0, "snow": 0xFF00FF}]))
  1083. assert mqtt_client._is_dual_nozzle is False
  1084. def test_last_loaded_tray_survives_unload(self, mqtt_client):
  1085. """Load tray 2, unload → last_loaded_tray stays 2."""
  1086. mqtt_client._process_message(_ams_payload(2))
  1087. assert mqtt_client.state.last_loaded_tray == 2
  1088. mqtt_client._process_message(_ams_payload(255))
  1089. assert mqtt_client.state.tray_now == 255
  1090. assert mqtt_client.state.last_loaded_tray == 2
# ---------------------------------------------------------------------------
# 2. Single-nozzle P2S — multiple AMS, global IDs pass through
# ---------------------------------------------------------------------------
  1094. class TestTrayNowSingleNozzleP2S:
  1095. """Single-nozzle, 2 AMS — global IDs 4-7 for AMS 1 pass through directly."""
  1096. @pytest.fixture
  1097. def mqtt_client(self):
  1098. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1099. return BambuMQTTClient(
  1100. ip_address="192.168.1.100",
  1101. serial_number="TEST_P2S",
  1102. access_code="12345678",
  1103. )
  1104. def test_tray_now_ams1_global_ids_4_to_7(self, mqtt_client):
  1105. """tray_now 4-7 are global IDs for AMS 1 on single-nozzle printers."""
  1106. for global_id in range(4, 8):
  1107. mqtt_client._process_message(_ams_payload(global_id))
  1108. assert mqtt_client.state.tray_now == global_id
  1109. def test_tray_change_across_ams_units(self, mqtt_client):
  1110. """Switch from AMS 0 slot 1 → AMS 1 slot 2 (global 6)."""
  1111. mqtt_client._process_message(_ams_payload(1))
  1112. assert mqtt_client.state.tray_now == 1
  1113. mqtt_client._process_message(_ams_payload(6))
  1114. assert mqtt_client.state.tray_now == 6
# ---------------------------------------------------------------------------
# 3. H2D Pro — initial state detection
# ---------------------------------------------------------------------------
  1118. class TestTrayNowDualNozzleH2DSetup:
  1119. """H2D Pro initial state detection."""
  1120. @pytest.fixture
  1121. def mqtt_client(self):
  1122. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1123. return BambuMQTTClient(
  1124. ip_address="192.168.1.100",
  1125. serial_number="TEST_H2D",
  1126. access_code="12345678",
  1127. )
  1128. def test_dual_nozzle_detected_from_extruder_info(self, mqtt_client):
  1129. """2 entries in device.extruder.info → _is_dual_nozzle=True."""
  1130. mqtt_client._process_message(
  1131. _extruder_info_payload(
  1132. [
  1133. {"id": 0, "snow": 0xFF00FF},
  1134. {"id": 1, "snow": 0xFF00FF},
  1135. ]
  1136. )
  1137. )
  1138. assert mqtt_client._is_dual_nozzle is True
  1139. def test_ams_extruder_map_parsed_from_info_field(self, mqtt_client):
  1140. """AMS 0 info=2003 → right (ext 0), AMS 128 info=2104 → left (ext 1)."""
  1141. ams_units = [
  1142. {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
  1143. {"id": 128, "info": 2104, "tray": [{"id": 0}]},
  1144. ]
  1145. payload = {
  1146. "print": {
  1147. "ams": {
  1148. "ams": ams_units,
  1149. "tray_now": "255",
  1150. "tray_exist_bits": "1000f",
  1151. },
  1152. }
  1153. }
  1154. mqtt_client._process_message(payload)
  1155. # info=2003: bit8 = (2003>>8)&1 = 7&1 = 1 → extruder = 1-1 = 0 (right)
  1156. # info=2104: bit8 = (2104>>8)&1 = 8&1 = 0 → extruder = 1-0 = 1 (left)
  1157. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1158. def test_dual_nozzle_detection_before_ams_in_same_message(self, mqtt_client):
  1159. """Dual-nozzle detection at line 538 happens before _handle_ams_data() at line 549.
  1160. If both arrive in the same message, tray_now disambiguation already uses dual-nozzle logic.
  1161. """
  1162. payload = {
  1163. "print": {
  1164. "device": {
  1165. "extruder": {
  1166. "info": [
  1167. {"id": 0, "snow": 0xFF00FF},
  1168. {"id": 1, "snow": 0xFF00FF},
  1169. ],
  1170. "state": 0x0001,
  1171. }
  1172. },
  1173. "ams": {
  1174. "ams": [
  1175. {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
  1176. ],
  1177. "tray_now": "2",
  1178. "tray_exist_bits": "f",
  1179. },
  1180. }
  1181. }
  1182. mqtt_client._process_message(payload)
  1183. # Dual-nozzle was detected; AMS 0 on right extruder (active by default);
  1184. # snow is 0xFF00FF (unloaded), so falls through to ams_extruder_map fallback.
  1185. # Single AMS on extruder 0 → global_id = 0*4+2 = 2
  1186. assert mqtt_client._is_dual_nozzle is True
  1187. assert mqtt_client.state.tray_now == 2
# ---------------------------------------------------------------------------
# Shared H2D fixture for classes 4-8
# ---------------------------------------------------------------------------
  1191. class _H2DFixtureMixin:
  1192. """Mixin providing a pre-configured H2D Pro client."""
  1193. @pytest.fixture
  1194. def mqtt_client(self):
  1195. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1196. return BambuMQTTClient(
  1197. ip_address="192.168.1.100",
  1198. serial_number="TEST_H2D",
  1199. access_code="12345678",
  1200. )
  1201. @pytest.fixture
  1202. def h2d_client(self, mqtt_client):
  1203. """Pre-configure as H2D Pro: dual-nozzle + ams_extruder_map."""
  1204. mqtt_client._process_message(
  1205. {
  1206. "print": {
  1207. "device": {
  1208. "extruder": {
  1209. "info": [
  1210. {"id": 0, "snow": 0xFF00FF},
  1211. {"id": 1, "snow": 0xFF00FF},
  1212. ],
  1213. "state": 0x0001, # right extruder active
  1214. }
  1215. },
  1216. "ams": {
  1217. "ams": [
  1218. {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
  1219. {"id": 128, "info": 2104, "tray": [{"id": 0}]},
  1220. ],
  1221. "tray_now": "255",
  1222. "tray_exist_bits": "1000f",
  1223. },
  1224. }
  1225. }
  1226. )
  1227. assert mqtt_client._is_dual_nozzle is True
  1228. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1229. return mqtt_client
# ---------------------------------------------------------------------------
# 4. H2D Snow field disambiguation
# ---------------------------------------------------------------------------
  1233. class TestTrayNowDualNozzleH2DSnow(_H2DFixtureMixin):
  1234. """Snow field disambiguation (primary path)."""
  1235. def test_snow_disambiguates_ams0_slot(self, h2d_client):
  1236. """snow ext[0]=AMS 0 slot 2, tray_now='2' → global 2."""
  1237. # Send snow update FIRST (snow is parsed AFTER tray_now in the same message,
  1238. # so we need it in a prior message).
  1239. snow_val = 0 << 8 | 2 # AMS 0 slot 2 = raw 2
  1240. h2d_client._process_message(
  1241. _extruder_info_payload(
  1242. [
  1243. {"id": 0, "snow": snow_val},
  1244. {"id": 1, "snow": 0xFF00FF},
  1245. ]
  1246. )
  1247. )
  1248. assert h2d_client.state.h2d_extruder_snow.get(0) == 2
  1249. # Now send tray_now=2
  1250. h2d_client._process_message(_ams_payload(2))
  1251. assert h2d_client.state.tray_now == 2
  1252. def test_snow_disambiguates_ams_ht_to_128(self, h2d_client):
  1253. """snow ext[1]=AMS HT (128), left active, tray_now='0' → global 128."""
  1254. # Snow: extruder 1 → AMS 128 slot 0
  1255. snow_val = 128 << 8 | 0 # = 32768
  1256. h2d_client._process_message(
  1257. _extruder_info_payload(
  1258. [
  1259. {"id": 0, "snow": 0xFF00FF},
  1260. {"id": 1, "snow": snow_val},
  1261. ]
  1262. )
  1263. )
  1264. assert h2d_client.state.h2d_extruder_snow.get(1) == 128
  1265. # Switch to left extruder
  1266. h2d_client._process_message(_extruder_state_payload(0x0100))
  1267. assert h2d_client.state.active_extruder == 1
  1268. # tray_now="0" with left extruder active, snow says AMS HT (128)
  1269. # AMS HT snow_slot = 0 (single slot), parsed_tray_now = 0 → match
  1270. h2d_client._process_message(_ams_payload(0))
  1271. assert h2d_client.state.tray_now == 128
  1272. def test_snow_updates_h2d_extruder_snow_state(self, h2d_client):
  1273. """Verify state.h2d_extruder_snow dict is populated correctly."""
  1274. snow_ext0 = 1 << 8 | 3 # AMS 1 slot 3 → global 7
  1275. snow_ext1 = 0 << 8 | 0 # AMS 0 slot 0 → global 0
  1276. h2d_client._process_message(
  1277. _extruder_info_payload(
  1278. [
  1279. {"id": 0, "snow": snow_ext0},
  1280. {"id": 1, "snow": snow_ext1},
  1281. ]
  1282. )
  1283. )
  1284. assert h2d_client.state.h2d_extruder_snow[0] == 7
  1285. assert h2d_client.state.h2d_extruder_snow[1] == 0
  1286. def test_snow_unloaded_value(self, h2d_client):
  1287. """snow=0xFFFF (ams_id=255, slot=255) → 255 (unloaded)."""
  1288. h2d_client._process_message(
  1289. _extruder_info_payload(
  1290. [
  1291. {"id": 0, "snow": 0xFFFF},
  1292. {"id": 1, "snow": 0xFFFF},
  1293. ]
  1294. )
  1295. )
  1296. assert h2d_client.state.h2d_extruder_snow[0] == 255
  1297. assert h2d_client.state.h2d_extruder_snow[1] == 255
  1298. def test_snow_initial_sentinel_not_stored(self, h2d_client):
  1299. """snow=0xFF00FF (firmware initial sentinel) is not parsed into h2d_extruder_snow."""
  1300. # 0xFF00FF has ams_id=0xFF00=65280 which doesn't match any branch
  1301. h2d_client._process_message(
  1302. _extruder_info_payload(
  1303. [
  1304. {"id": 0, "snow": 0xFF00FF},
  1305. {"id": 1, "snow": 0xFF00FF},
  1306. ]
  1307. )
  1308. )
  1309. # Snow dict should remain empty (no matching branch)
  1310. assert h2d_client.state.h2d_extruder_snow == {}
# ---------------------------------------------------------------------------
# 5. H2D Pending target disambiguation
# ---------------------------------------------------------------------------
  1314. class TestTrayNowDualNozzleH2DPendingTarget(_H2DFixtureMixin):
  1315. """Pending target disambiguation (when Bambuddy initiates load)."""
  1316. def test_pending_target_matches_slot(self, h2d_client):
  1317. """pending=5, tray_now='1' (5%4=1 matches) → tray_now=5."""
  1318. h2d_client.state.pending_tray_target = 5
  1319. h2d_client._process_message(_ams_payload(1))
  1320. assert h2d_client.state.tray_now == 5
  1321. assert h2d_client.state.pending_tray_target is None # cleared
  1322. def test_pending_target_slot_mismatch(self, h2d_client):
  1323. """pending=5, tray_now='2' → uses raw slot, clears pending."""
  1324. h2d_client.state.pending_tray_target = 5
  1325. h2d_client._process_message(_ams_payload(2))
  1326. # Slot 2 != 5%4=1 → mismatch, uses raw slot 2
  1327. assert h2d_client.state.tray_now == 2
  1328. assert h2d_client.state.pending_tray_target is None
  1329. def test_pending_target_takes_priority_over_snow(self, h2d_client):
  1330. """When both pending and snow are set, pending wins."""
  1331. # Set up snow for extruder 0 → AMS 0 slot 1 → global 1
  1332. snow_val = 0 << 8 | 1
  1333. h2d_client._process_message(
  1334. _extruder_info_payload(
  1335. [
  1336. {"id": 0, "snow": snow_val},
  1337. {"id": 1, "snow": 0xFF00FF},
  1338. ]
  1339. )
  1340. )
  1341. assert h2d_client.state.h2d_extruder_snow.get(0) == 1
  1342. # Set pending target to AMS 1 slot 1 (global 5)
  1343. h2d_client.state.pending_tray_target = 5
  1344. # tray_now="1" — matches pending (5%4=1), pending should win over snow
  1345. h2d_client._process_message(_ams_payload(1))
  1346. assert h2d_client.state.tray_now == 5
# ---------------------------------------------------------------------------
# 6. H2D ams_extruder_map fallback
# ---------------------------------------------------------------------------
  1350. class TestTrayNowDualNozzleH2DFallback(_H2DFixtureMixin):
  1351. """ams_extruder_map fallback (no pending, no snow)."""
  1352. def test_single_ams_on_extruder_computes_global_id(self, h2d_client):
  1353. """AMS 0 on right extruder, tray_now='2' → 0*4+2=2."""
  1354. # h2d_client has snow=0xFF00FF (unloaded) by default, so snow path skips
  1355. h2d_client._process_message(_ams_payload(2))
  1356. # AMS 0 is the only AMS on extruder 0 (right, active by default)
  1357. # Fallback: single AMS → global = 0*4+2 = 2
  1358. assert h2d_client.state.tray_now == 2
  1359. def test_multiple_ams_keeps_current_if_valid(self, h2d_client):
  1360. """Current tray matches slot → keeps it (multi-AMS on same extruder)."""
  1361. # Set up: two AMS units on the same extruder (right, ext 0)
  1362. h2d_client.state.ams_extruder_map = {"0": 0, "1": 0}
  1363. # Pre-set tray_now=5 (AMS 1 slot 1) — current_ams=1 which is in ams_on_extruder
  1364. h2d_client.state.tray_now = 5
  1365. # tray_now="1" → 5%4=1 matches → keep current=5
  1366. h2d_client._process_message(_ams_payload(1))
  1367. assert h2d_client.state.tray_now == 5
  1368. def test_no_ams_on_extruder_uses_raw_slot(self, h2d_client):
  1369. """No AMS mapped to the active extruder → raw slot as global ID."""
  1370. # All AMS on left extruder, but right is active
  1371. h2d_client.state.ams_extruder_map = {"0": 1, "128": 1}
  1372. h2d_client._process_message(_ams_payload(2))
  1373. assert h2d_client.state.tray_now == 2
  1374. def test_single_ams_ht_on_extruder_returns_unit_id(self, h2d_client):
  1375. """AMS-HT 128 alone on left extruder, slot 0 → global ID 128 (not 512)."""
  1376. # Switch to left extruder (where AMS-HT 128 is mapped)
  1377. h2d_client._process_message(_extruder_state_payload(0x0100))
  1378. # Only AMS-HT 128 on left extruder; no snow available
  1379. h2d_client._process_message(_ams_payload(0))
  1380. assert h2d_client.state.tray_now == 128
  1381. def test_single_ams_ht_ignores_nonzero_slot(self, h2d_client):
  1382. """AMS-HT has single slot; even if printer reports slot 1, global ID = unit ID."""
  1383. h2d_client.state.ams_extruder_map = {"129": 0}
  1384. h2d_client._process_message(_ams_payload(1))
  1385. # AMS-HT 129: global ID = 129, not 129*4+1=517
  1386. assert h2d_client.state.tray_now == 129
  1387. def test_multiple_ams_keeps_current_ams_ht(self, h2d_client):
  1388. """Current tray is AMS-HT 128, slot 0 reported → keeps 128."""
  1389. h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
  1390. h2d_client.state.tray_now = 128
  1391. h2d_client._process_message(_ams_payload(0))
  1392. assert h2d_client.state.tray_now == 128
  1393. def test_multiple_ams_slot_nonzero_excludes_ams_ht(self, h2d_client):
  1394. """Slot > 0 eliminates AMS-HT candidates; single regular AMS left → resolves."""
  1395. # AMS 0 + AMS-HT 128 both on right extruder
  1396. h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
  1397. h2d_client.state.tray_now = 255 # no current match
  1398. # Slot 2 → can't be AMS-HT → only AMS 0 → global = 0*4+2 = 2
  1399. h2d_client._process_message(_ams_payload(2))
  1400. assert h2d_client.state.tray_now == 2
  1401. def test_multiple_ams_slot_nonzero_narrows_to_single_ht_excluded(self, h2d_client):
  1402. """Two regular AMS + one AMS-HT, slot > 0 → AMS-HT excluded but still ambiguous."""
  1403. h2d_client.state.ams_extruder_map = {"0": 0, "1": 0, "128": 0}
  1404. h2d_client.state.tray_now = 255
  1405. # Slot 3 → excludes AMS-HT, but AMS 0 and AMS 1 both remain → ambiguous
  1406. h2d_client._process_message(_ams_payload(3))
  1407. assert h2d_client.state.tray_now == 3 # raw slot fallback
# ---------------------------------------------------------------------------
# 6b. H2D last_loaded_tray validation
# ---------------------------------------------------------------------------
  class TestLastLoadedTrayValidation(_H2DFixtureMixin):
      """Verify that last_loaded_tray only records physically valid tray IDs."""

      def test_regular_ams_tray_stored(self, h2d_client):
          """A regular AMS tray (0-15 range) is recorded in last_loaded_tray."""
          # NOTE(review): tray_now is pre-seeded with the value asserted below,
          # so the tray_now assertion alone cannot detect a no-op; confirm intent.
          h2d_client.state.tray_now = 7

          # Extruder 0 snow points at AMS 1 slot 3, i.e. global tray 7.
          extruder_entries = [
              {"id": 0, "snow": (1 << 8) | 3},
              {"id": 1, "snow": 0xFF00FF},
          ]
          h2d_client._process_message(_extruder_info_payload(extruder_entries))

          # The AMS message is what triggers tray_now processing.
          h2d_client._process_message(_ams_payload(3))

          expected_tray = 7
          assert h2d_client.state.tray_now == expected_tray
          assert h2d_client.state.last_loaded_tray == expected_tray

      def test_ams_ht_tray_stored(self, h2d_client):
          """An AMS-HT tray (128-135 range) is recorded in last_loaded_tray."""
          # Extruder state 0x0100 is sent first, then snow for extruder 1.
          h2d_client._process_message(_extruder_state_payload(0x0100))

          extruder_entries = [
              {"id": 0, "snow": 0xFF00FF},
              {"id": 1, "snow": (128 << 8) | 0},
          ]
          h2d_client._process_message(_extruder_info_payload(extruder_entries))
          h2d_client._process_message(_ams_payload(0))

          expected_tray = 128
          assert h2d_client.state.tray_now == expected_tray
          assert h2d_client.state.last_loaded_tray == expected_tray

      def test_unloaded_not_stored(self, h2d_client):
          """An unloaded report (tray_now=255) leaves last_loaded_tray unchanged."""
          previous_tray = 5
          h2d_client.state.last_loaded_tray = previous_tray

          h2d_client._process_message(_ams_payload(255))

          assert h2d_client.state.tray_now == 255
          assert h2d_client.state.last_loaded_tray == previous_tray
  1448. # ---------------------------------------------------------------------------
  1449. # 7. H2D Active extruder switching
  1450. # ---------------------------------------------------------------------------
  class TestTrayNowDualNozzleH2DActiveExtruder(_H2DFixtureMixin):
      """Exercise active-extruder switching driven by device.extruder.state bit 8."""

      def test_active_extruder_right_by_default(self, h2d_client):
          """Before any message is processed, active_extruder is 0 (right)."""
          assert h2d_client.state.active_extruder == 0

      def test_extruder_state_bit8_switches_to_left(self, h2d_client):
          """A state word of 0x100 selects active_extruder 1 (left)."""
          left_state_word = 0x0100
          h2d_client._process_message(_extruder_state_payload(left_state_word))
          assert h2d_client.state.active_extruder == 1

      def test_extruder_state_bit8_switches_back_to_right(self, h2d_client):
          """The active extruder cycles 0 -> 1 -> 0 as bit 8 is set, then cleared."""
          transitions = (
              (0x0100, 1),  # bit 8 set -> left
              (0x0001, 0),  # bit 8 clear -> right
          )
          for state_word, expected_extruder in transitions:
              h2d_client._process_message(_extruder_state_payload(state_word))
              assert h2d_client.state.active_extruder == expected_extruder

      def test_extruder_switch_changes_tray_disambiguation(self, h2d_client):
          """With snow on both extruders, the active one decides which snow is used."""
          both_snows = [
              {"id": 0, "snow": (0 << 8) | 1},  # AMS 0 slot 1 -> global 1
              {"id": 1, "snow": (128 << 8) | 0},  # AMS HT -> global 128
          ]
          h2d_client._process_message(_extruder_info_payload(both_snows))

          # Right is active by default: tray_now="1" resolves via extruder 0's snow.
          h2d_client._process_message(_ams_payload(1))
          assert h2d_client.state.tray_now == 1

          # After switching to left, tray_now="0" matches extruder 1's AMS HT snow.
          h2d_client._process_message(_extruder_state_payload(0x0100))
          h2d_client._process_message(_ams_payload(0))
          assert h2d_client.state.tray_now == 128
  1485. # ---------------------------------------------------------------------------
  1486. # 8. H2D Full multi-message sequences
  1487. # ---------------------------------------------------------------------------
  1488. class TestTrayNowDualNozzleH2DFullSequence(_H2DFixtureMixin):
  1489. """Multi-message sequences simulating real H2D Pro prints."""
  def test_h2d_right_nozzle_ams0_lifecycle(self, h2d_client):
      """Right nozzle loads AMS 0 slot 1; tray_now and last_loaded_tray become 1."""
      # Snow update: extruder 0 is loading AMS 0 slot 1.
      extruder_entries = [
          {"id": 0, "snow": (0 << 8) | 1},
          {"id": 1, "snow": 0xFF00FF},
      ]
      h2d_client._process_message(_extruder_info_payload(extruder_entries))

      # The printer then reports tray_now="1".
      h2d_client._process_message(_ams_payload(1))

      expected_tray = 1
      assert h2d_client.state.tray_now == expected_tray
      assert h2d_client.state.last_loaded_tray == expected_tray
  def test_h2d_left_nozzle_ams_ht_lifecycle(self, h2d_client):
      """Left nozzle loads the AMS HT; tray_now and last_loaded_tray become 128."""
      # Make the left extruder active first.
      h2d_client._process_message(_extruder_state_payload(0x0100))

      # Snow update: extruder 1 points at AMS HT slot 0.
      extruder_entries = [
          {"id": 0, "snow": 0xFF00FF},
          {"id": 1, "snow": (128 << 8) | 0},
      ]
      h2d_client._process_message(_extruder_info_payload(extruder_entries))

      # The printer reports tray_now="0" (AMS HT single slot).
      h2d_client._process_message(_ams_payload(0))

      expected_tray = 128
      assert h2d_client.state.tray_now == expected_tray
      assert h2d_client.state.last_loaded_tray == expected_tray
  def test_h2d_multi_color_alternating_nozzles(self, h2d_client):
      """Simulate a multi-color print that alternates between both nozzles.

      Sequence:
      1. Right loads AMS 0 slot 0 (tray=0)
      2. Switch left, load AMS HT (tray=128)
      3. Switch right, snow updates, load AMS 0 slot 2 (tray=2)
      4. Unload (255)
      """
      ams_ht_snow = (128 << 8) | 0

      # Each step: (extruder state word or None, snow entries, reported slot,
      # expected tray_now). A state word of None means no extruder switch is sent.
      load_steps = (
          # Step 1: right extruder loads AMS 0 slot 0.
          (
              None,
              [{"id": 0, "snow": (0 << 8) | 0}, {"id": 1, "snow": 0xFF00FF}],
              0,
              0,
          ),
          # Step 2: switch to left, load AMS HT.
          (
              0x0100,
              [{"id": 0, "snow": (0 << 8) | 0}, {"id": 1, "snow": ams_ht_snow}],
              0,
              128,
          ),
          # Step 3: switch back to right, load AMS 0 slot 2.
          (
              0x0001,
              [{"id": 0, "snow": (0 << 8) | 2}, {"id": 1, "snow": ams_ht_snow}],
              2,
              2,
          ),
      )

      for state_word, snow_entries, reported_slot, expected_tray in load_steps:
          if state_word is not None:
              h2d_client._process_message(_extruder_state_payload(state_word))
          h2d_client._process_message(_extruder_info_payload(snow_entries))
          h2d_client._process_message(_ams_payload(reported_slot))
          assert h2d_client.state.tray_now == expected_tray

      # Step 4: unload; last_loaded_tray keeps the tray from step 3.
      h2d_client._process_message(_ams_payload(255))
      assert h2d_client.state.tray_now == 255
      assert h2d_client.state.last_loaded_tray == 2