firmware_check.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. """
  2. Firmware Check Service
  3. Checks for firmware updates by fetching from Bambu Lab's official firmware download page.
  4. Also provides firmware download functionality for offline updates.
  5. """
  6. import logging
  7. import re
  8. import time
  9. from collections.abc import Callable
  10. from dataclasses import dataclass
  11. from pathlib import Path
  12. import httpx
  13. from backend.app.core.config import _data_dir
  14. logger = logging.getLogger(__name__)
  15. # Bambu Lab firmware download page
  16. BAMBU_FIRMWARE_BASE = "https://bambulab.com"
  17. FIRMWARE_PAGE = "/en/support/firmware-download/all"
  18. # Cache TTL in seconds (1 hour)
  19. CACHE_TTL = 3600
  20. # Map Bambuddy model names to Bambu Lab API keys
  21. MODEL_TO_API_KEY = {
  22. "X1": "x1",
  23. "X1C": "x1",
  24. "X1-Carbon": "x1",
  25. "X1 Carbon": "x1",
  26. "P1P": "p1",
  27. "P1S": "p1",
  28. "A1": "a1",
  29. "A1 Mini": "a1-mini",
  30. "A1-Mini": "a1-mini",
  31. "A1mini": "a1-mini",
  32. "H2D": "h2d",
  33. "H2C": "h2c",
  34. "H2S": "h2s",
  35. "P2S": "p2s",
  36. "X1E": "x1e",
  37. "H2D Pro": "h2d-pro",
  38. "H2D-Pro": "h2d-pro",
  39. "H2DPRO": "h2d-pro",
  40. }
  41. # Reverse mapping: API key to model codes
  42. API_KEY_TO_DEV_MODEL = {
  43. "x1": "BL-P001",
  44. "p1": "C11",
  45. "a1": "N2S",
  46. "a1-mini": "N1",
  47. "h2d": "O1D",
  48. "h2c": "O1C",
  49. "h2s": "O1S",
  50. "p2s": "N7",
  51. "x1e": "C13",
  52. "h2d-pro": "O1E",
  53. }
  54. @dataclass
  55. class FirmwareVersion:
  56. """Firmware version information."""
  57. version: str
  58. download_url: str
  59. release_notes: str | None = None
  60. release_time: str | None = None
  61. class FirmwareCheckService:
  62. """Service for checking firmware updates from Bambu Lab."""
  63. def __init__(self):
  64. self._build_id: str | None = None
  65. self._build_id_time: float = 0
  66. self._version_cache: dict[str, FirmwareVersion] = {}
  67. self._cache_time: float = 0
  68. self._client = httpx.AsyncClient(
  69. timeout=30.0,
  70. headers={
  71. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
  72. },
  73. )
  74. async def _get_build_id(self) -> str | None:
  75. """Fetch the Next.js build ID from Bambu Lab's firmware page."""
  76. # Use cached build ID if still valid (cache for 1 hour)
  77. if self._build_id and (time.time() - self._build_id_time) < CACHE_TTL:
  78. return self._build_id
  79. try:
  80. response = await self._client.get(f"{BAMBU_FIRMWARE_BASE}{FIRMWARE_PAGE}")
  81. if response.status_code == 200:
  82. # Extract buildId from the page
  83. match = re.search(r'"buildId":"([^"]+)"', response.text)
  84. if match:
  85. self._build_id = match.group(1)
  86. self._build_id_time = time.time()
  87. logger.info("Got Bambu Lab build ID: %s", self._build_id)
  88. return self._build_id
  89. logger.warning("Failed to get Bambu Lab page: %s", response.status_code)
  90. except Exception as e:
  91. logger.error("Error fetching Bambu Lab build ID: %s", e)
  92. return self._build_id # Return cached value if available
  93. async def _fetch_firmware_versions(self, api_key: str) -> FirmwareVersion | None:
  94. """Fetch firmware versions for a specific printer from Bambu Lab API."""
  95. build_id = await self._get_build_id()
  96. if not build_id:
  97. logger.warning("No build ID available, cannot fetch firmware versions")
  98. return None
  99. try:
  100. url = f"{BAMBU_FIRMWARE_BASE}/_next/data/{build_id}/en/support/firmware-download/{api_key}.json"
  101. response = await self._client.get(url)
  102. if response.status_code == 200:
  103. data = response.json()
  104. page_props = data.get("pageProps", {})
  105. printer_map = page_props.get("printerMap", {})
  106. printer_data = printer_map.get(api_key, {})
  107. versions = printer_data.get("versions", [])
  108. if versions:
  109. latest = versions[0]
  110. return FirmwareVersion(
  111. version=latest.get("version", ""),
  112. download_url=latest.get("url", ""),
  113. release_notes=latest.get("release_notes_en"),
  114. release_time=latest.get("release_time"),
  115. )
  116. else:
  117. # api_key is a printer model identifier (e.g. "x1", "p1"), not a secret
  118. logger.warning("Failed to fetch firmware for %s: %s", api_key, response.status_code)
  119. except Exception as e:
  120. # api_key is a printer model identifier (e.g. "x1", "p1"), not a secret
  121. logger.error("Error fetching firmware for %s: %s", api_key, e)
  122. return None
  123. async def get_latest_version(self, model: str) -> FirmwareVersion | None:
  124. """
  125. Get the latest firmware version for a printer model.
  126. Args:
  127. model: Bambuddy printer model name (e.g., "X1C", "P1S", "H2D")
  128. Returns:
  129. FirmwareVersion if found, None otherwise
  130. """
  131. # Normalize model name
  132. model_upper = model.upper().replace(" ", "").replace("-", "")
  133. # Find the API key for this model
  134. api_key = None
  135. for model_name, key in MODEL_TO_API_KEY.items():
  136. if model_name.upper().replace(" ", "").replace("-", "") == model_upper:
  137. api_key = key
  138. break
  139. if not api_key:
  140. # Try direct lookup with original model
  141. api_key = MODEL_TO_API_KEY.get(model)
  142. if not api_key:
  143. logger.debug("Unknown printer model: %s", model)
  144. return None
  145. # Check cache
  146. cache_key = api_key
  147. if cache_key in self._version_cache and (time.time() - self._cache_time) < CACHE_TTL:
  148. return self._version_cache[cache_key]
  149. # Fetch from API
  150. version = await self._fetch_firmware_versions(api_key)
  151. if version:
  152. self._version_cache[cache_key] = version
  153. self._cache_time = time.time()
  154. return version
  155. async def check_for_update(self, model: str, current_version: str) -> dict:
  156. """
  157. Check if a firmware update is available for a printer.
  158. Args:
  159. model: Printer model name
  160. current_version: Currently installed firmware version
  161. Returns:
  162. Dict with update info:
  163. - update_available: bool
  164. - current_version: str
  165. - latest_version: str or None
  166. - download_url: str or None
  167. - release_notes: str or None
  168. """
  169. result = {
  170. "update_available": False,
  171. "current_version": current_version,
  172. "latest_version": None,
  173. "download_url": None,
  174. "release_notes": None,
  175. }
  176. if not current_version:
  177. return result
  178. latest = await self.get_latest_version(model)
  179. if not latest:
  180. return result
  181. result["latest_version"] = latest.version
  182. result["download_url"] = latest.download_url
  183. result["release_notes"] = latest.release_notes
  184. # Compare versions (format: XX.XX.XX.XX)
  185. try:
  186. current_parts = [int(x) for x in current_version.split(".")]
  187. latest_parts = [int(x) for x in latest.version.split(".")]
  188. # Pad to same length
  189. while len(current_parts) < 4:
  190. current_parts.append(0)
  191. while len(latest_parts) < 4:
  192. latest_parts.append(0)
  193. result["update_available"] = latest_parts > current_parts
  194. except (ValueError, AttributeError):
  195. logger.warning("Could not compare versions: %s vs %s", current_version, latest.version)
  196. return result
  197. async def get_all_latest_versions(self) -> dict[str, FirmwareVersion]:
  198. """
  199. Fetch latest firmware versions for all known printer models.
  200. Returns:
  201. Dict mapping API key to FirmwareVersion
  202. """
  203. results = {}
  204. for api_key in API_KEY_TO_DEV_MODEL:
  205. version = await self._fetch_firmware_versions(api_key)
  206. if version:
  207. results[api_key] = version
  208. return results
  209. def _get_firmware_cache_dir(self) -> Path:
  210. """Get the firmware cache directory, creating it if needed."""
  211. cache_dir = _data_dir / "firmware"
  212. cache_dir.mkdir(parents=True, exist_ok=True)
  213. return cache_dir
  214. def _get_cached_firmware_path(self, model: str, version: str) -> Path:
  215. """Get the path where a firmware file would be cached."""
  216. # Normalize model name for filename
  217. model_safe = model.upper().replace(" ", "-").replace("/", "-")
  218. version_safe = version.replace(".", "_")
  219. filename = f"{model_safe}_{version_safe}.bin"
  220. return self._get_firmware_cache_dir() / filename
  221. async def get_firmware_file_info(self, model: str) -> dict | None:
  222. """
  223. Get information about the firmware file for a model.
  224. Returns:
  225. Dict with download_url, version, filename, and estimated_size (if available)
  226. """
  227. latest = await self.get_latest_version(model)
  228. if not latest or not latest.download_url:
  229. return None
  230. # Extract filename from URL
  231. url_parts = latest.download_url.split("/")
  232. filename = url_parts[-1] if url_parts else f"firmware_{model}.bin"
  233. return {
  234. "download_url": latest.download_url,
  235. "version": latest.version,
  236. "filename": filename,
  237. "release_notes": latest.release_notes,
  238. }
  239. async def download_firmware(
  240. self,
  241. model: str,
  242. progress_callback: Callable[[int, int, str], None] | None = None,
  243. ) -> Path | None:
  244. """
  245. Download firmware file for a printer model.
  246. Args:
  247. model: Printer model name (e.g., "X1C", "P1S", "H2D")
  248. progress_callback: Optional callback(bytes_downloaded, total_bytes, status_message)
  249. Returns:
  250. Path to downloaded firmware file, or None on failure
  251. """
  252. latest = await self.get_latest_version(model)
  253. if not latest or not latest.download_url:
  254. logger.warning("No firmware download URL available for model: %s", model)
  255. return None
  256. # Check if already cached
  257. cached_path = self._get_cached_firmware_path(model, latest.version)
  258. if cached_path.exists():
  259. logger.info("Using cached firmware: %s", cached_path)
  260. return cached_path
  261. # Extract original filename from URL (must preserve for SD card update)
  262. url_parts = latest.download_url.split("/")
  263. original_filename = url_parts[-1] if url_parts else f"firmware_{model}.bin"
  264. # Download to temp file first
  265. temp_path = self._get_firmware_cache_dir() / f".downloading_{original_filename}"
  266. try:
  267. logger.info("Downloading firmware from %s", latest.download_url)
  268. if progress_callback:
  269. progress_callback(0, 0, "Starting download...")
  270. async with self._client.stream("GET", latest.download_url) as response:
  271. if response.status_code != 200:
  272. logger.error("Firmware download failed with status %s", response.status_code)
  273. return None
  274. total_size = int(response.headers.get("content-length", 0))
  275. downloaded = 0
  276. with open(temp_path, "wb") as f:
  277. async for chunk in response.aiter_bytes(chunk_size=65536):
  278. f.write(chunk)
  279. downloaded += len(chunk)
  280. if progress_callback:
  281. progress_callback(downloaded, total_size, "Downloading firmware...")
  282. # Also save a copy with the original filename for SD card
  283. original_path = self._get_firmware_cache_dir() / original_filename
  284. if original_path.exists():
  285. original_path.unlink()
  286. # Move temp to both cached path and original filename path
  287. import shutil
  288. shutil.copy2(temp_path, cached_path)
  289. temp_path.rename(original_path)
  290. logger.info("Firmware downloaded successfully: %s", original_path)
  291. if progress_callback:
  292. progress_callback(downloaded, total_size, "Download complete")
  293. return original_path
  294. except Exception as e:
  295. logger.error("Firmware download failed: %s", e)
  296. if temp_path.exists():
  297. try:
  298. temp_path.unlink()
  299. except OSError:
  300. pass # Best-effort cleanup of failed download temp file
  301. return None
  302. async def close(self):
  303. """Close the HTTP client."""
  304. await self._client.aclose()
  305. # Singleton instance
  306. _firmware_service: FirmwareCheckService | None = None
  307. def get_firmware_service() -> FirmwareCheckService:
  308. """Get the singleton firmware check service instance."""
  309. global _firmware_service
  310. if _firmware_service is None:
  311. _firmware_service = FirmwareCheckService()
  312. return _firmware_service