# firmware_check.py
"""
Firmware Check Service

Checks for firmware updates by fetching from Bambu Lab's official firmware download page.
Also provides firmware download functionality for offline updates.
"""
import logging
import re
import shutil
import time
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlsplit

import httpx

from backend.app.core.config import _data_dir
  14. logger = logging.getLogger(__name__)
  15. # Bambu Lab firmware download page
  16. BAMBU_FIRMWARE_BASE = "https://bambulab.com"
  17. FIRMWARE_PAGE = "/en/support/firmware-download/all"
  18. # Cache TTL in seconds (1 hour)
  19. CACHE_TTL = 3600
  20. # Map Bambuddy model names to Bambu Lab API keys
  21. MODEL_TO_API_KEY = {
  22. "X1": "x1",
  23. "X1C": "x1",
  24. "X1-Carbon": "x1",
  25. "X1 Carbon": "x1",
  26. "P1P": "p1",
  27. "P1S": "p1",
  28. "A1": "a1",
  29. "A1 Mini": "a1-mini",
  30. "A1-Mini": "a1-mini",
  31. "A1mini": "a1-mini",
  32. "H2D": "h2d",
  33. "H2C": "h2d", # H2C uses same firmware as H2D
  34. "H2S": "h2s",
  35. "P2S": "p2s",
  36. "X1E": "x1e",
  37. "H2D Pro": "h2d-pro",
  38. "H2D-Pro": "h2d-pro",
  39. "H2DPRO": "h2d-pro",
  40. }
  41. # Reverse mapping: API key to model codes
  42. API_KEY_TO_DEV_MODEL = {
  43. "x1": "BL-P001",
  44. "p1": "C11",
  45. "a1": "N2S",
  46. "a1-mini": "N1",
  47. "h2d": "O1D",
  48. "h2s": "O1S",
  49. "p2s": "N7",
  50. "x1e": "C13",
  51. "h2d-pro": "O1E",
  52. }
  53. @dataclass
  54. class FirmwareVersion:
  55. """Firmware version information."""
  56. version: str
  57. download_url: str
  58. release_notes: str | None = None
  59. release_time: str | None = None
  60. class FirmwareCheckService:
  61. """Service for checking firmware updates from Bambu Lab."""
  62. def __init__(self):
  63. self._build_id: str | None = None
  64. self._build_id_time: float = 0
  65. self._version_cache: dict[str, FirmwareVersion] = {}
  66. self._cache_time: float = 0
  67. self._client = httpx.AsyncClient(
  68. timeout=30.0,
  69. headers={
  70. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
  71. },
  72. )
  73. async def _get_build_id(self) -> str | None:
  74. """Fetch the Next.js build ID from Bambu Lab's firmware page."""
  75. # Use cached build ID if still valid (cache for 1 hour)
  76. if self._build_id and (time.time() - self._build_id_time) < CACHE_TTL:
  77. return self._build_id
  78. try:
  79. response = await self._client.get(f"{BAMBU_FIRMWARE_BASE}{FIRMWARE_PAGE}")
  80. if response.status_code == 200:
  81. # Extract buildId from the page
  82. match = re.search(r'"buildId":"([^"]+)"', response.text)
  83. if match:
  84. self._build_id = match.group(1)
  85. self._build_id_time = time.time()
  86. logger.info("Got Bambu Lab build ID: %s", self._build_id)
  87. return self._build_id
  88. logger.warning("Failed to get Bambu Lab page: %s", response.status_code)
  89. except Exception as e:
  90. logger.error("Error fetching Bambu Lab build ID: %s", e)
  91. return self._build_id # Return cached value if available
  92. async def _fetch_firmware_versions(self, api_key: str) -> FirmwareVersion | None:
  93. """Fetch firmware versions for a specific printer from Bambu Lab API."""
  94. build_id = await self._get_build_id()
  95. if not build_id:
  96. logger.warning("No build ID available, cannot fetch firmware versions")
  97. return None
  98. try:
  99. url = f"{BAMBU_FIRMWARE_BASE}/_next/data/{build_id}/en/support/firmware-download/{api_key}.json"
  100. response = await self._client.get(url)
  101. if response.status_code == 200:
  102. data = response.json()
  103. page_props = data.get("pageProps", {})
  104. printer_map = page_props.get("printerMap", {})
  105. printer_data = printer_map.get(api_key, {})
  106. versions = printer_data.get("versions", [])
  107. if versions:
  108. latest = versions[0]
  109. return FirmwareVersion(
  110. version=latest.get("version", ""),
  111. download_url=latest.get("url", ""),
  112. release_notes=latest.get("release_notes_en"),
  113. release_time=latest.get("release_time"),
  114. )
  115. else:
  116. # api_key is a printer model identifier (e.g. "x1", "p1"), not a secret
  117. logger.warning("Failed to fetch firmware for %s: %s", api_key, response.status_code)
  118. except Exception as e:
  119. # api_key is a printer model identifier (e.g. "x1", "p1"), not a secret
  120. logger.error("Error fetching firmware for %s: %s", api_key, e)
  121. return None
  122. async def get_latest_version(self, model: str) -> FirmwareVersion | None:
  123. """
  124. Get the latest firmware version for a printer model.
  125. Args:
  126. model: Bambuddy printer model name (e.g., "X1C", "P1S", "H2D")
  127. Returns:
  128. FirmwareVersion if found, None otherwise
  129. """
  130. # Normalize model name
  131. model_upper = model.upper().replace(" ", "").replace("-", "")
  132. # Find the API key for this model
  133. api_key = None
  134. for model_name, key in MODEL_TO_API_KEY.items():
  135. if model_name.upper().replace(" ", "").replace("-", "") == model_upper:
  136. api_key = key
  137. break
  138. if not api_key:
  139. # Try direct lookup with original model
  140. api_key = MODEL_TO_API_KEY.get(model)
  141. if not api_key:
  142. logger.debug("Unknown printer model: %s", model)
  143. return None
  144. # Check cache
  145. cache_key = api_key
  146. if cache_key in self._version_cache and (time.time() - self._cache_time) < CACHE_TTL:
  147. return self._version_cache[cache_key]
  148. # Fetch from API
  149. version = await self._fetch_firmware_versions(api_key)
  150. if version:
  151. self._version_cache[cache_key] = version
  152. self._cache_time = time.time()
  153. return version
  154. async def check_for_update(self, model: str, current_version: str) -> dict:
  155. """
  156. Check if a firmware update is available for a printer.
  157. Args:
  158. model: Printer model name
  159. current_version: Currently installed firmware version
  160. Returns:
  161. Dict with update info:
  162. - update_available: bool
  163. - current_version: str
  164. - latest_version: str or None
  165. - download_url: str or None
  166. - release_notes: str or None
  167. """
  168. result = {
  169. "update_available": False,
  170. "current_version": current_version,
  171. "latest_version": None,
  172. "download_url": None,
  173. "release_notes": None,
  174. }
  175. if not current_version:
  176. return result
  177. latest = await self.get_latest_version(model)
  178. if not latest:
  179. return result
  180. result["latest_version"] = latest.version
  181. result["download_url"] = latest.download_url
  182. result["release_notes"] = latest.release_notes
  183. # Compare versions (format: XX.XX.XX.XX)
  184. try:
  185. current_parts = [int(x) for x in current_version.split(".")]
  186. latest_parts = [int(x) for x in latest.version.split(".")]
  187. # Pad to same length
  188. while len(current_parts) < 4:
  189. current_parts.append(0)
  190. while len(latest_parts) < 4:
  191. latest_parts.append(0)
  192. result["update_available"] = latest_parts > current_parts
  193. except (ValueError, AttributeError):
  194. logger.warning("Could not compare versions: %s vs %s", current_version, latest.version)
  195. return result
  196. async def get_all_latest_versions(self) -> dict[str, FirmwareVersion]:
  197. """
  198. Fetch latest firmware versions for all known printer models.
  199. Returns:
  200. Dict mapping API key to FirmwareVersion
  201. """
  202. results = {}
  203. for api_key in API_KEY_TO_DEV_MODEL:
  204. version = await self._fetch_firmware_versions(api_key)
  205. if version:
  206. results[api_key] = version
  207. return results
  208. def _get_firmware_cache_dir(self) -> Path:
  209. """Get the firmware cache directory, creating it if needed."""
  210. cache_dir = _data_dir / "firmware"
  211. cache_dir.mkdir(parents=True, exist_ok=True)
  212. return cache_dir
  213. def _get_cached_firmware_path(self, model: str, version: str) -> Path:
  214. """Get the path where a firmware file would be cached."""
  215. # Normalize model name for filename
  216. model_safe = model.upper().replace(" ", "-").replace("/", "-")
  217. version_safe = version.replace(".", "_")
  218. filename = f"{model_safe}_{version_safe}.bin"
  219. return self._get_firmware_cache_dir() / filename
  220. async def get_firmware_file_info(self, model: str) -> dict | None:
  221. """
  222. Get information about the firmware file for a model.
  223. Returns:
  224. Dict with download_url, version, filename, and estimated_size (if available)
  225. """
  226. latest = await self.get_latest_version(model)
  227. if not latest or not latest.download_url:
  228. return None
  229. # Extract filename from URL
  230. url_parts = latest.download_url.split("/")
  231. filename = url_parts[-1] if url_parts else f"firmware_{model}.bin"
  232. return {
  233. "download_url": latest.download_url,
  234. "version": latest.version,
  235. "filename": filename,
  236. "release_notes": latest.release_notes,
  237. }
  238. async def download_firmware(
  239. self,
  240. model: str,
  241. progress_callback: Callable[[int, int, str], None] | None = None,
  242. ) -> Path | None:
  243. """
  244. Download firmware file for a printer model.
  245. Args:
  246. model: Printer model name (e.g., "X1C", "P1S", "H2D")
  247. progress_callback: Optional callback(bytes_downloaded, total_bytes, status_message)
  248. Returns:
  249. Path to downloaded firmware file, or None on failure
  250. """
  251. latest = await self.get_latest_version(model)
  252. if not latest or not latest.download_url:
  253. logger.warning("No firmware download URL available for model: %s", model)
  254. return None
  255. # Check if already cached
  256. cached_path = self._get_cached_firmware_path(model, latest.version)
  257. if cached_path.exists():
  258. logger.info("Using cached firmware: %s", cached_path)
  259. return cached_path
  260. # Extract original filename from URL (must preserve for SD card update)
  261. url_parts = latest.download_url.split("/")
  262. original_filename = url_parts[-1] if url_parts else f"firmware_{model}.bin"
  263. # Download to temp file first
  264. temp_path = self._get_firmware_cache_dir() / f".downloading_{original_filename}"
  265. try:
  266. logger.info("Downloading firmware from %s", latest.download_url)
  267. if progress_callback:
  268. progress_callback(0, 0, "Starting download...")
  269. async with self._client.stream("GET", latest.download_url) as response:
  270. if response.status_code != 200:
  271. logger.error("Firmware download failed with status %s", response.status_code)
  272. return None
  273. total_size = int(response.headers.get("content-length", 0))
  274. downloaded = 0
  275. with open(temp_path, "wb") as f:
  276. async for chunk in response.aiter_bytes(chunk_size=65536):
  277. f.write(chunk)
  278. downloaded += len(chunk)
  279. if progress_callback:
  280. progress_callback(downloaded, total_size, "Downloading firmware...")
  281. # Also save a copy with the original filename for SD card
  282. original_path = self._get_firmware_cache_dir() / original_filename
  283. if original_path.exists():
  284. original_path.unlink()
  285. # Move temp to both cached path and original filename path
  286. import shutil
  287. shutil.copy2(temp_path, cached_path)
  288. temp_path.rename(original_path)
  289. logger.info("Firmware downloaded successfully: %s", original_path)
  290. if progress_callback:
  291. progress_callback(downloaded, total_size, "Download complete")
  292. return original_path
  293. except Exception as e:
  294. logger.error("Firmware download failed: %s", e)
  295. if temp_path.exists():
  296. try:
  297. temp_path.unlink()
  298. except OSError:
  299. pass # Best-effort cleanup of failed download temp file
  300. return None
  301. async def close(self):
  302. """Close the HTTP client."""
  303. await self._client.aclose()
  304. # Singleton instance
  305. _firmware_service: FirmwareCheckService | None = None
  306. def get_firmware_service() -> FirmwareCheckService:
  307. """Get the singleton firmware check service instance."""
  308. global _firmware_service
  309. if _firmware_service is None:
  310. _firmware_service = FirmwareCheckService()
  311. return _firmware_service