| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640 |
- """Unit tests for usage_tracker.py — 3MF-primary filament tracking.
- Tests the unified tracking logic: 3MF slicer estimates as primary path,
- AMS remain% delta as fallback, per-layer gcode for partial prints,
- slot-to-tray mapping resolution, and notification variable formatting.
- """
- from datetime import datetime, timezone
- from types import SimpleNamespace
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- from backend.app.services.usage_tracker import (
- PrintSession,
- _active_sessions,
- _decode_mqtt_mapping,
- _match_slots_by_color,
- _track_from_3mf,
- on_print_complete,
- on_print_start,
- )
- def _make_spool(spool_id=1, label_weight=1000, weight_used=0, tag_uid=None, tray_uuid=None):
- """Create a mock Spool object."""
- spool = MagicMock()
- spool.id = spool_id
- spool.label_weight = label_weight
- spool.weight_used = weight_used
- spool.tag_uid = tag_uid
- spool.tray_uuid = tray_uuid
- spool.last_used = None
- spool.cost_per_kg = None
- spool.material = "PLA"
- return spool
- def _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0):
- """Create a mock SpoolAssignment object."""
- assignment = MagicMock()
- assignment.spool_id = spool_id
- assignment.printer_id = printer_id
- assignment.ams_id = ams_id
- assignment.tray_id = tray_id
- return assignment
- def _make_archive(archive_id=1, file_path="archives/1/test.3mf", extra_data=None):
- """Create a mock PrintArchive object."""
- archive = MagicMock()
- archive.id = archive_id
- archive.file_path = file_path
- archive.extra_data = extra_data
- return archive
- def _make_queue_item(ams_mapping=None, status="printing"):
- """Create a mock PrintQueueItem object."""
- item = MagicMock()
- item.ams_mapping = ams_mapping
- item.status = status
- return item
- def _mock_db_execute(*return_values):
- """Create a mock db with execute() that returns values in sequence."""
- db = AsyncMock()
- results = []
- for val in return_values:
- result = MagicMock()
- result.scalar_one_or_none.return_value = val
- results.append(result)
- db.execute = AsyncMock(side_effect=results)
- return db
- def _mock_db_sequential(responses):
- """Create mock db that returns responses in order."""
- db = AsyncMock()
- call_count = [0]
- async def mock_execute(*args, **kwargs):
- idx = call_count[0]
- call_count[0] += 1
- result = MagicMock()
- if idx < len(responses):
- result.scalar_one_or_none.return_value = responses[idx]
- else:
- result.scalar_one_or_none.return_value = None
- # For cost aggregation queries that use .scalar() instead of .scalar_one_or_none()
- result.scalar.return_value = None
- return result
- db.execute = mock_execute
- return db
- class TestOnPrintStart:
- """Tests for on_print_start()."""
- @pytest.fixture(autouse=True)
- def _clear_sessions(self):
- _active_sessions.clear()
- yield
- _active_sessions.clear()
- @pytest.mark.asyncio
- async def test_captures_remain_data(self):
- """Captures AMS remain% at print start."""
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}, {"id": 1, "remain": 50}]}]},
- tray_now=5,
- )
- await on_print_start(1, {"subtask_name": "Benchy"}, printer_manager)
- assert 1 in _active_sessions
- session = _active_sessions[1]
- assert session.print_name == "Benchy"
- assert session.tray_remain_start == {(0, 0): 80, (0, 1): 50}
- @pytest.mark.asyncio
- async def test_captures_tray_now_at_start(self):
- """Captures tray_now at print start for later use in usage tracking."""
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
- tray_now=9,
- )
- await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
- assert _active_sessions[1].tray_now_at_start == 9
- @pytest.mark.asyncio
- async def test_tray_now_at_start_255_when_unloaded(self):
- """Captures tray_now=255 when printer has no filament loaded at start."""
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 80}]}]},
- tray_now=255,
- )
- await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
- assert _active_sessions[1].tray_now_at_start == 255
- @pytest.mark.asyncio
- async def test_creates_session_without_remain(self):
- """Creates session even without valid remain data (for 3MF tracking)."""
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": -1}]}]},
- tray_now=255,
- )
- await on_print_start(1, {"subtask_name": "Test"}, printer_manager)
- assert 1 in _active_sessions
- assert _active_sessions[1].tray_remain_start == {}
- class TestOnPrintComplete:
- """Tests for on_print_complete() — path ordering and interaction."""
- @pytest.fixture(autouse=True)
- def _clear_sessions(self):
- _active_sessions.clear()
- yield
- _active_sessions.clear()
- @pytest.fixture(autouse=True)
- def _mock_get_setting(self):
- with patch(
- "backend.app.api.routes.settings.get_setting",
- new_callable=AsyncMock,
- return_value=None,
- ):
- yield
- @pytest.mark.asyncio
- async def test_bl_spool_uses_3mf(self):
- """BL spool (with tag_uid) is tracked via 3MF, not just AMS delta."""
- spool = _make_spool(spool_id=1, tag_uid="AABB1122", label_weight=1000)
- assignment = _make_assignment(spool_id=1, printer_id=1, ams_id=0, tray_id=0)
- archive = _make_archive(archive_id=10)
- # Setup: session with AMS remain data
- _active_sessions[1] = PrintSession(
- printer_id=1,
- print_name="Benchy",
- started_at=datetime.now(timezone.utc),
- tray_remain_start={(0, 0): 80},
- )
- # Mock printer state: tray_now=0 (AMS0-T0), single filament
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
- progress=100,
- layer_num=50,
- tray_now=0,
- )
- # db returns: archive, queue_item(None), assignment, spool
- db = _mock_db_sequential([archive, None, assignment, spool])
- filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"}]
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await on_print_complete(
- printer_id=1,
- data={"status": "completed"},
- printer_manager=printer_manager,
- db=db,
- archive_id=10,
- )
- # 3MF path should handle it (BL guard removed)
- assert len(results) >= 1
- assert results[0]["spool_id"] == 1
- assert results[0]["weight_used"] == 15.0
- @pytest.mark.asyncio
- async def test_ams_delta_fallback_no_archive(self):
- """AMS delta tracks consumption when archive_id is None."""
- spool = _make_spool(spool_id=2, label_weight=1000)
- assignment = _make_assignment(spool_id=2)
- _active_sessions[1] = PrintSession(
- printer_id=1,
- print_name="Test",
- started_at=datetime.now(timezone.utc),
- tray_remain_start={(0, 0): 80},
- )
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
- tray_now=0,
- last_loaded_tray=-1,
- )
- # db returns assignment then spool
- db = _mock_db_sequential([assignment, spool])
- results = await on_print_complete(
- printer_id=1,
- data={"status": "completed"},
- printer_manager=printer_manager,
- db=db,
- archive_id=None,
- )
- assert len(results) == 1
- assert results[0]["spool_id"] == 2
- # 10% of 1000g = 100g
- assert results[0]["weight_used"] == 100.0
- assert results[0]["percent_used"] == 10
- @pytest.mark.asyncio
- async def test_no_double_tracking(self):
- """When 3MF handles a tray, AMS delta skips it."""
- spool = _make_spool(spool_id=1, label_weight=1000)
- assignment = _make_assignment(spool_id=1)
- archive = _make_archive(archive_id=10)
- _active_sessions[1] = PrintSession(
- printer_id=1,
- print_name="Benchy",
- started_at=datetime.now(timezone.utc),
- tray_remain_start={(0, 0): 80},
- )
- # tray_now=0 matches the single filament slot
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- raw_data={"ams": [{"id": 0, "tray": [{"id": 0, "remain": 70}]}]},
- progress=100,
- layer_num=50,
- tray_now=0,
- )
- # db returns: archive, queue_item(None), assignment, spool
- db = _mock_db_sequential([archive, None, assignment, spool])
- filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": "#FF0000"}]
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await on_print_complete(
- printer_id=1,
- data={"status": "completed"},
- printer_manager=printer_manager,
- db=db,
- archive_id=10,
- )
- # Only 1 result (3MF), NOT 2 (3MF + AMS delta)
- assert len(results) == 1
- assert results[0]["weight_used"] == 15.0
- class TestTrackFrom3mf:
- """Tests for _track_from_3mf() — per-layer, linear scaling, and slot mapping."""
- @pytest.mark.asyncio
- async def test_linear_fallback_for_partial_print(self):
- """Falls back to linear scaling when gcode layer data unavailable."""
- spool = _make_spool(spool_id=1, label_weight=1000)
- assignment = _make_assignment(spool_id=1)
- archive = _make_archive(archive_id=10)
- # db: archive, queue_item(None), assignment, spool
- db = _mock_db_sequential([archive, None, assignment, spool])
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=50,
- layer_num=25,
- tray_now=0,
- )
- filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- patch(
- "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
- return_value=None, # No layer data available
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=10,
- status="failed",
- print_name="Benchy",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- assert len(results) == 1
- # 50% of 20g = 10g
- assert results[0]["weight_used"] == 10.0
- # Tray should be marked as handled
- assert (0, 0) in handled_trays
- @pytest.mark.asyncio
- async def test_per_layer_partial_print(self):
- """Failed print at layer N uses gcode cumulative data."""
- spool = _make_spool(spool_id=1, label_weight=1000)
- assignment = _make_assignment(spool_id=1)
- archive = _make_archive(archive_id=10)
- # db: archive, queue_item(None), assignment, spool
- db = _mock_db_sequential([archive, None, assignment, spool])
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=50,
- layer_num=25,
- tray_now=0,
- )
- filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
- # Per-layer data: at layer 25, filament 0 used 5000mm
- layer_data = {10: {0: 2000.0}, 25: {0: 5000.0}, 50: {0: 10000.0}}
- filament_props = {1: {"density": 1.24, "diameter": 1.75}}
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- patch(
- "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
- return_value=layer_data,
- ),
- patch(
- "backend.app.utils.threemf_tools.get_cumulative_usage_at_layer",
- return_value={0: 5000.0},
- ),
- patch(
- "backend.app.utils.threemf_tools.extract_filament_properties_from_3mf",
- return_value=filament_props,
- ),
- patch(
- "backend.app.utils.threemf_tools.mm_to_grams",
- return_value=12.0, # 5000mm at 1.75mm/1.24g/cm3
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=10,
- status="failed",
- print_name="Benchy",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- assert len(results) == 1
- # Should use per-layer grams (12.0g), not linear scale (10.0g)
- assert results[0]["weight_used"] == 12.0
- @pytest.mark.asyncio
- async def test_completed_print_uses_full_weight(self):
- """Completed print uses full 3MF weight (scale=1.0)."""
- spool = _make_spool(spool_id=1, label_weight=1000)
- assignment = _make_assignment(spool_id=1)
- archive = _make_archive(archive_id=10)
- # db: archive, queue_item(None), assignment, spool
- db = _mock_db_sequential([archive, None, assignment, spool])
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=50,
- tray_now=0,
- )
- filament_usage = [{"slot_id": 1, "used_g": 20.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=10,
- status="completed",
- print_name="Benchy",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- assert len(results) == 1
- assert results[0]["weight_used"] == 20.0
- @pytest.mark.asyncio
- async def test_tray_now_override_for_single_filament(self):
- """Single-filament non-queue print uses tray_now instead of slot_id mapping."""
- # Spool 2 is at AMS1-T3 (global_tray_id=7)
- spool = _make_spool(spool_id=2, label_weight=1000)
- assignment = _make_assignment(spool_id=2, ams_id=1, tray_id=3)
- archive = _make_archive(archive_id=10)
- # db: archive, queue_item(None), assignment, spool
- db = _mock_db_sequential([archive, None, assignment, spool])
- # tray_now=7 = (ams_id=1, tray_id=3), the ACTUAL tray used
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=50,
- tray_now=7,
- )
- # 3MF has slot_id=12 (would default-map to ams_id=2, tray_id=3 — WRONG)
- filament_usage = [{"slot_id": 12, "used_g": 10.6, "type": "PLA", "color": "#FF0000"}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=10,
- status="completed",
- print_name="Test",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- assert len(results) == 1
- assert results[0]["spool_id"] == 2
- assert results[0]["ams_id"] == 1
- assert results[0]["tray_id"] == 3
- assert results[0]["weight_used"] == 10.6
- assert (1, 3) in handled_trays
- @pytest.mark.asyncio
- async def test_queue_ams_mapping_overrides_default(self):
- """Queue item ams_mapping overrides default slot_id mapping."""
- # Spool at AMS1-T3 (global_tray_id=7)
- spool = _make_spool(spool_id=5, label_weight=1000)
- assignment = _make_assignment(spool_id=5, ams_id=1, tray_id=3)
- archive = _make_archive(archive_id=20)
- # Queue item maps slot 1 → global tray 7 (ams_id=1, tray_id=3)
- queue_item = _make_queue_item(ams_mapping="[7, -1, -1, -1]")
- # db: archive, queue_item, assignment, spool
- db = _mock_db_sequential([archive, queue_item, assignment, spool])
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=50,
- tray_now=7,
- )
- filament_usage = [{"slot_id": 1, "used_g": 25.0, "type": "PETG", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=20,
- status="completed",
- print_name="Queue Print",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- assert len(results) == 1
- assert results[0]["spool_id"] == 5
- assert results[0]["ams_id"] == 1
- assert results[0]["tray_id"] == 3
- assert results[0]["weight_used"] == 25.0
- @pytest.mark.asyncio
- async def test_multi_filament_uses_queue_mapping(self):
- """Multi-filament queue prints use ams_mapping for each slot."""
- spool_a = _make_spool(spool_id=1, label_weight=1000)
- spool_b = _make_spool(spool_id=2, label_weight=1000)
- assign_a = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
- assign_b = _make_assignment(spool_id=2, ams_id=1, tray_id=2)
- archive = _make_archive(archive_id=30)
- # slot 1 → tray 0 (AMS0-T0), slot 2 → tray 6 (AMS1-T2)
- queue_item = _make_queue_item(ams_mapping="[0, 6]")
- # db: archive, queue_item, assign_a, spool_a, assign_b, spool_b
- db = _mock_db_sequential([archive, queue_item, assign_a, spool_a, assign_b, spool_b])
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=50,
- tray_now=6,
- )
- filament_usage = [
- {"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""},
- {"slot_id": 2, "used_g": 5.0, "type": "PETG", "color": ""},
- ]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=30,
- status="completed",
- print_name="Multi",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- assert len(results) == 2
- assert results[0]["spool_id"] == 1
- assert results[0]["ams_id"] == 0
- assert results[0]["tray_id"] == 0
- assert results[0]["weight_used"] == 10.0
- assert results[1]["spool_id"] == 2
- assert results[1]["ams_id"] == 1
- assert results[1]["tray_id"] == 2
- assert results[1]["weight_used"] == 5.0
- @pytest.mark.asyncio
- async def test_no_tray_now_override_for_multi_filament(self):
- """Multi-filament non-queue prints fall back to default mapping, not tray_now."""
- spool = _make_spool(spool_id=1, label_weight=1000)
- assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
- archive = _make_archive(archive_id=10)
- # db: archive, queue_item(None), assignment, spool (2nd slot has no assignment)
- db = _mock_db_sequential([archive, None, assignment, spool, None])
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=50,
- tray_now=4, # tray_now won't be used
- )
- # Two filament slots with usage
- filament_usage = [
- {"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""},
- {"slot_id": 2, "used_g": 5.0, "type": "PETG", "color": ""},
- ]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=10,
- status="completed",
- print_name="Test",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- # Should use default mapping (slot 1 → tray 0, slot 2 → tray 1)
- assert len(results) == 1 # Only slot 1 has assignment
- assert results[0]["ams_id"] == 0
- assert results[0]["tray_id"] == 0
- @pytest.mark.asyncio
- async def test_stored_ams_mapping_overrides_all(self):
- """Stored ams_mapping from print command takes priority over queue and tray_now."""
- # Spool at AMS2-T1 (global_tray_id=9)
- spool = _make_spool(spool_id=10, label_weight=1000)
- assignment = _make_assignment(spool_id=10, ams_id=2, tray_id=1)
- archive = _make_archive(archive_id=50)
- # db: archive, assignment, spool (no queue lookup when ams_mapping provided)
- db = _mock_db_sequential([archive, assignment, spool])
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=50,
- tray_now=0, # Different from mapped tray — should be ignored
- last_loaded_tray=0,
- )
- filament_usage = [{"slot_id": 2, "used_g": 1.57, "type": "PLA", "color": "#FFFFFF"}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- # ams_mapping: slot 2 (index 1) -> tray 9 (AMS2-T1)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=50,
- status="completed",
- print_name="Test",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- ams_mapping=[-1, 9],
- )
- assert len(results) == 1
- assert results[0]["spool_id"] == 10
- assert results[0]["ams_id"] == 2
- assert results[0]["tray_id"] == 1
- assert results[0]["weight_used"] == 1.6 # rounded
- @pytest.mark.asyncio
- async def test_last_loaded_tray_fallback(self):
- """Falls back to last_loaded_tray when tray_now_at_start and current tray_now are both 255."""
- # Spool at AMS2-T1 (global_tray_id=9)
- spool = _make_spool(spool_id=11, label_weight=1000)
- assignment = _make_assignment(spool_id=11, ams_id=2, tray_id=1)
- archive = _make_archive(archive_id=60)
- # db: archive, queue_item(None), assignment, spool
- db = _mock_db_sequential([archive, None, assignment, spool])
- # H2D scenario: tray_now=255 at completion, but last_loaded_tray=9
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=50,
- tray_now=255,
- last_loaded_tray=9,
- )
- filament_usage = [{"slot_id": 6, "used_g": 1.52, "type": "PLA", "color": "#7CC4D5"}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=60,
- status="completed",
- print_name="Cube",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- tray_now_at_start=255, # H2D: 255 at start too
- )
- assert len(results) == 1
- assert results[0]["spool_id"] == 11
- assert results[0]["ams_id"] == 2
- assert results[0]["tray_id"] == 1
- @pytest.mark.asyncio
- async def test_tray_now_at_start_preferred_over_last_loaded(self):
- """tray_now_at_start is used before last_loaded_tray fallback."""
- spool = _make_spool(spool_id=3, label_weight=1000)
- assignment = _make_assignment(spool_id=3, ams_id=1, tray_id=1)
- archive = _make_archive(archive_id=70)
- db = _mock_db_sequential([archive, None, assignment, spool])
- # tray_now_at_start=5 (valid), last_loaded_tray=9 (different) — should use 5
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=50,
- tray_now=255,
- last_loaded_tray=9,
- )
- filament_usage = [{"slot_id": 1, "used_g": 5.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=70,
- status="completed",
- print_name="Test",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- tray_now_at_start=5, # AMS1-T1
- )
- assert len(results) == 1
- assert results[0]["ams_id"] == 1
- assert results[0]["tray_id"] == 1
- class TestTrayChangeSplit:
- """Tests for mid-print tray switch weight splitting in _track_from_3mf()."""
- @pytest.mark.asyncio
- async def test_tray_switch_splits_weight_with_gcode(self):
- """Two-tray runout: weight split using per-layer gcode data."""
- spool_a = _make_spool(spool_id=10, label_weight=1000)
- spool_b = _make_spool(spool_id=20, label_weight=1000)
- assign_a = _make_assignment(spool_id=10, ams_id=0, tray_id=1)
- assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=0)
- archive = _make_archive(archive_id=100)
- # db: archive, queue_item(None), then for each segment: assignment, spool
- db = _mock_db_sequential([archive, None, assign_a, spool_a, assign_b, spool_b])
- # Tray change log: started on tray 1, switched to tray 0 at layer 60
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=100,
- tray_now=0,
- last_loaded_tray=0,
- total_layers=100,
- tray_change_log=[(1, 0), (0, 60)],
- )
- filament_usage = [{"slot_id": 1, "used_g": 30.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- patch(
- "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
- return_value={30: {0: 3000.0}, 60: {0: 6000.0}, 100: {0: 10000.0}},
- ),
- patch(
- "backend.app.utils.threemf_tools.get_cumulative_usage_at_layer",
- side_effect=lambda data, layer: {0: {0: 0.0, 60: 6000.0, 100: 10000.0}.get(layer, 0.0)},
- ),
- patch(
- "backend.app.utils.threemf_tools.extract_filament_properties_from_3mf",
- return_value={1: {"density": 1.24, "diameter": 1.75}},
- ),
- patch(
- "backend.app.utils.threemf_tools.mm_to_grams",
- side_effect=lambda mm, d, dens: round(mm * 0.003, 1), # Simple conversion
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=100,
- status="completed",
- print_name="Runout Test",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- # Two results: one per tray segment
- assert len(results) == 2
- # First segment: tray 1 (AMS0-T1), layers 0→60
- assert results[0]["ams_id"] == 0
- assert results[0]["tray_id"] == 1
- assert results[0]["spool_id"] == 10
- assert results[0]["weight_used"] == 18.0 # 6000mm * 0.003
- # Second segment: tray 0 (AMS0-T0), layers 60→end = 30.0 - 18.0 = 12.0
- assert results[1]["ams_id"] == 0
- assert results[1]["tray_id"] == 0
- assert results[1]["spool_id"] == 20
- assert results[1]["weight_used"] == 12.0
- # Both trays handled
- assert (0, 1) in handled_trays
- assert (0, 0) in handled_trays
- @pytest.mark.asyncio
- async def test_tray_switch_linear_fallback(self):
- """Two-tray runout without per-layer gcode: linear split by layer ratio."""
- spool_a = _make_spool(spool_id=10, label_weight=1000)
- spool_b = _make_spool(spool_id=20, label_weight=1000)
- assign_a = _make_assignment(spool_id=10, ams_id=0, tray_id=2)
- assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=1)
- archive = _make_archive(archive_id=101)
- db = _mock_db_sequential([archive, None, assign_a, spool_a, assign_b, spool_b])
- # Tray 2 from layer 0, switched to tray 1 at layer 40 (of 100 total)
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=100,
- tray_now=1,
- last_loaded_tray=1,
- total_layers=100,
- tray_change_log=[(2, 0), (1, 40)],
- )
- filament_usage = [{"slot_id": 1, "used_g": 50.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- patch(
- "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
- return_value=None, # No per-layer gcode available
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=101,
- status="completed",
- print_name="Linear Fallback",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- assert len(results) == 2
- # Linear split: tray 2 for 40/100 layers = 20g
- assert results[0]["ams_id"] == 0
- assert results[0]["tray_id"] == 2
- assert results[0]["weight_used"] == 20.0
- # Last segment gets remainder: 50 - 20 = 30g
- assert results[1]["ams_id"] == 0
- assert results[1]["tray_id"] == 1
- assert results[1]["weight_used"] == 30.0
- @pytest.mark.asyncio
- async def test_no_tray_change_uses_normal_path(self):
- """Single-entry tray_change_log falls through to normal tray_now_at_start logic."""
- spool = _make_spool(spool_id=1, label_weight=1000)
- assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=2)
- archive = _make_archive(archive_id=102)
- db = _mock_db_sequential([archive, None, assignment, spool])
- # Only one entry = no switch, should use normal path
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=100,
- tray_now=2,
- last_loaded_tray=2,
- total_layers=100,
- tray_change_log=[(2, 0)],
- )
- filament_usage = [{"slot_id": 1, "used_g": 15.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=102,
- status="completed",
- print_name="No Switch",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- tray_now_at_start=2,
- )
- # Normal path: single result, full weight
- assert len(results) == 1
- assert results[0]["weight_used"] == 15.0
- assert results[0]["ams_id"] == 0
- assert results[0]["tray_id"] == 2
- @pytest.mark.asyncio
- async def test_empty_tray_change_log_uses_normal_path(self):
- """Empty tray_change_log (e.g. server restart) falls through to existing logic."""
- spool = _make_spool(spool_id=1, label_weight=1000)
- assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
- archive = _make_archive(archive_id=103)
- db = _mock_db_sequential([archive, None, assignment, spool])
- # Empty log (server restarted mid-print)
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=100,
- tray_now=0,
- last_loaded_tray=0,
- total_layers=100,
- tray_change_log=[],
- )
- filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=103,
- status="completed",
- print_name="Restart Recovery",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- tray_now_at_start=0,
- )
- assert len(results) == 1
- assert results[0]["weight_used"] == 10.0
- @pytest.mark.asyncio
- async def test_tray_switch_segment_no_spool(self):
- """Segment with no spool assignment is skipped; other segments still tracked."""
- spool_b = _make_spool(spool_id=20, label_weight=1000)
- assign_b = _make_assignment(spool_id=20, ams_id=0, tray_id=3)
- archive = _make_archive(archive_id=104)
- # db: archive, queue_item(None), 1st segment: no assignment, 2nd segment: assignment, spool
- db = _mock_db_sequential([archive, None, None, assign_b, spool_b])
- # Tray 5 (no spool) from layer 0, switched to tray 3 at layer 50
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=100,
- tray_now=3,
- last_loaded_tray=3,
- total_layers=100,
- tray_change_log=[(5, 0), (3, 50)],
- )
- filament_usage = [{"slot_id": 1, "used_g": 40.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- patch(
- "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
- return_value=None, # No per-layer data
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=104,
- status="completed",
- print_name="Missing Spool",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- # Only the second segment (tray 3) tracked; first segment (tray 5) skipped
- assert len(results) == 1
- assert results[0]["ams_id"] == 0
- assert results[0]["tray_id"] == 3
- assert results[0]["spool_id"] == 20
- @pytest.mark.asyncio
- async def test_tray_switch_three_segments(self):
- """Three-segment switch (rare): A→B→C split by linear fallback."""
- spool_a = _make_spool(spool_id=1, label_weight=1000)
- spool_b = _make_spool(spool_id=2, label_weight=1000)
- spool_c = _make_spool(spool_id=3, label_weight=1000)
- assign_a = _make_assignment(spool_id=1, ams_id=0, tray_id=0)
- assign_b = _make_assignment(spool_id=2, ams_id=0, tray_id=1)
- assign_c = _make_assignment(spool_id=3, ams_id=0, tray_id=2)
- archive = _make_archive(archive_id=105)
- db = _mock_db_sequential(
- [
- archive,
- None,
- assign_a,
- spool_a,
- assign_b,
- spool_b,
- assign_c,
- spool_c,
- ]
- )
- # 3 segments: tray 0 (0-30), tray 1 (30-70), tray 2 (70-end)
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- progress=100,
- layer_num=100,
- tray_now=2,
- last_loaded_tray=2,
- total_layers=100,
- tray_change_log=[(0, 0), (1, 30), (2, 70)],
- )
- filament_usage = [{"slot_id": 1, "used_g": 100.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- patch(
- "backend.app.utils.threemf_tools.extract_layer_filament_usage_from_3mf",
- return_value=None,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=105,
- status="completed",
- print_name="Triple Switch",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- assert len(results) == 3
- # Tray 0: 30/100 * 100g = 30g
- assert results[0]["weight_used"] == 30.0
- assert results[0]["ams_id"] == 0
- assert results[0]["tray_id"] == 0
- # Tray 1: 40/100 * 100g = 40g
- assert results[1]["weight_used"] == 40.0
- assert results[1]["ams_id"] == 0
- assert results[1]["tray_id"] == 1
- # Tray 2: remainder = 100 - 30 - 40 = 30g
- assert results[2]["weight_used"] == 30.0
- assert results[2]["ams_id"] == 0
- assert results[2]["tray_id"] == 2
- class TestDecodeMqttMapping:
- """Tests for _decode_mqtt_mapping() — snow-encoded MQTT mapping to global tray IDs."""
- def test_none_input(self):
- assert _decode_mqtt_mapping(None) is None
- def test_empty_list(self):
- assert _decode_mqtt_mapping([]) is None
- def test_all_unmapped(self):
- """All 65535 values → None (no valid mappings)."""
- assert _decode_mqtt_mapping([65535, 65535, 65535]) is None
- def test_single_ams_slots(self):
- """AMS 0 slots: snow values 0-3 → global tray IDs 0-3."""
- assert _decode_mqtt_mapping([0, 1, 2, 3]) == [0, 1, 2, 3]
- def test_multi_ams_slots(self):
- """AMS 1 (hw_id=1): snow 256=AMS1-T0, 257=AMS1-T1 → global 4, 5."""
- assert _decode_mqtt_mapping([256, 257]) == [4, 5]
- def test_ams_ht_slot(self):
- """AMS-HT (hw_id=128): snow 32768 → global 128."""
- assert _decode_mqtt_mapping([32768]) == [128]
- def test_external_spool(self):
- """External spool: ams_hw_id=254, slot=0 → global 254."""
- # snow = 254 * 256 + 0 = 65024
- assert _decode_mqtt_mapping([65024]) == [254]
- def test_mixed_with_unmapped(self):
- """Mix of valid and unmapped (65535) values."""
- result = _decode_mqtt_mapping([1, 65535, 0])
- assert result == [1, -1, 0]
- def test_h2c_real_mapping(self):
- """Real H2C mapping from MQTT logs: [1, 0, 65535*4, 32768]."""
- mapping = [1, 0, 65535, 65535, 65535, 65535, 32768]
- result = _decode_mqtt_mapping(mapping)
- assert result == [1, 0, -1, -1, -1, -1, 128]
- def test_non_int_values_treated_as_unmapped(self):
- """Non-integer values in the mapping are treated as unmapped."""
- assert _decode_mqtt_mapping(["foo", 0]) == [-1, 0]
- class TestMatchSlotsByColor:
- """Tests for _match_slots_by_color() — color-based filament slot to AMS tray matching."""
- def _ams(self, trays):
- """Build AMS data from list of (ams_id, tray_id, color_hex, tray_type) tuples."""
- units: dict[int, list] = {}
- for ams_id, tray_id, color, tray_type in trays:
- units.setdefault(ams_id, []).append({"id": tray_id, "tray_color": color, "tray_type": tray_type})
- return [{"id": aid, "tray": t} for aid, t in units.items()]
- def _usage(self, slots):
- """Build filament_usage from list of (slot_id, color_hex) tuples."""
- return [{"slot_id": sid, "used_g": 10.0, "type": "PLA", "color": color} for sid, color in slots]
- def test_none_inputs(self):
- assert _match_slots_by_color(None, None) is None
- assert _match_slots_by_color([], None) is None
- assert _match_slots_by_color(None, {"ams": []}) is None
- def test_empty_ams(self):
- usage = self._usage([(1, "#FF0000")])
- assert _match_slots_by_color(usage, {"ams": []}) is None
- def test_single_slot_single_tray(self):
- """One 3MF slot matches one AMS tray by color."""
- ams = self._ams([(0, 0, "FF0000FF", "PLA")])
- usage = self._usage([(1, "#FF0000")])
- assert _match_slots_by_color(usage, {"ams": ams}) == [0]
- def test_a1_mini_three_colors(self):
- """A1 Mini: 3 slots match 3 distinct AMS trays."""
- ams = self._ams(
- [
- (0, 0, "FF0000FF", "PLA"), # Red
- (0, 1, "00FF00FF", "PLA"), # Green
- (0, 2, "0000FFFF", "PLA"), # Blue
- ]
- )
- usage = self._usage([(1, "#FF0000"), (2, "#00FF00"), (3, "#0000FF")])
- assert _match_slots_by_color(usage, {"ams": ams}) == [0, 1, 2]
- def test_dual_ams_p2s_like(self):
- """P2S with dual AMS: slots from second AMS unit."""
- ams = self._ams(
- [
- (0, 0, "AAAAAAFF", "PLA"),
- (0, 1, "BBBBBBFF", "PLA"),
- (1, 0, "CC0000FF", "PETG"), # global_id=4
- (1, 1, "00CC00FF", "PETG"), # global_id=5
- ]
- )
- usage = self._usage([(1, "#CC0000"), (2, "#00CC00")])
- assert _match_slots_by_color(usage, {"ams": ams}) == [4, 5]
- def test_ams_ht_global_id(self):
- """AMS-HT (ams_id >= 128) uses raw ams_id as global tray ID."""
- ams = self._ams(
- [
- (0, 0, "FF0000FF", "PLA"),
- (128, 0, "0000FFFF", "PLA"), # AMS-HT → global_id=128
- ]
- )
- usage = self._usage([(1, "#FF0000"), (2, "#0000FF")])
- assert _match_slots_by_color(usage, {"ams": ams}) == [0, 128]
- def test_ambiguous_same_color_returns_none(self):
- """Two trays with the same color → ambiguous → None."""
- ams = self._ams(
- [
- (0, 0, "FF0000FF", "PLA"),
- (0, 1, "FF0000FF", "PLA"), # Same red
- ]
- )
- usage = self._usage([(1, "#FF0000")])
- assert _match_slots_by_color(usage, {"ams": ams}) is None
- def test_no_matching_color_returns_none(self):
- """3MF slot color not found in any AMS tray → None."""
- ams = self._ams([(0, 0, "00FF00FF", "PLA")])
- usage = self._usage([(1, "#FF0000")]) # Red, but AMS has green
- assert _match_slots_by_color(usage, {"ams": ams}) is None
- def test_color_normalization_strips_alpha(self):
- """AMS colors (RRGGBBAA) and 3MF colors (#RRGGBB) match after normalization."""
- ams = self._ams([(0, 0, "AABBCC80", "PLA")]) # 8-char with alpha
- usage = self._usage([(1, "#AABBCC")]) # 6-char with #
- assert _match_slots_by_color(usage, {"ams": ams}) == [0]
- def test_case_insensitive(self):
- """Color matching is case-insensitive."""
- ams = self._ams([(0, 0, "aaBBccFF", "PLA")])
- usage = self._usage([(1, "#AAbbCC")])
- assert _match_slots_by_color(usage, {"ams": ams}) == [0]
- def test_empty_tray_color_skipped(self):
- """Trays with empty color are skipped (not matched)."""
- ams = self._ams(
- [
- (0, 0, "", "PLA"),
- (0, 1, "FF0000FF", "PLA"),
- ]
- )
- usage = self._usage([(1, "#FF0000")])
- assert _match_slots_by_color(usage, {"ams": ams}) == [1]
- def test_empty_tray_type_skipped(self):
- """Trays with empty tray_type are skipped (unloaded slot)."""
- ams = self._ams(
- [
- (0, 0, "FF0000FF", ""), # Empty slot
- (0, 1, "FF0000FF", "PLA"), # Loaded slot
- ]
- )
- usage = self._usage([(1, "#FF0000")])
- assert _match_slots_by_color(usage, {"ams": ams}) == [1]
- def test_short_slot_color_returns_none(self):
- """3MF slot with color < 6 chars → can't match → None."""
- ams = self._ams([(0, 0, "FF0000FF", "PLA")])
- usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": "#FFF"}]
- assert _match_slots_by_color(usage, {"ams": ams}) is None
- def test_slot_id_zero_skipped(self):
- """Slots with slot_id=0 are skipped."""
- ams = self._ams([(0, 0, "FF0000FF", "PLA")])
- usage = [{"slot_id": 0, "used_g": 10.0, "type": "PLA", "color": "#FF0000"}]
- assert _match_slots_by_color(usage, {"ams": ams}) is None
- def test_ams_data_as_list(self):
- """Handles ams_raw as a plain list (some printer models)."""
- ams_list = [{"id": 0, "tray": [{"id": 0, "tray_color": "FF0000FF", "tray_type": "PLA"}]}]
- usage = self._usage([(1, "#FF0000")])
- assert _match_slots_by_color(usage, ams_list) == [0]
- def test_same_color_two_trays_disambiguated_by_usage(self):
- """Two trays same color, two slots same color → unique assignment via used_trays tracking."""
- ams = self._ams(
- [
- (0, 0, "FF0000FF", "PLA"),
- (0, 1, "FF0000FF", "PLA"),
- ]
- )
- # Two slots both wanting red — first gets tray 0, second gets tray 1? No.
- # When first slot takes the only available, second has 1 left → should work
- usage = self._usage([(1, "#FF0000"), (2, "#FF0000")])
- # First slot: candidates=[0,1], available=[0,1], len!=1 → None
- assert _match_slots_by_color(usage, {"ams": ams}) is None
- def test_dict_wrapper_with_ams_key(self):
- """Standard dict format with 'ams' key."""
- ams_data = {"ams": [{"id": 0, "tray": [{"id": 0, "tray_color": "00FF00FF", "tray_type": "PLA"}]}]}
- usage = self._usage([(1, "#00FF00")])
- assert _match_slots_by_color(usage, ams_data) == [0]
- class TestMqttMappingIntegration:
- """Integration tests: MQTT mapping field used in _track_from_3mf."""
- @pytest.mark.asyncio
- async def test_h2c_multi_filament_uses_mqtt_mapping(self):
- """H2C: 3 filaments resolved via MQTT mapping field (no ams_mapping, no queue)."""
- # AMS0-T1 (White PLA), AMS0-T0 (Black PLA), AMS128-T0 (Red PLA)
- spool_white = _make_spool(spool_id=1, label_weight=1000)
- spool_black = _make_spool(spool_id=2, label_weight=1000)
- spool_red = _make_spool(spool_id=3, label_weight=1000)
- assign_white = _make_assignment(spool_id=1, ams_id=0, tray_id=1)
- assign_black = _make_assignment(spool_id=2, ams_id=0, tray_id=0)
- assign_red = _make_assignment(spool_id=3, ams_id=128, tray_id=0)
- archive = _make_archive(archive_id=12)
- # db: archive, then 3 pairs of (assignment, spool)
- # No queue lookup because MQTT mapping is found first
- db = _mock_db_sequential(
- [
- archive,
- assign_white,
- spool_white,
- assign_black,
- spool_black,
- assign_red,
- spool_red,
- ]
- )
- # MQTT mapping: slot0→AMS0-T1(1), slot1→AMS0-T0(0), slots2-5→unmapped, slot6→AMS128-T0(32768)
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- raw_data={"mapping": [1, 0, 65535, 65535, 65535, 65535, 32768]},
- progress=100,
- layer_num=50,
- tray_now=255,
- )
- # 3MF slots 1, 2, 7 (1-based) → indices 0, 1, 6 in mapping
- filament_usage = [
- {"slot_id": 1, "used_g": 21.16, "type": "PLA", "color": "#FFFFFF"},
- {"slot_id": 2, "used_g": 24.22, "type": "PLA", "color": "#000000"},
- {"slot_id": 7, "used_g": 18.47, "type": "PLA", "color": "#F72323"},
- ]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=12,
- status="completed",
- print_name="Cube + Cube + Cube",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- )
- assert len(results) == 3
- # slot_id=1 → mapping[0]=1 → AMS0-T1 (White PLA)
- assert results[0]["spool_id"] == 1
- assert results[0]["ams_id"] == 0
- assert results[0]["tray_id"] == 1
- assert results[0]["weight_used"] == 21.2
- # slot_id=2 → mapping[1]=0 → AMS0-T0 (Black PLA)
- assert results[1]["spool_id"] == 2
- assert results[1]["ams_id"] == 0
- assert results[1]["tray_id"] == 0
- assert results[1]["weight_used"] == 24.2
- # slot_id=7 → mapping[6]=32768 → AMS128-T0 (Red PLA)
- assert results[2]["spool_id"] == 3
- assert results[2]["ams_id"] == 128
- assert results[2]["tray_id"] == 0
- assert results[2]["weight_used"] == 18.5
- @pytest.mark.asyncio
- async def test_print_cmd_mapping_takes_priority_over_mqtt(self):
- """ams_mapping from print command is used even when MQTT mapping exists."""
- spool = _make_spool(spool_id=1, label_weight=1000)
- assignment = _make_assignment(spool_id=1, ams_id=0, tray_id=2)
- archive = _make_archive(archive_id=10)
- # db: archive, assignment, spool (no queue lookup when ams_mapping provided)
- db = _mock_db_sequential([archive, assignment, spool])
- printer_manager = MagicMock()
- printer_manager.get_status.return_value = SimpleNamespace(
- raw_data={"mapping": [0, 65535]}, # MQTT says slot 0 → AMS0-T0
- progress=100,
- layer_num=50,
- tray_now=255,
- )
- filament_usage = [{"slot_id": 1, "used_g": 10.0, "type": "PLA", "color": ""}]
- handled_trays: set[tuple[int, int]] = set()
- with (
- patch("backend.app.core.config.settings") as mock_settings,
- patch(
- "backend.app.utils.threemf_tools.extract_filament_usage_from_3mf",
- return_value=filament_usage,
- ),
- ):
- mock_settings.base_dir = MagicMock()
- mock_path = MagicMock()
- mock_path.exists.return_value = True
- mock_settings.base_dir.__truediv__ = MagicMock(return_value=mock_path)
- results = await _track_from_3mf(
- printer_id=1,
- archive_id=10,
- status="completed",
- print_name="Test",
- handled_trays=handled_trays,
- printer_manager=printer_manager,
- db=db,
- ams_mapping=[2], # Print cmd says slot 0 → AMS0-T2 (overrides MQTT)
- )
- assert len(results) == 1
- assert results[0]["ams_id"] == 0
- assert results[0]["tray_id"] == 2 # From print_cmd mapping, not MQTT
- class TestNotificationVariables:
- """Tests for filament_details formatting in notifications."""
- def test_filament_details_single_slot(self):
- """Single slot produces 'PLA: 15.2g' format."""
- slots = [{"type": "PLA", "used_g": 15.2, "slot_id": 1, "color": "#FF0000"}]
- parts = []
- for slot in slots:
- ftype = slot.get("type", "Unknown") or "Unknown"
- used = slot.get("used_g", 0)
- parts.append(f"{ftype}: {used:.1f}g")
- result = " | ".join(parts)
- assert result == "PLA: 15.2g"
- def test_filament_details_multi_slot(self):
- """Multiple slots produce 'PLA: 10.0g | PETG: 5.0g' format."""
- slots = [
- {"type": "PLA", "used_g": 10.0, "slot_id": 1, "color": ""},
- {"type": "PETG", "used_g": 5.0, "slot_id": 2, "color": ""},
- ]
- parts = []
- for slot in slots:
- ftype = slot.get("type", "Unknown") or "Unknown"
- used = slot.get("used_g", 0)
- parts.append(f"{ftype}: {used:.1f}g")
- result = " | ".join(parts)
- assert result == "PLA: 10.0g | PETG: 5.0g"
- def test_filament_details_empty_type(self):
- """Empty type defaults to 'Unknown'."""
- slots = [{"type": "", "used_g": 5.0, "slot_id": 1, "color": ""}]
- parts = []
- for slot in slots:
- ftype = slot.get("type", "Unknown") or "Unknown"
- used = slot.get("used_g", 0)
- parts.append(f"{ftype}: {used:.1f}g")
- result = " | ".join(parts)
- assert result == "Unknown: 5.0g"
- def test_filament_grams_scaled_for_partial(self):
- """filament_grams is scaled by progress for partial prints."""
- filament_used_grams = 20.0
- progress = 50
- scale = max(0.0, min(progress / 100.0, 1.0))
- scaled = round(filament_used_grams * scale, 1)
- assert scaled == 10.0
- def test_filament_grams_zero_progress(self):
- """Progress=0 at cancellation gives 0.0g."""
- filament_used_grams = 20.0
- progress = 0
- scale = max(0.0, min(progress / 100.0, 1.0))
- scaled = round(filament_used_grams * scale, 1)
- assert scaled == 0.0
- def test_slot_scaling_for_partial(self):
- """Per-slot usage is scaled linearly for partial prints."""
- slots = [
- {"type": "PLA", "used_g": 20.0, "slot_id": 1, "color": ""},
- {"type": "PETG", "used_g": 10.0, "slot_id": 2, "color": ""},
- ]
- progress = 30
- scale = max(0.0, min(progress / 100.0, 1.0))
- scaled_slots = [{**s, "used_g": round(s["used_g"] * scale, 1)} for s in slots]
- assert scaled_slots[0]["used_g"] == 6.0
- assert scaled_slots[1]["used_g"] == 3.0
|