test_bambu_mqtt.py 83 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192
  1. """
  2. Tests for the BambuMQTTClient service.
  3. These tests focus on timelapse tracking during prints.
  4. """
  5. import pytest
  6. class TestTimelapseTracking:
  7. """Tests for timelapse state tracking during prints."""
  8. @pytest.fixture
  9. def mqtt_client(self):
  10. """Create a BambuMQTTClient instance for testing."""
  11. from backend.app.services.bambu_mqtt import BambuMQTTClient
  12. client = BambuMQTTClient(
  13. ip_address="192.168.1.100",
  14. serial_number="TEST123",
  15. access_code="12345678",
  16. )
  17. return client
  18. def test_timelapse_flag_initializes_to_false(self, mqtt_client):
  19. """Verify _timelapse_during_print starts as False."""
  20. assert mqtt_client._timelapse_during_print is False
  21. def test_timelapse_flag_set_when_timelapse_active_during_running(self, mqtt_client):
  22. """Verify timelapse flag is set when timelapse is active while printing."""
  23. # Simulate print running
  24. mqtt_client._was_running = True
  25. mqtt_client.state.timelapse = False
  26. # Simulate xcam data showing timelapse is enabled
  27. xcam_data = {"timelapse": "enable"}
  28. mqtt_client._parse_xcam_data(xcam_data)
  29. assert mqtt_client.state.timelapse is True
  30. assert mqtt_client._timelapse_during_print is True
  31. def test_timelapse_flag_not_set_when_not_running(self, mqtt_client):
  32. """Verify timelapse flag is NOT set when printer not running."""
  33. # Printer is idle (not running)
  34. mqtt_client._was_running = False
  35. mqtt_client.state.timelapse = False
  36. # Timelapse is enabled but we're not printing
  37. xcam_data = {"timelapse": "enable"}
  38. mqtt_client._parse_xcam_data(xcam_data)
  39. assert mqtt_client.state.timelapse is True
  40. # Flag should NOT be set since we're not printing
  41. assert mqtt_client._timelapse_during_print is False
  42. def test_timelapse_flag_persists_after_timelapse_stops(self, mqtt_client):
  43. """Verify timelapse flag stays True even after recording stops."""
  44. # Simulate print running with timelapse
  45. mqtt_client._was_running = True
  46. # Enable timelapse during print
  47. xcam_data = {"timelapse": "enable"}
  48. mqtt_client._parse_xcam_data(xcam_data)
  49. assert mqtt_client._timelapse_during_print is True
  50. # Disable timelapse (recording stops at end of print)
  51. xcam_data = {"timelapse": "disable"}
  52. mqtt_client._parse_xcam_data(xcam_data)
  53. # Flag should still be True (persists until reset)
  54. assert mqtt_client.state.timelapse is False
  55. assert mqtt_client._timelapse_during_print is True
  56. def test_timelapse_flag_from_print_data(self, mqtt_client):
  57. """Verify timelapse flag is set from print data (not just xcam)."""
  58. # Simulate print running
  59. mqtt_client._was_running = True
  60. mqtt_client.state.timelapse = False
  61. mqtt_client._timelapse_during_print = False
  62. # Manually test the timelapse parsing logic from _parse_print_data
  63. # This tests the "timelapse" field in the main print data
  64. data = {"timelapse": True}
  65. mqtt_client.state.timelapse = data["timelapse"] is True
  66. if mqtt_client.state.timelapse and mqtt_client._was_running:
  67. mqtt_client._timelapse_during_print = True
  68. assert mqtt_client._timelapse_during_print is True
  69. class TestPrintCompletionWithTimelapse:
  70. """Tests for print completion including timelapse flag."""
  71. @pytest.fixture
  72. def mqtt_client(self):
  73. """Create a BambuMQTTClient instance for testing."""
  74. from backend.app.services.bambu_mqtt import BambuMQTTClient
  75. client = BambuMQTTClient(
  76. ip_address="192.168.1.100",
  77. serial_number="TEST123",
  78. access_code="12345678",
  79. )
  80. return client
  81. def test_print_complete_includes_timelapse_flag(self, mqtt_client):
  82. """Verify print complete callback includes timelapse_was_active."""
  83. # Set up completion callback
  84. callback_data = {}
  85. def on_complete(data):
  86. callback_data.update(data)
  87. mqtt_client.on_print_complete = on_complete
  88. # Simulate a print that had timelapse active
  89. mqtt_client._was_running = True
  90. mqtt_client._completion_triggered = False
  91. mqtt_client._timelapse_during_print = True
  92. mqtt_client._previous_gcode_state = "RUNNING"
  93. mqtt_client._previous_gcode_file = "test.gcode"
  94. mqtt_client.state.subtask_name = "Test Print"
  95. # Simulate print finish
  96. mqtt_client.state.state = "FINISH"
  97. # Manually trigger the completion logic (simplified)
  98. # In real code this happens in _parse_print_data
  99. should_trigger = (
  100. mqtt_client.state.state in ("FINISH", "FAILED")
  101. and not mqtt_client._completion_triggered
  102. and mqtt_client.on_print_complete
  103. and mqtt_client._previous_gcode_state == "RUNNING"
  104. )
  105. if should_trigger:
  106. status = "completed" if mqtt_client.state.state == "FINISH" else "failed"
  107. timelapse_was_active = mqtt_client._timelapse_during_print
  108. mqtt_client._completion_triggered = True
  109. mqtt_client._was_running = False
  110. mqtt_client._timelapse_during_print = False
  111. mqtt_client.on_print_complete(
  112. {
  113. "status": status,
  114. "filename": mqtt_client._previous_gcode_file,
  115. "subtask_name": mqtt_client.state.subtask_name,
  116. "timelapse_was_active": timelapse_was_active,
  117. }
  118. )
  119. assert "timelapse_was_active" in callback_data
  120. assert callback_data["timelapse_was_active"] is True
  121. def test_print_complete_timelapse_flag_false_when_no_timelapse(self, mqtt_client):
  122. """Verify timelapse_was_active is False when no timelapse during print."""
  123. callback_data = {}
  124. def on_complete(data):
  125. callback_data.update(data)
  126. mqtt_client.on_print_complete = on_complete
  127. # Print without timelapse
  128. mqtt_client._was_running = True
  129. mqtt_client._completion_triggered = False
  130. mqtt_client._timelapse_during_print = False # No timelapse
  131. mqtt_client._previous_gcode_state = "RUNNING"
  132. mqtt_client._previous_gcode_file = "test.gcode"
  133. mqtt_client.state.subtask_name = "Test Print"
  134. mqtt_client.state.state = "FINISH"
  135. # Trigger completion
  136. timelapse_was_active = mqtt_client._timelapse_during_print
  137. mqtt_client.on_print_complete(
  138. {
  139. "status": "completed",
  140. "filename": mqtt_client._previous_gcode_file,
  141. "subtask_name": mqtt_client.state.subtask_name,
  142. "timelapse_was_active": timelapse_was_active,
  143. }
  144. )
  145. assert callback_data["timelapse_was_active"] is False
  146. def test_timelapse_flag_reset_after_completion(self, mqtt_client):
  147. """Verify _timelapse_during_print is reset after print completion."""
  148. mqtt_client._timelapse_during_print = True
  149. mqtt_client._was_running = True
  150. mqtt_client._completion_triggered = False
  151. # Simulate completion reset
  152. mqtt_client._completion_triggered = True
  153. mqtt_client._was_running = False
  154. mqtt_client._timelapse_during_print = False
  155. assert mqtt_client._timelapse_during_print is False
  156. class TestRealisticMessageFlow:
  157. """Tests that simulate realistic MQTT message sequences.
  158. These tests process messages through _process_message to test the full flow,
  159. including the order of xcam parsing vs state detection.
  160. """
  161. @pytest.fixture
  162. def mqtt_client(self):
  163. """Create a BambuMQTTClient instance for testing."""
  164. from backend.app.services.bambu_mqtt import BambuMQTTClient
  165. client = BambuMQTTClient(
  166. ip_address="192.168.1.100",
  167. serial_number="TEST123",
  168. access_code="12345678",
  169. )
  170. return client
  171. def test_timelapse_detected_at_print_start_in_same_message(self, mqtt_client):
  172. """Test that timelapse is detected when xcam and state come in same message.
  173. This is the critical race condition test - xcam data is parsed BEFORE
  174. state detection, so the timelapse flag must be set AFTER _was_running is True.
  175. """
  176. # Callbacks to track events
  177. start_callback_data = {}
  178. def on_start(data):
  179. start_callback_data.update(data)
  180. mqtt_client.on_print_start = on_start
  181. # Initial state - idle
  182. mqtt_client._was_running = False
  183. mqtt_client._timelapse_during_print = False
  184. mqtt_client._previous_gcode_state = None
  185. # Simulate first message when print starts - contains both xcam and gcode_state
  186. # This is the realistic scenario from the printer
  187. # NOTE: Real MQTT messages wrap print data inside a "print" key
  188. payload = {
  189. "print": {
  190. "gcode_state": "RUNNING",
  191. "gcode_file": "/data/Metadata/test_print.gcode",
  192. "subtask_name": "Test_Print",
  193. "xcam": {
  194. "timelapse": "enable", # Timelapse is enabled in this print
  195. "printing_monitor": True,
  196. },
  197. "mc_percent": 0,
  198. "mc_remaining_time": 3600,
  199. }
  200. }
  201. # Process the message (this is what happens in real MQTT flow)
  202. mqtt_client._process_message(payload)
  203. # Verify timelapse was detected even though xcam is parsed before state
  204. assert mqtt_client._was_running is True, "_was_running should be True after RUNNING state"
  205. assert mqtt_client.state.timelapse is True, "state.timelapse should be True"
  206. assert mqtt_client._timelapse_during_print is True, (
  207. "timelapse_during_print should be True when timelapse is in the same message as RUNNING state"
  208. )
  209. def test_timelapse_not_detected_when_disabled(self, mqtt_client):
  210. """Test that timelapse is NOT detected when disabled in xcam data."""
  211. mqtt_client.on_print_start = lambda data: None
  212. # Initial state - idle
  213. mqtt_client._was_running = False
  214. mqtt_client._timelapse_during_print = False
  215. mqtt_client._previous_gcode_state = None
  216. # Print starts without timelapse
  217. payload = {
  218. "print": {
  219. "gcode_state": "RUNNING",
  220. "gcode_file": "/data/Metadata/test_print.gcode",
  221. "subtask_name": "Test_Print",
  222. "xcam": {
  223. "timelapse": "disable", # Timelapse is disabled
  224. "printing_monitor": True,
  225. },
  226. }
  227. }
  228. mqtt_client._process_message(payload)
  229. assert mqtt_client._was_running is True
  230. assert mqtt_client.state.timelapse is False
  231. assert mqtt_client._timelapse_during_print is False
  232. def test_timelapse_detected_when_enabled_after_print_start(self, mqtt_client):
  233. """Test timelapse detected when enabled in a message after print starts."""
  234. mqtt_client.on_print_start = lambda data: None
  235. # First message - print starts without timelapse info
  236. payload_start = {
  237. "print": {
  238. "gcode_state": "RUNNING",
  239. "gcode_file": "/data/Metadata/test_print.gcode",
  240. "subtask_name": "Test_Print",
  241. }
  242. }
  243. mqtt_client._process_message(payload_start)
  244. assert mqtt_client._was_running is True
  245. assert mqtt_client._timelapse_during_print is False # Not detected yet
  246. # Second message - xcam data arrives with timelapse enabled
  247. payload_xcam = {
  248. "print": {
  249. "gcode_state": "RUNNING",
  250. "gcode_file": "/data/Metadata/test_print.gcode",
  251. "subtask_name": "Test_Print",
  252. "xcam": {
  253. "timelapse": "enable",
  254. },
  255. }
  256. }
  257. mqtt_client._process_message(payload_xcam)
  258. # Now timelapse should be detected because _was_running is already True
  259. assert mqtt_client._timelapse_during_print is True
  260. def test_print_complete_includes_timelapse_flag_full_flow(self, mqtt_client):
  261. """Test full print lifecycle with timelapse - from start to completion."""
  262. start_data = {}
  263. complete_data = {}
  264. def on_start(data):
  265. start_data.update(data)
  266. def on_complete(data):
  267. complete_data.update(data)
  268. mqtt_client.on_print_start = on_start
  269. mqtt_client.on_print_complete = on_complete
  270. # 1. Print starts with timelapse
  271. mqtt_client._process_message(
  272. {
  273. "print": {
  274. "gcode_state": "RUNNING",
  275. "gcode_file": "/data/Metadata/test.gcode",
  276. "subtask_name": "Test",
  277. "xcam": {"timelapse": "enable"},
  278. }
  279. }
  280. )
  281. assert mqtt_client._timelapse_during_print is True
  282. assert "subtask_name" in start_data
  283. # 2. Print continues (multiple messages)
  284. for _ in range(3):
  285. mqtt_client._process_message(
  286. {
  287. "print": {
  288. "gcode_state": "RUNNING",
  289. "gcode_file": "/data/Metadata/test.gcode",
  290. "subtask_name": "Test",
  291. "mc_percent": 50,
  292. }
  293. }
  294. )
  295. # Timelapse flag should still be True
  296. assert mqtt_client._timelapse_during_print is True
  297. # 3. Print completes
  298. mqtt_client._process_message(
  299. {
  300. "print": {
  301. "gcode_state": "FINISH",
  302. "gcode_file": "/data/Metadata/test.gcode",
  303. "subtask_name": "Test",
  304. }
  305. }
  306. )
  307. # Verify completion callback received timelapse flag
  308. assert "timelapse_was_active" in complete_data
  309. assert complete_data["timelapse_was_active"] is True
  310. assert complete_data["status"] == "completed"
  311. # Flags should be reset after completion
  312. assert mqtt_client._timelapse_during_print is False
  313. assert mqtt_client._was_running is False
  314. def test_print_failed_includes_timelapse_flag(self, mqtt_client):
  315. """Test that failed print also includes timelapse flag."""
  316. complete_data = {}
  317. def on_complete(data):
  318. complete_data.update(data)
  319. mqtt_client.on_print_start = lambda data: None
  320. mqtt_client.on_print_complete = on_complete
  321. # Start with timelapse
  322. mqtt_client._process_message(
  323. {
  324. "print": {
  325. "gcode_state": "RUNNING",
  326. "gcode_file": "/data/Metadata/test.gcode",
  327. "subtask_name": "Test",
  328. "xcam": {"timelapse": "enable"},
  329. }
  330. }
  331. )
  332. # Print fails
  333. mqtt_client._process_message(
  334. {
  335. "print": {
  336. "gcode_state": "FAILED",
  337. "gcode_file": "/data/Metadata/test.gcode",
  338. "subtask_name": "Test",
  339. }
  340. }
  341. )
  342. assert complete_data["timelapse_was_active"] is True
  343. assert complete_data["status"] == "failed"
  344. class TestAMSDataMerging:
  345. """Tests for AMS data merging, particularly handling empty slots."""
  346. @pytest.fixture
  347. def mqtt_client(self):
  348. """Create a BambuMQTTClient instance for testing."""
  349. from backend.app.services.bambu_mqtt import BambuMQTTClient
  350. client = BambuMQTTClient(
  351. ip_address="192.168.1.100",
  352. serial_number="TEST123",
  353. access_code="12345678",
  354. )
  355. return client
  356. def test_empty_slot_clears_tray_type(self, mqtt_client):
  357. """Test that empty slot update clears tray_type (Issue #147).
  358. When a spool is removed from an old AMS, the printer sends empty values.
  359. These must overwrite the previous values to show the slot as empty.
  360. """
  361. # Initial state: AMS unit with a loaded spool
  362. initial_ams = {
  363. "ams": [
  364. {
  365. "id": 0,
  366. "tray": [
  367. {
  368. "id": 0,
  369. "tray_type": "PLA",
  370. "tray_sub_brands": "Bambu PLA Basic",
  371. "tray_color": "FF0000",
  372. "tag_uid": "1234567890ABCDEF",
  373. "remain": 80,
  374. }
  375. ],
  376. }
  377. ]
  378. }
  379. mqtt_client._handle_ams_data(initial_ams)
  380. # Verify initial state
  381. ams_data = mqtt_client.state.raw_data.get("ams", [])
  382. assert len(ams_data) == 1
  383. tray = ams_data[0]["tray"][0]
  384. assert tray["tray_type"] == "PLA"
  385. assert tray["tray_color"] == "FF0000"
  386. # Now simulate spool removal - printer sends empty values
  387. empty_update = {
  388. "ams": [
  389. {
  390. "id": 0,
  391. "tray": [
  392. {
  393. "id": 0,
  394. "tray_type": "", # Empty = slot is empty
  395. "tray_sub_brands": "",
  396. "tray_color": "",
  397. "tag_uid": "0000000000000000", # Zero UID
  398. "remain": 0,
  399. }
  400. ],
  401. }
  402. ]
  403. }
  404. mqtt_client._handle_ams_data(empty_update)
  405. # Verify empty values were applied (not ignored by merge logic)
  406. ams_data = mqtt_client.state.raw_data.get("ams", [])
  407. tray = ams_data[0]["tray"][0]
  408. assert tray["tray_type"] == "", "tray_type should be cleared when slot is empty"
  409. assert tray["tray_color"] == "", "tray_color should be cleared when slot is empty"
  410. assert tray["tray_sub_brands"] == "", "tray_sub_brands should be cleared"
  411. assert tray["tag_uid"] == "0000000000000000", "tag_uid should be cleared"
  412. def test_partial_update_preserves_other_fields(self, mqtt_client):
  413. """Test that partial updates still preserve non-slot-status fields."""
  414. # Initial state with full data
  415. initial_ams = {
  416. "ams": [
  417. {
  418. "id": 0,
  419. "humidity": "3",
  420. "temp": "25.5",
  421. "tray": [
  422. {
  423. "id": 0,
  424. "tray_type": "PLA",
  425. "tray_color": "00FF00",
  426. "remain": 90,
  427. "k": 0.02,
  428. }
  429. ],
  430. }
  431. ]
  432. }
  433. mqtt_client._handle_ams_data(initial_ams)
  434. # Partial update - only remain changes
  435. partial_update = {
  436. "ams": [
  437. {
  438. "id": 0,
  439. "tray": [
  440. {
  441. "id": 0,
  442. "remain": 85, # Only this changed
  443. }
  444. ],
  445. }
  446. ]
  447. }
  448. mqtt_client._handle_ams_data(partial_update)
  449. # Verify remain was updated but other fields preserved
  450. ams_data = mqtt_client.state.raw_data.get("ams", [])
  451. tray = ams_data[0]["tray"][0]
  452. assert tray["remain"] == 85, "remain should be updated"
  453. assert tray["tray_type"] == "PLA", "tray_type should be preserved"
  454. assert tray["tray_color"] == "00FF00", "tray_color should be preserved"
  455. assert tray["k"] == 0.02, "k should be preserved"
  456. def test_tray_exist_bits_clears_empty_slots(self, mqtt_client):
  457. """Test that tray_exist_bits clears slots marked as empty (Issue #147).
  458. New AMS models (AMS 2 Pro) don't send empty tray data when a spool is removed.
  459. Instead, they update tray_exist_bits to indicate which slots have spools.
  460. """
  461. # Initial state: AMS 0 and AMS 1 with loaded spools
  462. initial_ams = {
  463. "ams": [
  464. {
  465. "id": 0,
  466. "tray": [
  467. {"id": 0, "tray_type": "PLA", "tray_color": "FF0000", "remain": 80},
  468. {"id": 1, "tray_type": "PETG", "tray_color": "00FF00", "remain": 60},
  469. {"id": 2, "tray_type": "ABS", "tray_color": "0000FF", "remain": 40},
  470. {"id": 3, "tray_type": "TPU", "tray_color": "FFFF00", "remain": 20},
  471. ],
  472. },
  473. {
  474. "id": 1,
  475. "tray": [
  476. {"id": 0, "tray_type": "PLA", "tray_color": "FFFFFF", "remain": 90},
  477. {"id": 1, "tray_type": "PLA", "tray_color": "000000", "remain": 70},
  478. {"id": 2, "tray_type": "PLA", "tray_color": "FF00FF", "remain": 50},
  479. {"id": 3, "tray_type": "PLA", "tray_color": "00FFFF", "remain": 30},
  480. ],
  481. },
  482. ],
  483. "tray_exist_bits": "ff", # All 8 slots have spools (0xFF = 11111111)
  484. }
  485. mqtt_client._handle_ams_data(initial_ams)
  486. # Verify initial state
  487. ams_data = mqtt_client.state.raw_data.get("ams", [])
  488. assert ams_data[1]["tray"][3]["tray_type"] == "PLA" # AMS 1 slot 3 (B4) has spool
  489. # Now simulate spool removal from AMS 1 slot 3 (B4)
  490. # tray_exist_bits: 0x7f = 01111111 (bit 7 = 0 means AMS 1 slot 3 is empty)
  491. update_ams = {
  492. "ams": [
  493. {"id": 0, "tray": [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]},
  494. {"id": 1, "tray": [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]},
  495. ],
  496. "tray_exist_bits": "7f", # Bit 7 = 0 -> AMS 1 slot 3 is empty
  497. }
  498. mqtt_client._handle_ams_data(update_ams)
  499. # Verify AMS 1 slot 3 was cleared
  500. ams_data = mqtt_client.state.raw_data.get("ams", [])
  501. b4_tray = ams_data[1]["tray"][3]
  502. assert b4_tray["tray_type"] == "", "tray_type should be cleared for empty slot"
  503. assert b4_tray["remain"] == 0, "remain should be 0 for empty slot"
  504. # Verify other slots are preserved
  505. assert ams_data[0]["tray"][0]["tray_type"] == "PLA", "A1 should still have PLA"
  506. assert ams_data[1]["tray"][0]["tray_type"] == "PLA", "B1 should still have PLA"
  507. class TestNozzleRackData:
  508. """Tests for nozzle rack data parsing from H2 series device.nozzle.info."""
  509. @pytest.fixture
  510. def mqtt_client(self):
  511. """Create a BambuMQTTClient instance for testing."""
  512. from backend.app.services.bambu_mqtt import BambuMQTTClient
  513. client = BambuMQTTClient(
  514. ip_address="192.168.1.100",
  515. serial_number="TEST123",
  516. access_code="12345678",
  517. )
  518. return client
  519. def test_h2c_nozzle_rack_populated_with_8_entries(self, mqtt_client):
  520. """H2C provides 8 nozzle entries: IDs 0,1 (L/R hotend) + 16-21 (rack)."""
  521. payload = {
  522. "print": {
  523. "device": {
  524. "nozzle": {
  525. "info": [
  526. {
  527. "id": 0,
  528. "type": "HS",
  529. "diameter": "0.4",
  530. "wear": 5,
  531. "stat": 1,
  532. "max_temp": 300,
  533. "serial_number": "SN-L",
  534. },
  535. {
  536. "id": 1,
  537. "type": "HS",
  538. "diameter": "0.4",
  539. "wear": 3,
  540. "stat": 0,
  541. "max_temp": 300,
  542. "serial_number": "SN-R",
  543. },
  544. {
  545. "id": 16,
  546. "type": "HS",
  547. "diameter": "0.4",
  548. "wear": 10,
  549. "stat": 0,
  550. "max_temp": 300,
  551. "serial_number": "SN-16",
  552. },
  553. {
  554. "id": 17,
  555. "type": "HH01",
  556. "diameter": "0.6",
  557. "wear": 0,
  558. "stat": 0,
  559. "max_temp": 300,
  560. "serial_number": "SN-17",
  561. },
  562. {
  563. "id": 18,
  564. "type": "HS",
  565. "diameter": "0.4",
  566. "wear": 2,
  567. "stat": 0,
  568. "max_temp": 300,
  569. "serial_number": "SN-18",
  570. },
  571. {
  572. "id": 19,
  573. "type": "",
  574. "diameter": "",
  575. "wear": None,
  576. "stat": None,
  577. "max_temp": 0,
  578. "serial_number": "",
  579. },
  580. {
  581. "id": 20,
  582. "type": "",
  583. "diameter": "",
  584. "wear": None,
  585. "stat": None,
  586. "max_temp": 0,
  587. "serial_number": "",
  588. },
  589. {
  590. "id": 21,
  591. "type": "",
  592. "diameter": "",
  593. "wear": None,
  594. "stat": None,
  595. "max_temp": 0,
  596. "serial_number": "",
  597. },
  598. ]
  599. }
  600. }
  601. }
  602. }
  603. mqtt_client._process_message(payload)
  604. assert len(mqtt_client.state.nozzle_rack) == 8
  605. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  606. assert ids == [0, 1, 16, 17, 18, 19, 20, 21]
  607. def test_h2d_nozzle_rack_populated_with_2_entries(self, mqtt_client):
  608. """H2D provides 2 nozzle entries: IDs 0,1 (L/R hotend) — no rack slots."""
  609. payload = {
  610. "print": {
  611. "device": {
  612. "nozzle": {
  613. "info": [
  614. {
  615. "id": 0,
  616. "type": "HS",
  617. "diameter": "0.4",
  618. "wear": 5,
  619. "stat": 1,
  620. "max_temp": 300,
  621. "serial_number": "SN-L",
  622. },
  623. {
  624. "id": 1,
  625. "type": "HS",
  626. "diameter": "0.4",
  627. "wear": 3,
  628. "stat": 1,
  629. "max_temp": 300,
  630. "serial_number": "SN-R",
  631. },
  632. ]
  633. }
  634. }
  635. }
  636. }
  637. mqtt_client._process_message(payload)
  638. assert len(mqtt_client.state.nozzle_rack) == 2
  639. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  640. assert ids == [0, 1]
  641. def test_single_nozzle_h2s_populated(self, mqtt_client):
  642. """H2S provides 1 nozzle entry: ID 0 only — single nozzle printer."""
  643. payload = {
  644. "print": {
  645. "device": {
  646. "nozzle": {
  647. "info": [
  648. {
  649. "id": 0,
  650. "type": "HS",
  651. "diameter": "0.4",
  652. "wear": 2,
  653. "stat": 1,
  654. "max_temp": 300,
  655. "serial_number": "SN-0",
  656. },
  657. ]
  658. }
  659. }
  660. }
  661. }
  662. mqtt_client._process_message(payload)
  663. assert len(mqtt_client.state.nozzle_rack) == 1
  664. assert mqtt_client.state.nozzle_rack[0]["id"] == 0
  665. def test_empty_nozzle_info_does_not_populate_rack(self, mqtt_client):
  666. """Empty nozzle info list should not populate nozzle_rack."""
  667. payload = {"print": {"device": {"nozzle": {"info": []}}}}
  668. mqtt_client._process_message(payload)
  669. assert mqtt_client.state.nozzle_rack == []
  670. def test_nozzle_rack_sorted_by_id(self, mqtt_client):
  671. """Nozzle rack entries should be sorted by ID regardless of input order."""
  672. payload = {
  673. "print": {
  674. "device": {
  675. "nozzle": {
  676. "info": [
  677. {"id": 17, "type": "HS", "diameter": "0.6"},
  678. {"id": 0, "type": "HS", "diameter": "0.4"},
  679. {"id": 16, "type": "HS", "diameter": "0.4"},
  680. {"id": 1, "type": "HS", "diameter": "0.4"},
  681. ]
  682. }
  683. }
  684. }
  685. }
  686. mqtt_client._process_message(payload)
  687. ids = [n["id"] for n in mqtt_client.state.nozzle_rack]
  688. assert ids == [0, 1, 16, 17]
  689. def test_nozzle_rack_field_mapping(self, mqtt_client):
  690. """Verify field mapping from MQTT nozzle_info to nozzle_rack dict keys."""
  691. payload = {
  692. "print": {
  693. "device": {
  694. "nozzle": {
  695. "info": [
  696. {
  697. "id": 16,
  698. "type": "HH01",
  699. "diameter": "0.6",
  700. "wear": 15,
  701. "stat": 0,
  702. "max_temp": 320,
  703. "serial_number": "SN-ABC123",
  704. "filament_colour": "FF8800",
  705. "filament_id": "F42",
  706. "tray_type": "ABS",
  707. }
  708. ]
  709. }
  710. }
  711. }
  712. }
  713. mqtt_client._process_message(payload)
  714. slot = mqtt_client.state.nozzle_rack[0]
  715. assert slot["id"] == 16
  716. assert slot["type"] == "HH01"
  717. assert slot["diameter"] == "0.6"
  718. assert slot["wear"] == 15
  719. assert slot["stat"] == 0
  720. assert slot["max_temp"] == 320
  721. assert slot["serial_number"] == "SN-ABC123"
  722. assert slot["filament_color"] == "FF8800"
  723. assert slot["filament_id"] == "F42"
  724. assert slot["filament_type"] == "ABS"
  725. def test_nozzle_info_updates_nozzle_state(self, mqtt_client):
  726. """Nozzle info for IDs 0,1 should also update nozzle state (type/diameter)."""
  727. payload = {
  728. "print": {
  729. "device": {
  730. "nozzle": {
  731. "info": [
  732. {"id": 0, "type": "HS", "diameter": "0.4"},
  733. {"id": 1, "type": "HH01", "diameter": "0.6"},
  734. ]
  735. }
  736. }
  737. }
  738. }
  739. mqtt_client._process_message(payload)
  740. assert mqtt_client.state.nozzles[0].nozzle_type == "HS"
  741. assert mqtt_client.state.nozzles[0].nozzle_diameter == "0.4"
  742. assert mqtt_client.state.nozzles[1].nozzle_type == "HH01"
  743. assert mqtt_client.state.nozzles[1].nozzle_diameter == "0.6"
  744. class TestRequestTopicFailSafe:
  745. """Tests for graceful degradation when broker rejects request topic subscription."""
  746. @pytest.fixture
  747. def mqtt_client(self):
  748. from backend.app.services.bambu_mqtt import BambuMQTTClient
  749. client = BambuMQTTClient(
  750. ip_address="192.168.1.100",
  751. serial_number="TEST123",
  752. access_code="12345678",
  753. )
  754. return client
  755. def test_request_topic_supported_by_default(self, mqtt_client):
  756. """Request topic subscription is attempted by default."""
  757. assert mqtt_client._request_topic_supported is True
  758. assert mqtt_client._request_topic_confirmed is False
  759. def test_on_subscribe_confirms_success(self, mqtt_client):
  760. """Successful SUBACK marks request topic as confirmed."""
  761. from paho.mqtt.reasoncodes import ReasonCode
  762. mqtt_client._request_topic_sub_mid = 42
  763. rc = ReasonCode(9, identifier=0) # SUBACK packetType=9, QoS 0 = success
  764. mqtt_client._on_subscribe(None, None, 42, [rc], None)
  765. assert mqtt_client._request_topic_confirmed is True
  766. assert mqtt_client._request_topic_supported is True
  767. assert mqtt_client._request_topic_sub_mid is None
  768. assert mqtt_client._request_topic_sub_time == 0.0
  769. def test_on_subscribe_detects_rejection(self, mqtt_client):
  770. """SUBACK with failure code disables request topic."""
  771. from paho.mqtt.reasoncodes import ReasonCode
  772. mqtt_client._request_topic_sub_mid = 42
  773. rc = ReasonCode(9, identifier=0x80) # SUBACK packetType=9, 0x80 = failure
  774. mqtt_client._on_subscribe(None, None, 42, [rc], None)
  775. assert mqtt_client._request_topic_supported is False
  776. assert mqtt_client._request_topic_confirmed is False
  777. def test_on_subscribe_ignores_other_mids(self, mqtt_client):
  778. """SUBACK for other subscriptions (e.g. report topic) is ignored."""
  779. from paho.mqtt.reasoncodes import ReasonCode
  780. mqtt_client._request_topic_sub_mid = 42
  781. rc = ReasonCode(9, identifier=0x80)
  782. mqtt_client._on_subscribe(None, None, 99, [rc], None)
  783. # Not affected — mid doesn't match
  784. assert mqtt_client._request_topic_supported is True
  785. def test_disconnect_after_subscription_disables_topic(self, mqtt_client):
  786. """Disconnect within 10s of subscription attempt disables request topic."""
  787. import time
  788. mqtt_client._request_topic_sub_time = time.time()
  789. mqtt_client._request_topic_confirmed = False
  790. mqtt_client._last_message_time = 0.0
  791. mqtt_client._on_disconnect(None, None)
  792. assert mqtt_client._request_topic_supported is False
  793. assert mqtt_client._request_topic_sub_time == 0.0
  794. def test_disconnect_after_confirmation_does_not_disable(self, mqtt_client):
  795. """Disconnect after SUBACK confirmation keeps request topic enabled."""
  796. import time
  797. mqtt_client._request_topic_sub_time = time.time()
  798. mqtt_client._request_topic_confirmed = True
  799. mqtt_client._last_message_time = 0.0
  800. mqtt_client._on_disconnect(None, None)
  801. assert mqtt_client._request_topic_supported is True
  802. def test_late_disconnect_does_not_disable(self, mqtt_client):
  803. """Disconnect long after subscription (>10s) doesn't blame request topic."""
  804. import time
  805. mqtt_client._request_topic_sub_time = time.time() - 30.0
  806. mqtt_client._request_topic_confirmed = False
  807. mqtt_client._last_message_time = 0.0
  808. mqtt_client._on_disconnect(None, None)
  809. assert mqtt_client._request_topic_supported is True
  810. def test_on_connect_skips_request_topic_when_unsupported(self, mqtt_client):
  811. """After marking unsupported, reconnect skips request topic subscription."""
  812. mqtt_client._request_topic_supported = False
  813. subscribe_calls = []
  814. mock_client = type(
  815. "MockClient",
  816. (),
  817. {
  818. "subscribe": lambda self, topic: subscribe_calls.append(topic) or (0, 1),
  819. },
  820. )()
  821. mqtt_client._on_connect(mock_client, None, None, 0)
  822. # Only report topic subscribed, not request topic
  823. assert len(subscribe_calls) == 1
  824. assert subscribe_calls[0] == mqtt_client.topic_subscribe
  825. class TestRequestTopicAmsMapping:
  826. """Tests for capturing ams_mapping from the MQTT request topic."""
  827. @pytest.fixture
  828. def mqtt_client(self):
  829. """Create a BambuMQTTClient instance for testing."""
  830. from backend.app.services.bambu_mqtt import BambuMQTTClient
  831. client = BambuMQTTClient(
  832. ip_address="192.168.1.100",
  833. serial_number="TEST123",
  834. access_code="12345678",
  835. )
  836. return client
  837. def test_captured_ams_mapping_initializes_to_none(self, mqtt_client):
  838. """Verify _captured_ams_mapping starts as None."""
  839. assert mqtt_client._captured_ams_mapping is None
  840. def test_handle_request_message_captures_ams_mapping(self, mqtt_client):
  841. """project_file command with ams_mapping stores the mapping."""
  842. data = {
  843. "print": {
  844. "command": "project_file",
  845. "ams_mapping": [0, 4, -1, -1],
  846. "url": "ftp://192.168.1.100/test.3mf",
  847. }
  848. }
  849. mqtt_client._handle_request_message(data)
  850. assert mqtt_client._captured_ams_mapping == [0, 4, -1, -1]
  851. def test_handle_request_message_ignores_non_print_commands(self, mqtt_client):
  852. """Non-project_file commands don't store ams_mapping."""
  853. data = {
  854. "print": {
  855. "command": "pause",
  856. }
  857. }
  858. mqtt_client._handle_request_message(data)
  859. assert mqtt_client._captured_ams_mapping is None
  860. def test_handle_request_message_ignores_missing_ams_mapping(self, mqtt_client):
  861. """project_file command without ams_mapping doesn't store anything."""
  862. data = {
  863. "print": {
  864. "command": "project_file",
  865. "url": "ftp://192.168.1.100/test.3mf",
  866. }
  867. }
  868. mqtt_client._handle_request_message(data)
  869. assert mqtt_client._captured_ams_mapping is None
  870. def test_handle_request_message_ignores_non_dict_print(self, mqtt_client):
  871. """Non-dict print value is safely ignored."""
  872. data = {"print": "not_a_dict"}
  873. mqtt_client._handle_request_message(data)
  874. assert mqtt_client._captured_ams_mapping is None
  875. def test_handle_request_message_ignores_missing_print(self, mqtt_client):
  876. """Message without print key is safely ignored."""
  877. data = {"pushing": {"command": "pushall"}}
  878. mqtt_client._handle_request_message(data)
  879. assert mqtt_client._captured_ams_mapping is None
  880. def test_captured_mapping_overwrites_previous(self, mqtt_client):
  881. """A new print command overwrites a previously captured mapping."""
  882. mqtt_client._captured_ams_mapping = [0, -1, -1, -1]
  883. data = {
  884. "print": {
  885. "command": "project_file",
  886. "ams_mapping": [4, 8, -1, -1],
  887. }
  888. }
  889. mqtt_client._handle_request_message(data)
  890. assert mqtt_client._captured_ams_mapping == [4, 8, -1, -1]
  891. def test_print_start_callback_includes_ams_mapping(self, mqtt_client):
  892. """on_print_start callback data includes captured ams_mapping."""
  893. start_data = {}
  894. def on_start(data):
  895. start_data.update(data)
  896. mqtt_client.on_print_start = on_start
  897. mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
  898. # Trigger print start
  899. mqtt_client._process_message(
  900. {
  901. "print": {
  902. "gcode_state": "RUNNING",
  903. "gcode_file": "/data/Metadata/test.gcode",
  904. "subtask_name": "Test",
  905. }
  906. }
  907. )
  908. assert start_data.get("ams_mapping") == [0, 4, -1, -1]
  909. def test_print_start_callback_ams_mapping_none_when_not_captured(self, mqtt_client):
  910. """on_print_start callback has ams_mapping=None when no mapping captured."""
  911. start_data = {}
  912. def on_start(data):
  913. start_data.update(data)
  914. mqtt_client.on_print_start = on_start
  915. mqtt_client._process_message(
  916. {
  917. "print": {
  918. "gcode_state": "RUNNING",
  919. "gcode_file": "/data/Metadata/test.gcode",
  920. "subtask_name": "Test",
  921. }
  922. }
  923. )
  924. assert "ams_mapping" in start_data
  925. assert start_data["ams_mapping"] is None
  926. def test_print_complete_callback_includes_ams_mapping(self, mqtt_client):
  927. """on_print_complete callback data includes captured ams_mapping."""
  928. complete_data = {}
  929. def on_complete(data):
  930. complete_data.update(data)
  931. mqtt_client.on_print_start = lambda d: None
  932. mqtt_client.on_print_complete = on_complete
  933. mqtt_client._captured_ams_mapping = [0, 9, -1, -1]
  934. # Start print
  935. mqtt_client._process_message(
  936. {
  937. "print": {
  938. "gcode_state": "RUNNING",
  939. "gcode_file": "/data/Metadata/test.gcode",
  940. "subtask_name": "Test",
  941. }
  942. }
  943. )
  944. # Complete print
  945. mqtt_client._process_message(
  946. {
  947. "print": {
  948. "gcode_state": "FINISH",
  949. "gcode_file": "/data/Metadata/test.gcode",
  950. "subtask_name": "Test",
  951. }
  952. }
  953. )
  954. assert complete_data.get("ams_mapping") == [0, 9, -1, -1]
  955. def test_captured_mapping_cleared_after_print_complete(self, mqtt_client):
  956. """_captured_ams_mapping is reset to None after print completion."""
  957. mqtt_client.on_print_start = lambda d: None
  958. mqtt_client.on_print_complete = lambda d: None
  959. mqtt_client._captured_ams_mapping = [0, 4, -1, -1]
  960. # Start print
  961. mqtt_client._process_message(
  962. {
  963. "print": {
  964. "gcode_state": "RUNNING",
  965. "gcode_file": "/data/Metadata/test.gcode",
  966. "subtask_name": "Test",
  967. }
  968. }
  969. )
  970. # Complete print
  971. mqtt_client._process_message(
  972. {
  973. "print": {
  974. "gcode_state": "FINISH",
  975. "gcode_file": "/data/Metadata/test.gcode",
  976. "subtask_name": "Test",
  977. }
  978. }
  979. )
  980. assert mqtt_client._captured_ams_mapping is None
  981. def test_full_flow_capture_and_deliver(self, mqtt_client):
  982. """Full flow: slicer sends print command → MQTT captures mapping → completion delivers it."""
  983. complete_data = {}
  984. def on_complete(data):
  985. complete_data.update(data)
  986. mqtt_client.on_print_start = lambda d: None
  987. mqtt_client.on_print_complete = on_complete
  988. # 1. Slicer sends print command (captured from request topic)
  989. mqtt_client._handle_request_message(
  990. {
  991. "print": {
  992. "command": "project_file",
  993. "ams_mapping": [4, 9, -1, -1],
  994. "url": "ftp://192.168.1.100/model.3mf",
  995. }
  996. }
  997. )
  998. assert mqtt_client._captured_ams_mapping == [4, 9, -1, -1]
  999. # 2. Printer reports RUNNING
  1000. mqtt_client._process_message(
  1001. {
  1002. "print": {
  1003. "gcode_state": "RUNNING",
  1004. "gcode_file": "/data/Metadata/model.gcode",
  1005. "subtask_name": "Model",
  1006. }
  1007. }
  1008. )
  1009. # 3. Printer reports FINISH
  1010. mqtt_client._process_message(
  1011. {
  1012. "print": {
  1013. "gcode_state": "FINISH",
  1014. "gcode_file": "/data/Metadata/model.gcode",
  1015. "subtask_name": "Model",
  1016. }
  1017. }
  1018. )
  1019. assert complete_data["ams_mapping"] == [4, 9, -1, -1]
  1020. assert complete_data["status"] == "completed"
  1021. # Mapping cleared after completion
  1022. assert mqtt_client._captured_ams_mapping is None
  1023. # ---------------------------------------------------------------------------
  1024. # tray_now disambiguation helpers
  1025. # ---------------------------------------------------------------------------
  1026. def _ams_payload(tray_now, ams_units=None, tray_exist_bits=None, ams_exist_bits=None):
  1027. """Build minimal print.ams payload for tray_now disambiguation tests."""
  1028. ams = {"tray_now": str(tray_now)}
  1029. if ams_units is not None:
  1030. ams["ams"] = ams_units
  1031. if tray_exist_bits is not None:
  1032. ams["tray_exist_bits"] = tray_exist_bits
  1033. if ams_exist_bits is not None:
  1034. ams["ams_exist_bits"] = ams_exist_bits
  1035. return {"print": {"ams": ams}}
  1036. def _extruder_info_payload(extruders):
  1037. """Build device.extruder.info payload (dual-nozzle detection + snow).
  1038. Each entry in *extruders* is a dict with at least ``id`` and ``snow``.
  1039. """
  1040. return {
  1041. "print": {
  1042. "device": {
  1043. "extruder": {
  1044. "info": extruders,
  1045. }
  1046. }
  1047. }
  1048. }
  1049. def _extruder_state_payload(state_val):
  1050. """Build device.extruder.state payload (active extruder via bit 8)."""
  1051. return {
  1052. "print": {
  1053. "device": {
  1054. "extruder": {
  1055. "state": state_val,
  1056. }
  1057. }
  1058. }
  1059. }
  1060. # ---------------------------------------------------------------------------
  1061. # 1. Single-nozzle X1E — direct passthrough
  1062. # ---------------------------------------------------------------------------
  1063. class TestTrayNowSingleNozzleX1E:
  1064. """Single-nozzle, 1 AMS — tray_now is a direct passthrough."""
  1065. @pytest.fixture
  1066. def mqtt_client(self):
  1067. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1068. return BambuMQTTClient(
  1069. ip_address="192.168.1.100",
  1070. serial_number="TEST_X1E",
  1071. access_code="12345678",
  1072. )
  1073. def test_tray_now_direct_passthrough_slot_0_to_3(self, mqtt_client):
  1074. """Each tray_now 0-3 maps 1:1 on single-nozzle printers."""
  1075. for slot in range(4):
  1076. mqtt_client._process_message(_ams_payload(slot))
  1077. assert mqtt_client.state.tray_now == slot
  1078. def test_tray_now_255_means_unloaded(self, mqtt_client):
  1079. """tray_now=255 means no filament loaded."""
  1080. mqtt_client._process_message(_ams_payload(255))
  1081. assert mqtt_client.state.tray_now == 255
  1082. def test_single_extruder_does_not_trigger_dual_nozzle(self, mqtt_client):
  1083. """device.extruder.info with 1 entry must NOT set _is_dual_nozzle."""
  1084. mqtt_client._process_message(_extruder_info_payload([{"id": 0, "snow": 0xFF00FF}]))
  1085. assert mqtt_client._is_dual_nozzle is False
  1086. def test_last_loaded_tray_survives_unload(self, mqtt_client):
  1087. """Load tray 2, unload → last_loaded_tray stays 2."""
  1088. mqtt_client._process_message(_ams_payload(2))
  1089. assert mqtt_client.state.last_loaded_tray == 2
  1090. mqtt_client._process_message(_ams_payload(255))
  1091. assert mqtt_client.state.tray_now == 255
  1092. assert mqtt_client.state.last_loaded_tray == 2
  1093. # ---------------------------------------------------------------------------
  1094. # 2. Single-nozzle P2S — multiple AMS, global IDs pass through
  1095. # ---------------------------------------------------------------------------
  1096. class TestTrayNowSingleNozzleP2S:
  1097. """Single-nozzle, 2 AMS — tray_now > 3 passes through as global ID."""
  1098. @pytest.fixture
  1099. def mqtt_client(self):
  1100. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1101. return BambuMQTTClient(
  1102. ip_address="192.168.1.100",
  1103. serial_number="TEST_P2S",
  1104. access_code="12345678",
  1105. )
  1106. def test_tray_now_ams1_global_ids_4_to_7(self, mqtt_client):
  1107. """tray_now 4-7 are global IDs for AMS 1 on single-nozzle printers."""
  1108. for global_id in range(4, 8):
  1109. mqtt_client._process_message(_ams_payload(global_id))
  1110. assert mqtt_client.state.tray_now == global_id
  1111. def test_tray_change_across_ams_units(self, mqtt_client):
  1112. """Switch from AMS 0 slot 1 → AMS 1 slot 2 (global 6)."""
  1113. mqtt_client._process_message(_ams_payload(1))
  1114. assert mqtt_client.state.tray_now == 1
  1115. mqtt_client._process_message(_ams_payload(6))
  1116. assert mqtt_client.state.tray_now == 6
  1117. # ---------------------------------------------------------------------------
  1118. # 2b. Single-nozzle P2S — multi-AMS local slot disambiguation (#420)
  1119. # ---------------------------------------------------------------------------
  1120. class TestTrayNowP2SMultiAmsDisambiguation:
  1121. """P2S firmware sends local slot IDs (0-3) in tray_now even with dual AMS.
  1122. When ams_exist_bits indicates >1 AMS unit and tray_now is 0-3, the backend
  1123. should use the MQTT mapping field (snow-encoded) to resolve the correct
  1124. global tray ID.
  1125. """
  1126. @pytest.fixture
  1127. def mqtt_client(self):
  1128. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1129. client = BambuMQTTClient(
  1130. ip_address="192.168.1.100",
  1131. serial_number="TEST_P2S_DUAL",
  1132. access_code="12345678",
  1133. )
  1134. return client
  1135. def test_resolves_ams1_slot1_from_mapping(self, mqtt_client):
  1136. """tray_now=1 with mapping=[257] → global ID 5 (AMS1-T1).
  1137. 257 snow-decoded: ams_hw_id=1, slot=1 → global 1*4+1=5.
  1138. """
  1139. # Set mapping field in raw_data (as the MQTT handler would)
  1140. mqtt_client.state.raw_data["mapping"] = [257]
  1141. mqtt_client._process_message(
  1142. _ams_payload(1, ams_exist_bits="3") # '3' = 0b11 → AMS 0 and 1
  1143. )
  1144. assert mqtt_client.state.tray_now == 5
  1145. def test_resolves_ams1_slot0_from_mapping(self, mqtt_client):
  1146. """tray_now=0 with mapping=[256] → global ID 4 (AMS1-T0).
  1147. 256 snow-decoded: ams_hw_id=1, slot=0 → global 1*4+0=4.
  1148. """
  1149. mqtt_client.state.raw_data["mapping"] = [256]
  1150. mqtt_client._process_message(_ams_payload(0, ams_exist_bits="3"))
  1151. assert mqtt_client.state.tray_now == 4
  1152. def test_resolves_ams1_slot3_from_mapping(self, mqtt_client):
  1153. """tray_now=3 with mapping=[259] → global ID 7 (AMS1-T3).
  1154. 259 snow-decoded: ams_hw_id=1, slot=3 → global 1*4+3=7.
  1155. """
  1156. mqtt_client.state.raw_data["mapping"] = [259]
  1157. mqtt_client._process_message(_ams_payload(3, ams_exist_bits="3"))
  1158. assert mqtt_client.state.tray_now == 7
  1159. def test_ams0_slot_unchanged_when_mapping_confirms_ams0(self, mqtt_client):
  1160. """tray_now=1 with mapping=[1] → stays 1 (AMS0-T1).
  1161. 1 snow-decoded: ams_hw_id=0, slot=1 → global 0*4+1=1.
  1162. """
  1163. mqtt_client.state.raw_data["mapping"] = [1]
  1164. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1165. assert mqtt_client.state.tray_now == 1
  1166. def test_multicolor_resolves_ams1_from_multi_entry_mapping(self, mqtt_client):
  1167. """Multi-color print: mapping=[0, 257] → tray_now=1 resolves to AMS1-T1 (5).
  1168. Entry 0: ams_hw_id=0, slot=0 (local 0) — doesn't match tray_now=1.
  1169. Entry 257: ams_hw_id=1, slot=1 (local 1) — matches tray_now=1 → global 5.
  1170. """
  1171. mqtt_client.state.raw_data["mapping"] = [0, 257]
  1172. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1173. assert mqtt_client.state.tray_now == 5
  1174. def test_multicolor_four_slot_mapping(self, mqtt_client):
  1175. """mapping=[65535, 65535, 65535, 257] → tray_now=1 resolves to global 5.
  1176. Only entry 257 has local slot=1, other entries are unmapped (65535).
  1177. Reproduces exact data from issue #420 support package.
  1178. """
  1179. mqtt_client.state.raw_data["mapping"] = [65535, 65535, 65535, 257]
  1180. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1181. assert mqtt_client.state.tray_now == 5
  1182. def test_ambiguous_mapping_falls_back_to_local_slot(self, mqtt_client):
  1183. """Two AMS units with same local slot in mapping → ambiguous, keep local slot.
  1184. mapping=[1, 257]: both have local slot 1 (AMS0-T1 and AMS1-T1).
  1185. Cannot disambiguate → fall back to tray_now=1.
  1186. """
  1187. mqtt_client.state.raw_data["mapping"] = [1, 257]
  1188. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1189. assert mqtt_client.state.tray_now == 1
  1190. def test_no_mapping_falls_back_to_local_slot(self, mqtt_client):
  1191. """No mapping field available → fall back to raw tray_now."""
  1192. # No mapping in raw_data (e.g. manual filament load, not during print)
  1193. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1194. assert mqtt_client.state.tray_now == 1
  1195. def test_empty_mapping_falls_back_to_local_slot(self, mqtt_client):
  1196. """Empty mapping list → fall back to raw tray_now."""
  1197. mqtt_client.state.raw_data["mapping"] = []
  1198. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1199. assert mqtt_client.state.tray_now == 1
  1200. def test_single_ams_passthrough(self, mqtt_client):
  1201. """Single AMS (ams_exist_bits='1') → tray_now 0-3 is direct global ID."""
  1202. mqtt_client._process_message(_ams_payload(2, ams_exist_bits="1"))
  1203. assert mqtt_client.state.tray_now == 2
  1204. def test_no_ams_exist_bits_passthrough(self, mqtt_client):
  1205. """No ams_exist_bits in payload → fall back to raw tray_now."""
  1206. mqtt_client._process_message(_ams_payload(1))
  1207. assert mqtt_client.state.tray_now == 1
  1208. def test_tray_now_255_unaffected_by_multi_ams(self, mqtt_client):
  1209. """tray_now=255 (unloaded) passes through regardless of AMS count."""
  1210. mqtt_client.state.raw_data["mapping"] = [257]
  1211. mqtt_client._process_message(_ams_payload(255, ams_exist_bits="3"))
  1212. assert mqtt_client.state.tray_now == 255
  1213. def test_tray_now_above_3_unaffected(self, mqtt_client):
  1214. """tray_now > 3 is already a global ID and passes through directly."""
  1215. mqtt_client._process_message(_ams_payload(6, ams_exist_bits="3"))
  1216. assert mqtt_client.state.tray_now == 6
  1217. def test_last_loaded_tray_uses_resolved_global_id(self, mqtt_client):
  1218. """last_loaded_tray should reflect the resolved global ID, not local slot."""
  1219. mqtt_client.state.raw_data["mapping"] = [257]
  1220. mqtt_client.state.state = "RUNNING"
  1221. mqtt_client._process_message(_ams_payload(1, ams_exist_bits="3"))
  1222. assert mqtt_client.state.tray_now == 5
  1223. assert mqtt_client.state.last_loaded_tray == 5
  1224. class TestResolveLocalSlotFromMapping:
  1225. """Unit tests for _resolve_local_slot_from_mapping static method."""
  1226. def test_single_match_ams0(self):
  1227. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1228. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [1]) == 1
  1229. def test_single_match_ams1(self):
  1230. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1231. # 257 = 1*256 + 1 → AMS1 slot1 → global 5
  1232. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [257]) == 5
  1233. def test_single_match_ams2(self):
  1234. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1235. # 514 = 2*256 + 2 → AMS2 slot2 → global 10
  1236. assert BambuMQTTClient._resolve_local_slot_from_mapping(2, [514]) == 10
  1237. def test_unmapped_entries_skipped(self):
  1238. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1239. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [65535, 65535, 65535, 257]) == 5
  1240. def test_no_match_returns_none(self):
  1241. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1242. # mapping has slot 0 only, looking for slot 2
  1243. assert BambuMQTTClient._resolve_local_slot_from_mapping(2, [0]) is None
  1244. def test_ambiguous_returns_none(self):
  1245. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1246. # Both AMS0 slot1 (1) and AMS1 slot1 (257) → ambiguous
  1247. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, [1, 257]) is None
  1248. def test_none_mapping_returns_none(self):
  1249. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1250. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, None) is None
  1251. def test_empty_mapping_returns_none(self):
  1252. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1253. assert BambuMQTTClient._resolve_local_slot_from_mapping(1, []) is None
  1254. def test_ams_ht_slot0_match(self):
  1255. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1256. # AMS-HT id=128: snow = 128*256 + 0 = 32768
  1257. assert BambuMQTTClient._resolve_local_slot_from_mapping(0, [32768]) == 128
  1258. # ---------------------------------------------------------------------------
  1259. # 3. H2D Pro — initial state detection
  1260. # ---------------------------------------------------------------------------
  1261. class TestTrayNowDualNozzleH2DSetup:
  1262. """H2D Pro initial state detection."""
  1263. @pytest.fixture
  1264. def mqtt_client(self):
  1265. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1266. return BambuMQTTClient(
  1267. ip_address="192.168.1.100",
  1268. serial_number="TEST_H2D",
  1269. access_code="12345678",
  1270. )
  1271. def test_dual_nozzle_detected_from_extruder_info(self, mqtt_client):
  1272. """2 entries in device.extruder.info → _is_dual_nozzle=True."""
  1273. mqtt_client._process_message(
  1274. _extruder_info_payload(
  1275. [
  1276. {"id": 0, "snow": 0xFF00FF},
  1277. {"id": 1, "snow": 0xFF00FF},
  1278. ]
  1279. )
  1280. )
  1281. assert mqtt_client._is_dual_nozzle is True
  1282. def test_ams_extruder_map_parsed_from_info_field(self, mqtt_client):
  1283. """AMS 0 info=2003 → right (ext 0), AMS 128 info=2104 → left (ext 1)."""
  1284. ams_units = [
  1285. {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
  1286. {"id": 128, "info": 2104, "tray": [{"id": 0}]},
  1287. ]
  1288. payload = {
  1289. "print": {
  1290. "ams": {
  1291. "ams": ams_units,
  1292. "tray_now": "255",
  1293. "tray_exist_bits": "1000f",
  1294. },
  1295. }
  1296. }
  1297. mqtt_client._process_message(payload)
  1298. # info=2003: bit8 = (2003>>8)&1 = 7&1 = 1 → extruder = 1-1 = 0 (right)
  1299. # info=2104: bit8 = (2104>>8)&1 = 8&1 = 0 → extruder = 1-0 = 1 (left)
  1300. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1301. def test_dual_nozzle_detection_before_ams_in_same_message(self, mqtt_client):
  1302. """Dual-nozzle detection at line 538 happens before _handle_ams_data() at line 549.
  1303. If both arrive in the same message, tray_now disambiguation already uses dual-nozzle logic.
  1304. """
  1305. payload = {
  1306. "print": {
  1307. "device": {
  1308. "extruder": {
  1309. "info": [
  1310. {"id": 0, "snow": 0xFF00FF},
  1311. {"id": 1, "snow": 0xFF00FF},
  1312. ],
  1313. "state": 0x0001,
  1314. }
  1315. },
  1316. "ams": {
  1317. "ams": [
  1318. {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
  1319. ],
  1320. "tray_now": "2",
  1321. "tray_exist_bits": "f",
  1322. },
  1323. }
  1324. }
  1325. mqtt_client._process_message(payload)
  1326. # Dual-nozzle was detected; AMS 0 on right extruder (active by default);
  1327. # snow is 0xFF00FF (unloaded), so falls through to ams_extruder_map fallback.
  1328. # Single AMS on extruder 0 → global_id = 0*4+2 = 2
  1329. assert mqtt_client._is_dual_nozzle is True
  1330. assert mqtt_client.state.tray_now == 2
  1331. # ---------------------------------------------------------------------------
  1332. # Shared H2D fixture for classes 4-8
  1333. # ---------------------------------------------------------------------------
  1334. class _H2DFixtureMixin:
  1335. """Mixin providing a pre-configured H2D Pro client."""
  1336. @pytest.fixture
  1337. def mqtt_client(self):
  1338. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1339. return BambuMQTTClient(
  1340. ip_address="192.168.1.100",
  1341. serial_number="TEST_H2D",
  1342. access_code="12345678",
  1343. )
  1344. @pytest.fixture
  1345. def h2d_client(self, mqtt_client):
  1346. """Pre-configure as H2D Pro: dual-nozzle + ams_extruder_map."""
  1347. mqtt_client._process_message(
  1348. {
  1349. "print": {
  1350. "device": {
  1351. "extruder": {
  1352. "info": [
  1353. {"id": 0, "snow": 0xFF00FF},
  1354. {"id": 1, "snow": 0xFF00FF},
  1355. ],
  1356. "state": 0x0001, # right extruder active
  1357. }
  1358. },
  1359. "ams": {
  1360. "ams": [
  1361. {"id": 0, "info": 2003, "tray": [{"id": i} for i in range(4)]},
  1362. {"id": 128, "info": 2104, "tray": [{"id": 0}]},
  1363. ],
  1364. "tray_now": "255",
  1365. "tray_exist_bits": "1000f",
  1366. },
  1367. }
  1368. }
  1369. )
  1370. assert mqtt_client._is_dual_nozzle is True
  1371. assert mqtt_client.state.ams_extruder_map == {"0": 0, "128": 1}
  1372. return mqtt_client
  1373. # ---------------------------------------------------------------------------
  1374. # 4. H2D Snow field disambiguation
  1375. # ---------------------------------------------------------------------------
  1376. class TestTrayNowDualNozzleH2DSnow(_H2DFixtureMixin):
  1377. """Snow field disambiguation (primary path)."""
  1378. def test_snow_disambiguates_ams0_slot(self, h2d_client):
  1379. """snow ext[0]=AMS 0 slot 2, tray_now='2' → global 2."""
  1380. # Send snow update FIRST (snow is parsed AFTER tray_now in the same message,
  1381. # so we need it in a prior message).
  1382. snow_val = 0 << 8 | 2 # AMS 0 slot 2 = raw 2
  1383. h2d_client._process_message(
  1384. _extruder_info_payload(
  1385. [
  1386. {"id": 0, "snow": snow_val},
  1387. {"id": 1, "snow": 0xFF00FF},
  1388. ]
  1389. )
  1390. )
  1391. assert h2d_client.state.h2d_extruder_snow.get(0) == 2
  1392. # Now send tray_now=2
  1393. h2d_client._process_message(_ams_payload(2))
  1394. assert h2d_client.state.tray_now == 2
  1395. def test_snow_disambiguates_ams_ht_to_128(self, h2d_client):
  1396. """snow ext[1]=AMS HT (128), left active, tray_now='0' → global 128."""
  1397. # Snow: extruder 1 → AMS 128 slot 0
  1398. snow_val = 128 << 8 | 0 # = 32768
  1399. h2d_client._process_message(
  1400. _extruder_info_payload(
  1401. [
  1402. {"id": 0, "snow": 0xFF00FF},
  1403. {"id": 1, "snow": snow_val},
  1404. ]
  1405. )
  1406. )
  1407. assert h2d_client.state.h2d_extruder_snow.get(1) == 128
  1408. # Switch to left extruder
  1409. h2d_client._process_message(_extruder_state_payload(0x0100))
  1410. assert h2d_client.state.active_extruder == 1
  1411. # tray_now="0" with left extruder active, snow says AMS HT (128)
  1412. # AMS HT snow_slot = 0 (single slot), parsed_tray_now = 0 → match
  1413. h2d_client._process_message(_ams_payload(0))
  1414. assert h2d_client.state.tray_now == 128
  1415. def test_snow_updates_h2d_extruder_snow_state(self, h2d_client):
  1416. """Verify state.h2d_extruder_snow dict is populated correctly."""
  1417. snow_ext0 = 1 << 8 | 3 # AMS 1 slot 3 → global 7
  1418. snow_ext1 = 0 << 8 | 0 # AMS 0 slot 0 → global 0
  1419. h2d_client._process_message(
  1420. _extruder_info_payload(
  1421. [
  1422. {"id": 0, "snow": snow_ext0},
  1423. {"id": 1, "snow": snow_ext1},
  1424. ]
  1425. )
  1426. )
  1427. assert h2d_client.state.h2d_extruder_snow[0] == 7
  1428. assert h2d_client.state.h2d_extruder_snow[1] == 0
  1429. def test_snow_unloaded_value(self, h2d_client):
  1430. """snow=0xFFFF (ams_id=255, slot=255) → 255 (unloaded)."""
  1431. h2d_client._process_message(
  1432. _extruder_info_payload(
  1433. [
  1434. {"id": 0, "snow": 0xFFFF},
  1435. {"id": 1, "snow": 0xFFFF},
  1436. ]
  1437. )
  1438. )
  1439. assert h2d_client.state.h2d_extruder_snow[0] == 255
  1440. assert h2d_client.state.h2d_extruder_snow[1] == 255
  1441. def test_snow_initial_sentinel_not_stored(self, h2d_client):
  1442. """snow=0xFF00FF (firmware initial sentinel) is not parsed into h2d_extruder_snow."""
  1443. # 0xFF00FF has ams_id=0xFF00=65280 which doesn't match any branch
  1444. h2d_client._process_message(
  1445. _extruder_info_payload(
  1446. [
  1447. {"id": 0, "snow": 0xFF00FF},
  1448. {"id": 1, "snow": 0xFF00FF},
  1449. ]
  1450. )
  1451. )
  1452. # Snow dict should remain empty (no matching branch)
  1453. assert h2d_client.state.h2d_extruder_snow == {}
  1454. # ---------------------------------------------------------------------------
  1455. # 5. H2D Pending target disambiguation
  1456. # ---------------------------------------------------------------------------
  1457. class TestTrayNowDualNozzleH2DPendingTarget(_H2DFixtureMixin):
  1458. """Pending target disambiguation (when Bambuddy initiates load)."""
  1459. def test_pending_target_matches_slot(self, h2d_client):
  1460. """pending=5, tray_now='1' (5%4=1 matches) → tray_now=5."""
  1461. h2d_client.state.pending_tray_target = 5
  1462. h2d_client._process_message(_ams_payload(1))
  1463. assert h2d_client.state.tray_now == 5
  1464. assert h2d_client.state.pending_tray_target is None # cleared
  1465. def test_pending_target_slot_mismatch(self, h2d_client):
  1466. """pending=5, tray_now='2' → uses raw slot, clears pending."""
  1467. h2d_client.state.pending_tray_target = 5
  1468. h2d_client._process_message(_ams_payload(2))
  1469. # Slot 2 != 5%4=1 → mismatch, uses raw slot 2
  1470. assert h2d_client.state.tray_now == 2
  1471. assert h2d_client.state.pending_tray_target is None
  1472. def test_pending_target_takes_priority_over_snow(self, h2d_client):
  1473. """When both pending and snow are set, pending wins."""
  1474. # Set up snow for extruder 0 → AMS 0 slot 1 → global 1
  1475. snow_val = 0 << 8 | 1
  1476. h2d_client._process_message(
  1477. _extruder_info_payload(
  1478. [
  1479. {"id": 0, "snow": snow_val},
  1480. {"id": 1, "snow": 0xFF00FF},
  1481. ]
  1482. )
  1483. )
  1484. assert h2d_client.state.h2d_extruder_snow.get(0) == 1
  1485. # Set pending target to AMS 1 slot 1 (global 5)
  1486. h2d_client.state.pending_tray_target = 5
  1487. # tray_now="1" — matches pending (5%4=1), pending should win over snow
  1488. h2d_client._process_message(_ams_payload(1))
  1489. assert h2d_client.state.tray_now == 5
  1490. # ---------------------------------------------------------------------------
  1491. # 6. H2D ams_extruder_map fallback
  1492. # ---------------------------------------------------------------------------
  1493. class TestTrayNowDualNozzleH2DFallback(_H2DFixtureMixin):
  1494. """ams_extruder_map fallback (no pending, no snow)."""
  1495. def test_single_ams_on_extruder_computes_global_id(self, h2d_client):
  1496. """AMS 0 on right extruder, tray_now='2' → 0*4+2=2."""
  1497. # h2d_client has snow=0xFF00FF (unloaded) by default, so snow path skips
  1498. h2d_client._process_message(_ams_payload(2))
  1499. # AMS 0 is the only AMS on extruder 0 (right, active by default)
  1500. # Fallback: single AMS → global = 0*4+2 = 2
  1501. assert h2d_client.state.tray_now == 2
  1502. def test_multiple_ams_keeps_current_if_valid(self, h2d_client):
  1503. """Current tray matches slot → keeps it (multi-AMS on same extruder)."""
  1504. # Set up: two AMS units on the same extruder (right, ext 0)
  1505. h2d_client.state.ams_extruder_map = {"0": 0, "1": 0}
  1506. # Pre-set tray_now=5 (AMS 1 slot 1) — current_ams=1 which is in ams_on_extruder
  1507. h2d_client.state.tray_now = 5
  1508. # tray_now="1" → 5%4=1 matches → keep current=5
  1509. h2d_client._process_message(_ams_payload(1))
  1510. assert h2d_client.state.tray_now == 5
  1511. def test_no_ams_on_extruder_uses_raw_slot(self, h2d_client):
  1512. """No AMS mapped to the active extruder → raw slot as global ID."""
  1513. # All AMS on left extruder, but right is active
  1514. h2d_client.state.ams_extruder_map = {"0": 1, "128": 1}
  1515. h2d_client._process_message(_ams_payload(2))
  1516. assert h2d_client.state.tray_now == 2
  1517. def test_single_ams_ht_on_extruder_returns_unit_id(self, h2d_client):
  1518. """AMS-HT 128 alone on left extruder, slot 0 → global ID 128 (not 512)."""
  1519. # Switch to left extruder (where AMS-HT 128 is mapped)
  1520. h2d_client._process_message(_extruder_state_payload(0x0100))
  1521. # Only AMS-HT 128 on left extruder; no snow available
  1522. h2d_client._process_message(_ams_payload(0))
  1523. assert h2d_client.state.tray_now == 128
  1524. def test_single_ams_ht_ignores_nonzero_slot(self, h2d_client):
  1525. """AMS-HT has single slot; even if printer reports slot 1, global ID = unit ID."""
  1526. h2d_client.state.ams_extruder_map = {"129": 0}
  1527. h2d_client._process_message(_ams_payload(1))
  1528. # AMS-HT 129: global ID = 129, not 129*4+1=517
  1529. assert h2d_client.state.tray_now == 129
  1530. def test_multiple_ams_keeps_current_ams_ht(self, h2d_client):
  1531. """Current tray is AMS-HT 128, slot 0 reported → keeps 128."""
  1532. h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
  1533. h2d_client.state.tray_now = 128
  1534. h2d_client._process_message(_ams_payload(0))
  1535. assert h2d_client.state.tray_now == 128
  1536. def test_multiple_ams_slot_nonzero_excludes_ams_ht(self, h2d_client):
  1537. """Slot > 0 eliminates AMS-HT candidates; single regular AMS left → resolves."""
  1538. # AMS 0 + AMS-HT 128 both on right extruder
  1539. h2d_client.state.ams_extruder_map = {"0": 0, "128": 0}
  1540. h2d_client.state.tray_now = 255 # no current match
  1541. # Slot 2 → can't be AMS-HT → only AMS 0 → global = 0*4+2 = 2
  1542. h2d_client._process_message(_ams_payload(2))
  1543. assert h2d_client.state.tray_now == 2
  1544. def test_multiple_ams_slot_nonzero_narrows_to_single_ht_excluded(self, h2d_client):
  1545. """Two regular AMS + one AMS-HT, slot > 0 → AMS-HT excluded but still ambiguous."""
  1546. h2d_client.state.ams_extruder_map = {"0": 0, "1": 0, "128": 0}
  1547. h2d_client.state.tray_now = 255
  1548. # Slot 3 → excludes AMS-HT, but AMS 0 and AMS 1 both remain → ambiguous
  1549. h2d_client._process_message(_ams_payload(3))
  1550. assert h2d_client.state.tray_now == 3 # raw slot fallback
  1551. # ---------------------------------------------------------------------------
  1552. # 6b. H2D last_loaded_tray validation
  1553. # ---------------------------------------------------------------------------
  1554. class TestLastLoadedTrayValidation(_H2DFixtureMixin):
  1555. """last_loaded_tray only stores physically valid tray IDs."""
  1556. def test_regular_ams_tray_stored(self, h2d_client):
  1557. """Valid regular AMS tray (0-15) → stored in last_loaded_tray."""
  1558. h2d_client.state.tray_now = 7
  1559. # Trigger tray_now processing via AMS message
  1560. h2d_client._process_message(
  1561. _extruder_info_payload(
  1562. [
  1563. {"id": 0, "snow": 1 << 8 | 3}, # AMS 1 slot 3 → global 7
  1564. {"id": 1, "snow": 0xFF00FF},
  1565. ]
  1566. )
  1567. )
  1568. h2d_client._process_message(_ams_payload(3))
  1569. assert h2d_client.state.tray_now == 7
  1570. assert h2d_client.state.last_loaded_tray == 7
  1571. def test_ams_ht_tray_stored(self, h2d_client):
  1572. """Valid AMS-HT tray (128-135) → stored in last_loaded_tray."""
  1573. h2d_client._process_message(_extruder_state_payload(0x0100))
  1574. h2d_client._process_message(
  1575. _extruder_info_payload(
  1576. [
  1577. {"id": 0, "snow": 0xFF00FF},
  1578. {"id": 1, "snow": 128 << 8 | 0},
  1579. ]
  1580. )
  1581. )
  1582. h2d_client._process_message(_ams_payload(0))
  1583. assert h2d_client.state.tray_now == 128
  1584. assert h2d_client.state.last_loaded_tray == 128
  1585. def test_unloaded_not_stored(self, h2d_client):
  1586. """tray_now=255 (unloaded) → last_loaded_tray unchanged."""
  1587. h2d_client.state.last_loaded_tray = 5
  1588. h2d_client._process_message(_ams_payload(255))
  1589. assert h2d_client.state.tray_now == 255
  1590. assert h2d_client.state.last_loaded_tray == 5
  1591. # ---------------------------------------------------------------------------
  1592. # 7. H2D Active extruder switching
  1593. # ---------------------------------------------------------------------------
  1594. class TestTrayNowDualNozzleH2DActiveExtruder(_H2DFixtureMixin):
  1595. """Active extruder switching via device.extruder.state bit 8."""
  1596. def test_active_extruder_right_by_default(self, h2d_client):
  1597. """Initial state.active_extruder == 0 (right)."""
  1598. assert h2d_client.state.active_extruder == 0
  1599. def test_extruder_state_bit8_switches_to_left(self, h2d_client):
  1600. """state=0x100 → active_extruder=1 (left)."""
  1601. h2d_client._process_message(_extruder_state_payload(0x0100))
  1602. assert h2d_client.state.active_extruder == 1
  1603. def test_extruder_state_bit8_switches_back_to_right(self, h2d_client):
  1604. """Cycle 0 → 1 → 0."""
  1605. h2d_client._process_message(_extruder_state_payload(0x0100))
  1606. assert h2d_client.state.active_extruder == 1
  1607. h2d_client._process_message(_extruder_state_payload(0x0001))
  1608. assert h2d_client.state.active_extruder == 0
  1609. def test_extruder_switch_changes_tray_disambiguation(self, h2d_client):
  1610. """Snow on both extruders; switching active changes which snow is used."""
  1611. # Snow: ext 0 → AMS 0 slot 1 (global 1), ext 1 → AMS 128 slot 0 (global 128)
  1612. h2d_client._process_message(
  1613. _extruder_info_payload(
  1614. [
  1615. {"id": 0, "snow": 0 << 8 | 1}, # AMS 0 slot 1 → global 1
  1616. {"id": 1, "snow": 128 << 8 | 0}, # AMS HT → global 128
  1617. ]
  1618. )
  1619. )
  1620. # Right active (default) — tray_now="1" → snow ext[0] says global 1
  1621. h2d_client._process_message(_ams_payload(1))
  1622. assert h2d_client.state.tray_now == 1
  1623. # Switch to left
  1624. h2d_client._process_message(_extruder_state_payload(0x0100))
  1625. # Left active — tray_now="0" → snow ext[1] says AMS HT (128), slot 0 matches
  1626. h2d_client._process_message(_ams_payload(0))
  1627. assert h2d_client.state.tray_now == 128
  1628. # ---------------------------------------------------------------------------
  1629. # 8. H2D Full multi-message sequences
  1630. # ---------------------------------------------------------------------------
  1631. class TestTrayNowDualNozzleH2DFullSequence(_H2DFixtureMixin):
  1632. """Multi-message sequences simulating real H2D Pro prints."""
  1633. def test_h2d_right_nozzle_ams0_lifecycle(self, h2d_client):
  1634. """Setup → load AMS 0 slot 1 → verify tray_now=1."""
  1635. # Snow update: extruder 0 loading AMS 0 slot 1
  1636. h2d_client._process_message(
  1637. _extruder_info_payload(
  1638. [
  1639. {"id": 0, "snow": 0 << 8 | 1},
  1640. {"id": 1, "snow": 0xFF00FF},
  1641. ]
  1642. )
  1643. )
  1644. # Printer reports tray_now="1"
  1645. h2d_client._process_message(_ams_payload(1))
  1646. assert h2d_client.state.tray_now == 1
  1647. assert h2d_client.state.last_loaded_tray == 1
  1648. def test_h2d_left_nozzle_ams_ht_lifecycle(self, h2d_client):
  1649. """Setup → switch left → load AMS HT → verify tray_now=128."""
  1650. # Switch to left extruder
  1651. h2d_client._process_message(_extruder_state_payload(0x0100))
  1652. # Snow: ext 1 → AMS HT slot 0
  1653. h2d_client._process_message(
  1654. _extruder_info_payload(
  1655. [
  1656. {"id": 0, "snow": 0xFF00FF},
  1657. {"id": 1, "snow": 128 << 8 | 0},
  1658. ]
  1659. )
  1660. )
  1661. # Printer reports tray_now="0" (AMS HT single slot)
  1662. h2d_client._process_message(_ams_payload(0))
  1663. assert h2d_client.state.tray_now == 128
  1664. assert h2d_client.state.last_loaded_tray == 128
  1665. def test_h2d_multi_color_alternating_nozzles(self, h2d_client):
  1666. """Multi-color print alternating between right and left nozzles.
  1667. Sequence:
  1668. 1. Right loads AMS 0 slot 0 (tray=0)
  1669. 2. Switch left, load AMS HT (tray=128)
  1670. 3. Switch right, snow updates, load AMS 0 slot 2 (tray=2)
  1671. 4. Unload (255)
  1672. """
  1673. # Step 1: Right extruder loads AMS 0 slot 0
  1674. h2d_client._process_message(
  1675. _extruder_info_payload(
  1676. [
  1677. {"id": 0, "snow": 0 << 8 | 0},
  1678. {"id": 1, "snow": 0xFF00FF},
  1679. ]
  1680. )
  1681. )
  1682. h2d_client._process_message(_ams_payload(0))
  1683. assert h2d_client.state.tray_now == 0
  1684. # Step 2: Switch to left, load AMS HT
  1685. h2d_client._process_message(_extruder_state_payload(0x0100))
  1686. h2d_client._process_message(
  1687. _extruder_info_payload(
  1688. [
  1689. {"id": 0, "snow": 0 << 8 | 0},
  1690. {"id": 1, "snow": 128 << 8 | 0},
  1691. ]
  1692. )
  1693. )
  1694. h2d_client._process_message(_ams_payload(0))
  1695. assert h2d_client.state.tray_now == 128
  1696. # Step 3: Switch back to right, load AMS 0 slot 2
  1697. h2d_client._process_message(_extruder_state_payload(0x0001))
  1698. h2d_client._process_message(
  1699. _extruder_info_payload(
  1700. [
  1701. {"id": 0, "snow": 0 << 8 | 2},
  1702. {"id": 1, "snow": 128 << 8 | 0},
  1703. ]
  1704. )
  1705. )
  1706. h2d_client._process_message(_ams_payload(2))
  1707. assert h2d_client.state.tray_now == 2
  1708. # Step 4: Unload
  1709. h2d_client._process_message(_ams_payload(255))
  1710. assert h2d_client.state.tray_now == 255
  1711. assert h2d_client.state.last_loaded_tray == 2
  1712. class TestTrayChangeLog:
  1713. """Tests for tray_change_log tracking during prints (mid-print tray switch)."""
  1714. @pytest.fixture
  1715. def mqtt_client(self):
  1716. """Create a BambuMQTTClient instance for testing."""
  1717. from backend.app.services.bambu_mqtt import BambuMQTTClient
  1718. client = BambuMQTTClient(
  1719. ip_address="192.168.1.100",
  1720. serial_number="TRAYLOG1",
  1721. access_code="12345678",
  1722. )
  1723. return client
  1724. def test_tray_change_log_defaults_empty(self, mqtt_client):
  1725. """tray_change_log starts as an empty list."""
  1726. assert mqtt_client.state.tray_change_log == []
  1727. def test_tray_change_log_seeded_on_print_start(self, mqtt_client):
  1728. """Print start clears log and seeds with initial tray at layer 0."""
  1729. mqtt_client.state.tray_now = 2
  1730. mqtt_client.state.last_loaded_tray = 2
  1731. mqtt_client._previous_gcode_state = "IDLE"
  1732. # Transition to RUNNING via _process_message
  1733. mqtt_client._process_message(
  1734. {
  1735. "print": {
  1736. "gcode_state": "RUNNING",
  1737. "gcode_file": "test.3mf",
  1738. }
  1739. }
  1740. )
  1741. assert mqtt_client.state.tray_change_log == [(2, 0)]
  1742. def test_tray_change_log_cleared_on_new_print(self, mqtt_client):
  1743. """Old log entries are cleared when a new print starts."""
  1744. mqtt_client.state.tray_change_log = [(5, 0), (3, 100)]
  1745. mqtt_client.state.tray_now = 1
  1746. mqtt_client.state.last_loaded_tray = 1
  1747. mqtt_client._previous_gcode_state = "IDLE"
  1748. mqtt_client._process_message(
  1749. {
  1750. "print": {
  1751. "gcode_state": "RUNNING",
  1752. "gcode_file": "new.3mf",
  1753. }
  1754. }
  1755. )
  1756. assert mqtt_client.state.tray_change_log == [(1, 0)]
  1757. def test_tray_change_recorded_during_running(self, mqtt_client):
  1758. """Tray change while RUNNING is appended to the log."""
  1759. mqtt_client.state.state = "RUNNING"
  1760. mqtt_client.state.layer_num = 50
  1761. mqtt_client.state.last_loaded_tray = 0
  1762. mqtt_client.state.tray_change_log = [(0, 0)]
  1763. # Simulate tray_now update via AMS data
  1764. mqtt_client.state.tray_now = 1
  1765. # Trigger the tracking code path
  1766. tn = mqtt_client.state.tray_now
  1767. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  1768. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  1769. mqtt_client.state.last_loaded_tray = tn
  1770. assert mqtt_client.state.tray_change_log == [(0, 0), (1, 50)]
  1771. def test_tray_change_not_recorded_when_idle(self, mqtt_client):
  1772. """Tray changes while IDLE are NOT logged."""
  1773. mqtt_client.state.state = "IDLE"
  1774. mqtt_client.state.layer_num = 0
  1775. mqtt_client.state.last_loaded_tray = 0
  1776. mqtt_client.state.tray_change_log = []
  1777. mqtt_client.state.tray_now = 3
  1778. tn = mqtt_client.state.tray_now
  1779. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  1780. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  1781. mqtt_client.state.last_loaded_tray = tn
  1782. assert mqtt_client.state.tray_change_log == []
  1783. def test_tray_change_recorded_during_pause(self, mqtt_client):
  1784. """Tray change while PAUSE is also logged (AMS can swap during pause)."""
  1785. mqtt_client.state.state = "PAUSE"
  1786. mqtt_client.state.layer_num = 75
  1787. mqtt_client.state.last_loaded_tray = 2
  1788. mqtt_client.state.tray_change_log = [(2, 0)]
  1789. mqtt_client.state.tray_now = 5
  1790. tn = mqtt_client.state.tray_now
  1791. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  1792. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  1793. mqtt_client.state.last_loaded_tray = tn
  1794. assert mqtt_client.state.tray_change_log == [(2, 0), (5, 75)]
  1795. def test_same_tray_not_logged_twice(self, mqtt_client):
  1796. """Same tray value doesn't create duplicate log entries."""
  1797. mqtt_client.state.state = "RUNNING"
  1798. mqtt_client.state.layer_num = 30
  1799. mqtt_client.state.last_loaded_tray = 2
  1800. mqtt_client.state.tray_change_log = [(2, 0)]
  1801. # Same tray again
  1802. mqtt_client.state.tray_now = 2
  1803. tn = mqtt_client.state.tray_now
  1804. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  1805. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  1806. mqtt_client.state.last_loaded_tray = tn
  1807. assert mqtt_client.state.tray_change_log == [(2, 0)]
  1808. def test_multiple_tray_changes(self, mqtt_client):
  1809. """Multiple tray changes create a full history."""
  1810. mqtt_client.state.state = "RUNNING"
  1811. mqtt_client.state.last_loaded_tray = 0
  1812. mqtt_client.state.tray_change_log = [(0, 0)]
  1813. changes = [(1, 50), (3, 120), (0, 200)]
  1814. for tray, layer in changes:
  1815. mqtt_client.state.tray_now = tray
  1816. mqtt_client.state.layer_num = layer
  1817. tn = mqtt_client.state.tray_now
  1818. if tn != mqtt_client.state.last_loaded_tray and mqtt_client.state.state in ("RUNNING", "PAUSE"):
  1819. mqtt_client.state.tray_change_log.append((tn, mqtt_client.state.layer_num))
  1820. mqtt_client.state.last_loaded_tray = tn
  1821. assert mqtt_client.state.tray_change_log == [(0, 0), (1, 50), (3, 120), (0, 200)]