notification_service.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. """Notification service for sending push notifications via various providers."""
  2. import json
  3. import logging
  4. import smtplib
  5. from datetime import datetime
  6. from email.mime.multipart import MIMEMultipart
  7. from email.mime.text import MIMEText
  8. from typing import Any
  9. from urllib.parse import quote
  10. import httpx
  11. from sqlalchemy import select
  12. from sqlalchemy.ext.asyncio import AsyncSession
  13. from backend.app.models.notification import NotificationProvider
  14. from backend.app.models.settings import Settings
  15. from backend.app.i18n import Translator
  16. logger = logging.getLogger(__name__)
  17. class NotificationService:
  18. """Service for sending notifications through various providers."""
  19. def __init__(self):
  20. self._http_client: httpx.AsyncClient | None = None
  21. async def _get_client(self) -> httpx.AsyncClient:
  22. """Get or create HTTP client."""
  23. if self._http_client is None or self._http_client.is_closed:
  24. self._http_client = httpx.AsyncClient(timeout=30.0)
  25. return self._http_client
  26. async def close(self):
  27. """Close HTTP client."""
  28. if self._http_client and not self._http_client.is_closed:
  29. await self._http_client.aclose()
  30. def _is_in_quiet_hours(self, provider: NotificationProvider) -> bool:
  31. """Check if current time is within provider's quiet hours."""
  32. if not provider.quiet_hours_enabled:
  33. return False
  34. if not provider.quiet_hours_start or not provider.quiet_hours_end:
  35. return False
  36. try:
  37. now = datetime.now()
  38. current_time = now.hour * 60 + now.minute
  39. start_parts = provider.quiet_hours_start.split(":")
  40. end_parts = provider.quiet_hours_end.split(":")
  41. start_minutes = int(start_parts[0]) * 60 + int(start_parts[1])
  42. end_minutes = int(end_parts[0]) * 60 + int(end_parts[1])
  43. # Handle overnight quiet hours (e.g., 22:00 to 07:00)
  44. if start_minutes > end_minutes:
  45. # Quiet hours span midnight
  46. return current_time >= start_minutes or current_time < end_minutes
  47. else:
  48. # Same day quiet hours
  49. return start_minutes <= current_time < end_minutes
  50. except (ValueError, TypeError, AttributeError):
  51. logger.warning(f"Invalid quiet hours format for provider {provider.name}")
  52. return False
  53. async def _get_notification_language(self, db: AsyncSession) -> str:
  54. """Get the notification language from settings."""
  55. result = await db.execute(
  56. select(Settings).where(Settings.key == "notification_language")
  57. )
  58. setting = result.scalar_one_or_none()
  59. return setting.value if setting else "en"
  60. def _format_duration(self, seconds: int | None, translator: Translator) -> str:
  61. """Format duration in seconds to human-readable string."""
  62. if seconds is None:
  63. return translator.t("notification.unknown")
  64. hours = seconds // 3600
  65. minutes = (seconds % 3600) // 60
  66. if hours > 0:
  67. return f"{hours}h {minutes}m"
  68. return f"{minutes}m"
  69. def _build_print_start_message(self, printer_name: str, data: dict, translator: Translator) -> tuple[str, str]:
  70. """Build notification message for print start event."""
  71. filename = data.get("filename", translator.t("notification.unknown"))
  72. # Clean up filename
  73. if filename.endswith(".gcode.3mf"):
  74. filename = filename[:-10]
  75. elif filename.endswith(".3mf"):
  76. filename = filename[:-4]
  77. title = translator.t("notification.print_started")
  78. estimated_time = data.get("raw_data", {}).get("print", {}).get("mc_remaining_time")
  79. time_str = self._format_duration(estimated_time * 60 if estimated_time else None, translator)
  80. message = f"{printer_name}: {filename}\n{translator.t('notification.estimated')}: {time_str}"
  81. return title, message
  82. def _build_print_complete_message(
  83. self, printer_name: str, status: str, data: dict, translator: Translator, archive_data: dict | None = None
  84. ) -> tuple[str, str]:
  85. """Build notification message for print complete event."""
  86. filename = data.get("filename", translator.t("notification.unknown"))
  87. if filename.endswith(".gcode.3mf"):
  88. filename = filename[:-10]
  89. elif filename.endswith(".3mf"):
  90. filename = filename[:-4]
  91. if status == "completed":
  92. title = translator.t("notification.print_completed")
  93. elif status == "failed":
  94. title = translator.t("notification.print_failed")
  95. elif status in ("aborted", "stopped", "cancelled"):
  96. title = translator.t("notification.print_stopped")
  97. else:
  98. title = translator.t("notification.print_ended")
  99. lines = [f"{printer_name}: {filename}"]
  100. if archive_data:
  101. # Add print time if available
  102. if archive_data.get("print_time_seconds"):
  103. lines.append(f"{translator.t('notification.time')}: {self._format_duration(archive_data['print_time_seconds'], translator)}")
  104. # Add filament used if available
  105. if archive_data.get("actual_filament_grams"):
  106. lines.append(f"{translator.t('notification.filament')}: {archive_data['actual_filament_grams']:.1f}g")
  107. # Add failure reason if failed
  108. if status == "failed" and archive_data.get("failure_reason"):
  109. lines.append(f"{translator.t('notification.reason')}: {archive_data['failure_reason']}")
  110. message = "\n".join(lines)
  111. return title, message
  112. def _build_progress_message(
  113. self, printer_name: str, filename: str, progress: int, translator: Translator
  114. ) -> tuple[str, str]:
  115. """Build notification message for print progress milestone."""
  116. if filename.endswith(".gcode.3mf"):
  117. filename = filename[:-10]
  118. elif filename.endswith(".3mf"):
  119. filename = filename[:-4]
  120. title = translator.t("notification.print_progress", progress=progress)
  121. message = f"{printer_name}: {filename}"
  122. return title, message
  123. def _build_printer_offline_message(self, printer_name: str, translator: Translator) -> tuple[str, str]:
  124. """Build notification message for printer offline event."""
  125. title = translator.t("notification.printer_offline")
  126. message = translator.t("notification.printer_disconnected", printer=printer_name)
  127. return title, message
  128. def _build_printer_error_message(
  129. self, printer_name: str, error_type: str, translator: Translator, error_detail: str | None = None
  130. ) -> tuple[str, str]:
  131. """Build notification message for printer error event."""
  132. title = translator.t("notification.printer_error", error_type=error_type)
  133. message = f"{printer_name}"
  134. if error_detail:
  135. message += f"\n{error_detail}"
  136. return title, message
  137. def _build_filament_low_message(
  138. self, printer_name: str, slot: int, remaining_percent: int, translator: Translator
  139. ) -> tuple[str, str]:
  140. """Build notification message for low filament event."""
  141. title = translator.t("notification.filament_low")
  142. message = translator.t("notification.slot_at_percent", printer=printer_name, slot=slot, percent=remaining_percent)
  143. return title, message
  144. def _build_maintenance_due_message(
  145. self, printer_name: str, maintenance_items: list[dict], translator: Translator
  146. ) -> tuple[str, str]:
  147. """Build notification message for maintenance due event."""
  148. title = translator.t("notification.maintenance_due")
  149. lines = [f"{printer_name}:"]
  150. for item in maintenance_items:
  151. status = translator.t("notification.overdue") if item.get("is_due") else translator.t("notification.soon")
  152. lines.append(f"• {item['name']} ({status})")
  153. message = "\n".join(lines)
  154. return title, message
  155. async def send_test_notification(
  156. self, provider_type: str, config: dict[str, Any], db: AsyncSession | None = None
  157. ) -> tuple[bool, str]:
  158. """Send a test notification to verify configuration."""
  159. lang = "en"
  160. if db:
  161. lang = await self._get_notification_language(db)
  162. translator = Translator(lang)
  163. title = translator.t("notification.test_title")
  164. message = translator.t("notification.test_message")
  165. try:
  166. if provider_type == "callmebot":
  167. return await self._send_callmebot(config, f"{title}\n{message}")
  168. elif provider_type == "ntfy":
  169. return await self._send_ntfy(config, title, message)
  170. elif provider_type == "pushover":
  171. return await self._send_pushover(config, title, message)
  172. elif provider_type == "telegram":
  173. return await self._send_telegram(config, f"*{title}*\n{message}")
  174. elif provider_type == "email":
  175. return await self._send_email(config, title, message)
  176. else:
  177. return False, f"Unknown provider type: {provider_type}"
  178. except Exception as e:
  179. logger.exception(f"Error sending test notification via {provider_type}")
  180. return False, str(e)
  181. async def _send_callmebot(self, config: dict, message: str) -> tuple[bool, str]:
  182. """Send notification via CallMeBot (WhatsApp)."""
  183. phone = config.get("phone", "").strip()
  184. apikey = config.get("apikey", "").strip()
  185. if not phone or not apikey:
  186. return False, "Phone number and API key are required"
  187. # URL encode the message
  188. encoded_message = quote(message)
  189. url = f"https://api.callmebot.com/whatsapp.php?phone={phone}&text={encoded_message}&apikey={apikey}"
  190. client = await self._get_client()
  191. response = await client.get(url)
  192. if response.status_code == 200:
  193. return True, "Message sent successfully"
  194. else:
  195. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  196. async def _send_ntfy(self, config: dict, title: str, message: str) -> tuple[bool, str]:
  197. """Send notification via ntfy."""
  198. server = config.get("server", "https://ntfy.sh").rstrip("/")
  199. topic = config.get("topic", "").strip()
  200. auth_token = config.get("auth_token", "").strip()
  201. if not topic:
  202. return False, "Topic is required"
  203. url = f"{server}/{topic}"
  204. headers = {"Title": title}
  205. if auth_token:
  206. headers["Authorization"] = f"Bearer {auth_token}"
  207. client = await self._get_client()
  208. response = await client.post(url, content=message, headers=headers)
  209. if response.status_code in (200, 204):
  210. return True, "Message sent successfully"
  211. else:
  212. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  213. async def _send_pushover(self, config: dict, title: str, message: str) -> tuple[bool, str]:
  214. """Send notification via Pushover."""
  215. user_key = config.get("user_key", "").strip()
  216. app_token = config.get("app_token", "").strip()
  217. priority = config.get("priority", 0)
  218. if not user_key or not app_token:
  219. return False, "User key and app token are required"
  220. url = "https://api.pushover.net/1/messages.json"
  221. data = {
  222. "token": app_token,
  223. "user": user_key,
  224. "title": title,
  225. "message": message,
  226. "priority": priority,
  227. }
  228. client = await self._get_client()
  229. response = await client.post(url, data=data)
  230. if response.status_code == 200:
  231. return True, "Message sent successfully"
  232. else:
  233. try:
  234. error_data = response.json()
  235. errors = error_data.get("errors", [])
  236. return False, f"Pushover error: {', '.join(errors)}"
  237. except Exception:
  238. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  239. async def _send_telegram(self, config: dict, message: str) -> tuple[bool, str]:
  240. """Send notification via Telegram bot."""
  241. bot_token = config.get("bot_token", "").strip()
  242. chat_id = config.get("chat_id", "").strip()
  243. if not bot_token or not chat_id:
  244. return False, "Bot token and chat ID are required"
  245. url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
  246. data = {
  247. "chat_id": chat_id,
  248. "text": message,
  249. "parse_mode": "Markdown",
  250. }
  251. client = await self._get_client()
  252. response = await client.post(url, json=data)
  253. if response.status_code == 200:
  254. result = response.json()
  255. if result.get("ok"):
  256. return True, "Message sent successfully"
  257. else:
  258. return False, f"Telegram error: {result.get('description', 'Unknown error')}"
  259. else:
  260. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  261. async def _send_email(self, config: dict, subject: str, body: str) -> tuple[bool, str]:
  262. """Send notification via email (SMTP)."""
  263. smtp_server = config.get("smtp_server", "").strip()
  264. smtp_port = int(config.get("smtp_port", 587))
  265. username = config.get("username", "").strip()
  266. password = config.get("password", "").strip()
  267. from_email = config.get("from_email", "").strip()
  268. to_email = config.get("to_email", "").strip()
  269. # Security: "starttls" (port 587), "ssl" (port 465), "none" (port 25)
  270. security = config.get("security", "starttls")
  271. # Authentication: "true" or "false"
  272. auth_enabled = config.get("auth_enabled", "true").lower() == "true"
  273. if not all([smtp_server, from_email, to_email]):
  274. return False, "SMTP server, from email, and to email are required"
  275. if auth_enabled and not all([username, password]):
  276. return False, "Username and password are required when authentication is enabled"
  277. try:
  278. msg = MIMEMultipart()
  279. msg["From"] = from_email
  280. msg["To"] = to_email
  281. msg["Subject"] = f"[BambuTrack] {subject}"
  282. msg.attach(MIMEText(body, "plain"))
  283. if security == "ssl":
  284. # Direct SSL connection (typically port 465)
  285. server = smtplib.SMTP_SSL(smtp_server, smtp_port)
  286. elif security == "starttls":
  287. # STARTTLS upgrade (typically port 587)
  288. server = smtplib.SMTP(smtp_server, smtp_port)
  289. server.starttls()
  290. else:
  291. # No encryption (typically port 25) - use with caution
  292. server = smtplib.SMTP(smtp_server, smtp_port)
  293. if auth_enabled:
  294. server.login(username, password)
  295. server.sendmail(from_email, to_email, msg.as_string())
  296. server.quit()
  297. return True, "Email sent successfully"
  298. except smtplib.SMTPAuthenticationError:
  299. return False, "SMTP authentication failed - check username/password"
  300. except smtplib.SMTPException as e:
  301. return False, f"SMTP error: {str(e)}"
  302. except Exception as e:
  303. return False, f"Email error: {str(e)}"
  304. async def _send_to_provider(
  305. self, provider: NotificationProvider, title: str, message: str
  306. ) -> tuple[bool, str]:
  307. """Send notification to a specific provider."""
  308. # Check quiet hours
  309. if self._is_in_quiet_hours(provider):
  310. logger.info(f"Skipping notification to {provider.name} - quiet hours active")
  311. return True, "Skipped - quiet hours"
  312. config = json.loads(provider.config) if isinstance(provider.config, str) else provider.config
  313. try:
  314. if provider.provider_type == "callmebot":
  315. return await self._send_callmebot(config, f"{title}\n{message}")
  316. elif provider.provider_type == "ntfy":
  317. return await self._send_ntfy(config, title, message)
  318. elif provider.provider_type == "pushover":
  319. return await self._send_pushover(config, title, message)
  320. elif provider.provider_type == "telegram":
  321. return await self._send_telegram(config, f"*{title}*\n{message}")
  322. elif provider.provider_type == "email":
  323. return await self._send_email(config, title, message)
  324. else:
  325. return False, f"Unknown provider type: {provider.provider_type}"
  326. except Exception as e:
  327. logger.exception(f"Error sending notification via {provider.provider_type}")
  328. return False, str(e)
  329. async def _update_provider_status(
  330. self, db: AsyncSession, provider_id: int, success: bool, error: str | None = None
  331. ):
  332. """Update provider status after sending notification."""
  333. result = await db.execute(
  334. select(NotificationProvider).where(NotificationProvider.id == provider_id)
  335. )
  336. provider = result.scalar_one_or_none()
  337. if provider:
  338. if success:
  339. provider.last_success = datetime.utcnow()
  340. else:
  341. provider.last_error = error
  342. provider.last_error_at = datetime.utcnow()
  343. await db.commit()
  344. async def _get_providers_for_event(
  345. self,
  346. db: AsyncSession,
  347. event_field: str,
  348. printer_id: int | None = None,
  349. ) -> list[NotificationProvider]:
  350. """Get all enabled providers that want a specific event type."""
  351. # Build the query dynamically based on event field
  352. query = select(NotificationProvider).where(
  353. NotificationProvider.enabled == True,
  354. getattr(NotificationProvider, event_field) == True,
  355. )
  356. if printer_id is not None:
  357. query = query.where(
  358. (NotificationProvider.printer_id == None) | (NotificationProvider.printer_id == printer_id)
  359. )
  360. result = await db.execute(query)
  361. return list(result.scalars().all())
  362. async def _send_to_providers(
  363. self,
  364. providers: list[NotificationProvider],
  365. title: str,
  366. message: str,
  367. db: AsyncSession,
  368. ):
  369. """Send notification to multiple providers."""
  370. for provider in providers:
  371. try:
  372. success, error = await self._send_to_provider(provider, title, message)
  373. await self._update_provider_status(db, provider.id, success, error if not success else None)
  374. if success:
  375. logger.info(f"Sent notification via {provider.name}")
  376. else:
  377. logger.warning(f"Failed to send notification via {provider.name}: {error}")
  378. except Exception as e:
  379. logger.exception(f"Error sending notification via {provider.name}")
  380. await self._update_provider_status(db, provider.id, False, str(e))
  381. async def on_print_start(
  382. self, printer_id: int, printer_name: str, data: dict, db: AsyncSession
  383. ):
  384. """Handle print start event - send notifications to relevant providers."""
  385. logger.info(f"on_print_start called for printer {printer_id} ({printer_name})")
  386. providers = await self._get_providers_for_event(db, "on_print_start", printer_id)
  387. if not providers:
  388. logger.info(f"No notification providers configured for print_start event on printer {printer_id}")
  389. return
  390. lang = await self._get_notification_language(db)
  391. translator = Translator(lang)
  392. logger.info(f"Found {len(providers)} providers for print_start: {[p.name for p in providers]}")
  393. title, message = self._build_print_start_message(printer_name, data, translator)
  394. await self._send_to_providers(providers, title, message, db)
  395. async def on_print_complete(
  396. self,
  397. printer_id: int,
  398. printer_name: str,
  399. status: str,
  400. data: dict,
  401. db: AsyncSession,
  402. archive_data: dict | None = None,
  403. ):
  404. """Handle print complete event - send notifications to relevant providers."""
  405. logger.info(f"on_print_complete called for printer {printer_id} ({printer_name}), status={status}")
  406. # Determine which event type this is
  407. if status == "completed":
  408. event_field = "on_print_complete"
  409. elif status in ("failed",):
  410. event_field = "on_print_failed"
  411. elif status in ("aborted", "stopped", "cancelled"):
  412. event_field = "on_print_stopped"
  413. else:
  414. # Unknown status, default to on_print_complete
  415. logger.warning(f"Unknown print status '{status}', defaulting to on_print_complete")
  416. event_field = "on_print_complete"
  417. providers = await self._get_providers_for_event(db, event_field, printer_id)
  418. if not providers:
  419. logger.info(f"No notification providers configured for {event_field} event on printer {printer_id}")
  420. return
  421. lang = await self._get_notification_language(db)
  422. translator = Translator(lang)
  423. logger.info(f"Found {len(providers)} providers for {event_field}: {[p.name for p in providers]}")
  424. title, message = self._build_print_complete_message(printer_name, status, data, translator, archive_data)
  425. await self._send_to_providers(providers, title, message, db)
  426. async def on_print_progress(
  427. self,
  428. printer_id: int,
  429. printer_name: str,
  430. filename: str,
  431. progress: int,
  432. db: AsyncSession,
  433. ):
  434. """Handle print progress milestone (25%, 50%, 75%)."""
  435. providers = await self._get_providers_for_event(db, "on_print_progress", printer_id)
  436. if not providers:
  437. return
  438. lang = await self._get_notification_language(db)
  439. translator = Translator(lang)
  440. title, message = self._build_progress_message(printer_name, filename, progress, translator)
  441. await self._send_to_providers(providers, title, message, db)
  442. async def on_printer_offline(
  443. self, printer_id: int, printer_name: str, db: AsyncSession
  444. ):
  445. """Handle printer offline event."""
  446. providers = await self._get_providers_for_event(db, "on_printer_offline", printer_id)
  447. if not providers:
  448. return
  449. lang = await self._get_notification_language(db)
  450. translator = Translator(lang)
  451. title, message = self._build_printer_offline_message(printer_name, translator)
  452. await self._send_to_providers(providers, title, message, db)
  453. async def on_printer_error(
  454. self,
  455. printer_id: int,
  456. printer_name: str,
  457. error_type: str,
  458. db: AsyncSession,
  459. error_detail: str | None = None,
  460. ):
  461. """Handle printer error event (AMS issues, etc.)."""
  462. providers = await self._get_providers_for_event(db, "on_printer_error", printer_id)
  463. if not providers:
  464. return
  465. lang = await self._get_notification_language(db)
  466. translator = Translator(lang)
  467. title, message = self._build_printer_error_message(printer_name, error_type, translator, error_detail)
  468. await self._send_to_providers(providers, title, message, db)
  469. async def on_filament_low(
  470. self,
  471. printer_id: int,
  472. printer_name: str,
  473. slot: int,
  474. remaining_percent: int,
  475. db: AsyncSession,
  476. ):
  477. """Handle low filament event."""
  478. providers = await self._get_providers_for_event(db, "on_filament_low", printer_id)
  479. if not providers:
  480. return
  481. lang = await self._get_notification_language(db)
  482. translator = Translator(lang)
  483. title, message = self._build_filament_low_message(printer_name, slot, remaining_percent, translator)
  484. await self._send_to_providers(providers, title, message, db)
  485. async def on_maintenance_due(
  486. self,
  487. printer_id: int,
  488. printer_name: str,
  489. maintenance_items: list[dict],
  490. db: AsyncSession,
  491. ):
  492. """Handle maintenance due event - sends notification when maintenance is due or warning."""
  493. if not maintenance_items:
  494. return
  495. providers = await self._get_providers_for_event(db, "on_maintenance_due", printer_id)
  496. if not providers:
  497. logger.info(f"No notification providers configured for maintenance_due event on printer {printer_id}")
  498. return
  499. lang = await self._get_notification_language(db)
  500. translator = Translator(lang)
  501. logger.info(f"Found {len(providers)} providers for maintenance_due: {[p.name for p in providers]}")
  502. title, message = self._build_maintenance_due_message(printer_name, maintenance_items, translator)
  503. await self._send_to_providers(providers, title, message, db)
  504. # Global instance
  505. notification_service = NotificationService()