_spoolman_helpers.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. """Pure helper functions for Spoolman spool mapping.
  2. No heavy dependencies — importable in unit tests without the full backend stack.
  3. """
  4. from __future__ import annotations
  5. import ipaddress
  6. import logging
  7. import math
  8. import re
  9. from datetime import datetime, timezone
  10. from urllib.parse import urlparse
  11. logger = logging.getLogger(__name__)
  12. _CLOUD_METADATA_IPS = frozenset(
  13. {
  14. # AWS / GCP / Azure / Oracle / DigitalOcean IMDS
  15. ipaddress.ip_address("169.254.169.254"),
  16. # Alibaba Cloud metadata
  17. ipaddress.ip_address("100.100.100.200"),
  18. # AWS IMDS IPv6
  19. ipaddress.ip_address("fd00:ec2::254"),
  20. }
  21. )
  22. def assert_safe_spoolman_url(url: str) -> None:
  23. """Raise ValueError if *url* should be blocked as an SSRF risk.
  24. Bambuddy is typically deployed on a home LAN alongside Spoolman, so
  25. loopback (127.0.0.1) and RFC-1918 private ranges (192.168.x.x, 10.x.x.x,
  26. 172.16-31.x) must be permitted — they are THE normal Spoolman topology.
  27. This guard therefore targets the genuinely dangerous cases only.
  28. Checks performed:
  29. - Scheme must be http or https (no file://, gopher://, dict://, etc.).
  30. - Numeric-encoded IP addresses in decimal (e.g. ``2130706433``) or hex
  31. (e.g. ``0x7f000001``) are rejected. Python's ``ipaddress`` module raises
  32. ``ValueError`` for these forms so they would otherwise bypass the
  33. explicit-IP block below, but libc (and browsers) resolve them as valid
  34. IPv4 addresses.
  35. - Cloud provider metadata endpoints (169.254.169.254, 100.100.100.200,
  36. fd00:ec2::254) are blocked — the classic SSRF credential-exfil target.
  37. - Multicast (224.0.0.0/4, ff00::/8) and unspecified (0.0.0.0, ::) addresses
  38. are blocked — pointless as a destination and suggests misuse.
  39. - IPv4-mapped IPv6 addresses (::ffff:x.x.x.x) are unwrapped so they cannot
  40. bypass the checks above.
  41. Hostname-based addresses ("localhost", "spoolman.lan", "internal.corp")
  42. are out of scope — DNS resolution is deliberately not performed here.
  43. """
  44. parsed = urlparse(url)
  45. if parsed.scheme.lower() not in ("http", "https"):
  46. raise ValueError("Spoolman URL must use http or https")
  47. hostname = (parsed.hostname or "").lower()
  48. # Reject decimal- and hex-encoded IPs (e.g. http://2130706433/ or
  49. # http://0x7f000001/). These slip past ipaddress.ip_address() but libc
  50. # (and browsers) parse them as IPv4 — an obvious bypass if not caught.
  51. if re.match(r"^(0x[0-9a-f]+|[0-9]+)$", hostname, re.I):
  52. raise ValueError("Spoolman URL must not use numeric-encoded IP addresses; use standard dotted-decimal notation")
  53. try:
  54. addr = ipaddress.ip_address(hostname)
  55. except ValueError:
  56. # Not a bare IP — hostname-based addresses are out of scope.
  57. return
  58. # Unwrap IPv4-mapped IPv6 (::ffff:169.254.169.254 etc.) so attackers can't
  59. # encode a blocked IPv4 into an IPv6 literal to bypass the check.
  60. effective: ipaddress.IPv4Address | ipaddress.IPv6Address = addr
  61. if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped is not None:
  62. effective = addr.ipv4_mapped
  63. if effective in _CLOUD_METADATA_IPS:
  64. raise ValueError("Spoolman URL must not point to a cloud metadata endpoint")
  65. if effective.is_multicast or effective.is_unspecified:
  66. raise ValueError("Spoolman URL must not point to a multicast or unspecified address")
  67. _COLOR_HEX_RE = re.compile(r"^[0-9A-Fa-f]{6}$")
  68. _TAG_HEX_RE = re.compile(r"^[0-9A-F]+$")
  69. def _safe_int(value: object, fallback: int) -> int:
  70. """Convert value to int, returning fallback for None/NaN/Inf/non-numeric."""
  71. try:
  72. f = float(value) # type: ignore[arg-type]
  73. if math.isfinite(f):
  74. return int(f)
  75. except (TypeError, ValueError):
  76. pass
  77. return fallback
  78. def _safe_float(value: object, fallback: float) -> float:
  79. """Convert value to float, returning fallback for None/NaN/Inf/non-numeric."""
  80. try:
  81. f = float(value) # type: ignore[arg-type]
  82. if math.isfinite(f):
  83. return f
  84. except (TypeError, ValueError):
  85. pass
  86. return fallback
  87. def _safe_optional_float(value: object) -> float | None:
  88. """Convert value to finite float, or None if missing/NaN/Infinite/non-numeric.
  89. Used for optional monetary fields (price) to prevent Infinity/NaN from
  90. reaching JSON serialisation, which raises ValueError with allow_nan=False.
  91. """
  92. if value is None:
  93. return None
  94. try:
  95. f = float(value) # type: ignore[arg-type]
  96. if math.isfinite(f):
  97. return f
  98. except (TypeError, ValueError):
  99. pass
  100. return None
  101. def _map_spoolman_spool(spool: dict) -> dict:
  102. """Convert a raw Spoolman spool dict to the InventorySpool-compatible format.
  103. Fields not supported by Spoolman (k_profiles, slicer_filament, …) are
  104. returned as None / empty so the frontend can still render them without
  105. errors. The ``data_origin`` field is set to ``"spoolman"`` so UI code can
  106. distinguish these spools from local ones.
  107. """
  108. raw_id = spool.get("id")
  109. if raw_id is None:
  110. raise ValueError("Spoolman spool is missing required 'id' field")
  111. try:
  112. spool_id: int = int(raw_id)
  113. except (TypeError, ValueError):
  114. raise ValueError(f"Spoolman spool 'id' is not a valid integer: {raw_id!r}")
  115. if spool_id <= 0:
  116. raise ValueError(f"Spoolman spool 'id' must be a positive integer, got {spool_id}")
  117. filament: dict = spool.get("filament") or {}
  118. if not filament:
  119. logger.warning(
  120. "Spoolman spool %s has no filament data — all filament fields will use defaults",
  121. spool_id,
  122. )
  123. vendor: dict = filament.get("vendor") or {}
  124. extra: dict = spool.get("extra") or {}
  125. # RFID tag stored as JSON-encoded string in Spoolman extra.tag.
  126. # 32-char hex → Bambu Lab tray UUID; 8–30-char hex → NFC tag UID.
  127. # Accepting the full realistic UID range (4-byte = 8 chars, 7-byte = 14 chars,
  128. # 10-byte = 20 chars) avoids silently dropping valid SpoolBuddy-written tags.
  129. raw_tag: str = (extra.get("tag") or "").strip('"').upper()
  130. _raw_is_hex = bool(_TAG_HEX_RE.match(raw_tag))
  131. tag_uid = raw_tag if _raw_is_hex and 8 <= len(raw_tag) <= 30 else None
  132. tray_uuid = raw_tag if _raw_is_hex and len(raw_tag) == 32 else None
  133. # Subtype = filament name with material prefix stripped
  134. material: str = (filament.get("material") or "").strip()
  135. filament_name: str = (filament.get("name") or "").strip()
  136. if material and filament_name.upper().startswith(material.upper()):
  137. subtype: str | None = filament_name[len(material) :].strip() or None
  138. else:
  139. subtype = filament_name or None
  140. # Colour: validate as 6-char hex; fall back to neutral grey for invalid values
  141. raw_color = (filament.get("color_hex") or "").upper().removeprefix("#")
  142. color_hex: str = raw_color if _COLOR_HEX_RE.match(raw_color) else "808080"
  143. rgba: str = color_hex + "FF"
  144. label_weight: int = _safe_int(filament.get("weight"), 1000)
  145. used_weight: float = _safe_float(spool.get("used_weight"), 0.0)
  146. # Archived state – Spoolman uses a boolean ``archived`` field
  147. archived: bool = spool.get("archived", False)
  148. archived_at: str | None = None
  149. if archived:
  150. archived_at = spool.get("last_used") or spool.get("registered")
  151. if not archived_at:
  152. archived_at = datetime.now(timezone.utc).isoformat()
  153. created_at: str = spool.get("registered") or datetime.now(timezone.utc).isoformat()
  154. color_name: str | None = filament.get("color_name") or None
  155. nozzle_temp_raw = filament.get("settings_extruder_temp")
  156. nozzle_temp_min: int | None = _safe_int(nozzle_temp_raw, 0) or None
  157. return {
  158. "id": spool_id,
  159. "material": material,
  160. "subtype": subtype,
  161. "color_name": color_name,
  162. "rgba": rgba,
  163. "brand": vendor.get("name") or None,
  164. "label_weight": label_weight,
  165. "core_weight": _safe_int(filament.get("spool_weight"), 250),
  166. "core_weight_catalog_id": None,
  167. "weight_used": used_weight,
  168. "weight_locked": False,
  169. "last_scale_weight": None,
  170. "last_weighed_at": None,
  171. # slicer_filament_name carries the Spoolman filament name for display
  172. "slicer_filament": None,
  173. "slicer_filament_name": filament_name or None,
  174. "nozzle_temp_min": nozzle_temp_min,
  175. "nozzle_temp_max": None,
  176. "note": spool.get("comment") or None,
  177. "added_full": None,
  178. "last_used": spool.get("last_used"),
  179. # encode_time semantics differ: local records NFC write time; Spoolman first_used
  180. # records first print use — different events; using first_used as best available proxy.
  181. "encode_time": spool.get("first_used"),
  182. "tag_uid": tag_uid,
  183. "tray_uuid": tray_uuid,
  184. "data_origin": "spoolman",
  185. "tag_type": "spoolman",
  186. "archived_at": archived_at,
  187. "created_at": created_at,
  188. # Spoolman has no updated_at field; use registered timestamp as best available proxy
  189. "updated_at": created_at,
  190. "cost_per_kg": _safe_optional_float(spool.get("price")),
  191. "storage_location": spool.get("location") or None,
  192. "k_profiles": [],
  193. }