notification_service.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945
  1. """Notification service for sending push notifications via various providers."""
  2. import asyncio
  3. import json
  4. import logging
  5. import re
  6. import smtplib
  7. from datetime import datetime
  8. from email.mime.multipart import MIMEMultipart
  9. from email.mime.text import MIMEText
  10. from typing import Any
  11. from urllib.parse import quote
  12. import httpx
  13. from sqlalchemy import select
  14. from sqlalchemy.ext.asyncio import AsyncSession
  15. from backend.app.models.notification import NotificationLog, NotificationProvider, NotificationDigestQueue
  16. from backend.app.models.notification_template import NotificationTemplate
  17. from backend.app.models.settings import Settings
  18. logger = logging.getLogger(__name__)
  19. class NotificationService:
  20. """Service for sending notifications through various providers."""
  21. def __init__(self):
  22. self._http_client: httpx.AsyncClient | None = None
  23. self._template_cache: dict[str, NotificationTemplate] = {}
  24. self._digest_scheduler_task: asyncio.Task | None = None
  25. self._last_digest_check: str = "" # "HH:MM" to avoid duplicate checks
  async def _get_client(self) -> httpx.AsyncClient:
      """Return the shared HTTP client, building a fresh one when none is usable.

      A new client (30 second timeout) is created when no client has been
      made yet or when the previous one has been closed.
      """
      needs_new_client = self._http_client is None or self._http_client.is_closed
      if needs_new_client:
          self._http_client = httpx.AsyncClient(timeout=30.0)
      return self._http_client
  async def close(self):
      """Shut down the shared HTTP client, if there is one and it is still open."""
      client = self._http_client
      if not client or client.is_closed:
          # Nothing to release.
          return
      await client.aclose()
  def _is_in_quiet_hours(self, provider: NotificationProvider) -> bool:
      """Check whether the current local time is inside the provider's quiet hours.

      The window is read from ``provider.quiet_hours_start`` and
      ``provider.quiet_hours_end`` as "HH:MM" strings. The start is inclusive
      and the end exclusive. When the start is later than the end, the window
      is treated as spanning midnight (e.g. 22:00 to 07:00).

      Returns:
          True when quiet hours are enabled, both bounds are set, and the
          current time falls inside the window. False otherwise, including
          when a bound cannot be parsed (a warning is logged in that case).
      """
      if not provider.quiet_hours_enabled:
          return False
      if not provider.quiet_hours_start or not provider.quiet_hours_end:
          return False

      def to_minutes(hhmm: str) -> int:
          # Minutes since midnight for an "HH:MM" string.
          parts = hhmm.split(":")
          return int(parts[0]) * 60 + int(parts[1])

      try:
          now = datetime.now()
          current_time = now.hour * 60 + now.minute
          start_minutes = to_minutes(provider.quiet_hours_start)
          end_minutes = to_minutes(provider.quiet_hours_end)
      except (ValueError, TypeError, AttributeError, IndexError):
          # IndexError covers values without a ":" separator (e.g. "2200"),
          # which previously escaped this handler and propagated to the caller.
          logger.warning(f"Invalid quiet hours format for provider {provider.name}")
          return False

      if start_minutes > end_minutes:
          # Window spans midnight: quiet late in the evening or early in the morning.
          return current_time >= start_minutes or current_time < end_minutes
      # Window lies within a single day.
      return start_minutes <= current_time < end_minutes
  async def _get_template(self, db: AsyncSession, event_type: str) -> NotificationTemplate | None:
      """Fetch the template for ``event_type``, consulting the in-memory cache first.

      Args:
          db: Session used to query ``NotificationTemplate`` on a cache miss.
          event_type: Value matched against ``NotificationTemplate.event_type``.

      Returns:
          The matching template, or None when the query finds none. Only
          found templates are cached, so a miss is queried again next time.
      """
      try:
          return self._template_cache[event_type]
      except KeyError:
          pass  # not cached yet; fall through to the database

      query = select(NotificationTemplate).where(NotificationTemplate.event_type == event_type)
      found = (await db.execute(query)).scalar_one_or_none()
      if found:
          self._template_cache[event_type] = found
      return found
  70. def _render_template(self, template_str: str, variables: dict[str, Any]) -> str:
  71. """Render a template string with variables. Missing variables become empty."""
  72. result = template_str
  73. for key, value in variables.items():
  74. result = result.replace("{" + key + "}", str(value) if value is not None else "")
  75. # Remove any remaining unreplaced placeholders
  76. result = re.sub(r"\{[a-z_]+\}", "", result)
  77. return result
  78. def _format_duration(self, seconds: int | None) -> str:
  79. """Format duration in seconds to human-readable string."""
  80. if seconds is None:
  81. return "Unknown"
  82. hours = seconds // 3600
  83. minutes = (seconds % 3600) // 60
  84. if hours > 0:
  85. return f"{hours}h {minutes}m"
  86. return f"{minutes}m"
  87. def _clean_filename(self, filename: str) -> str:
  88. """Remove file extensions from filename."""
  89. if filename.endswith(".gcode.3mf"):
  90. return filename[:-10]
  91. elif filename.endswith(".3mf"):
  92. return filename[:-4]
  93. return filename
  async def _build_message_from_template(
      self, db: AsyncSession, event_type: str, variables: dict[str, Any]
  ) -> tuple[str, str]:
      """Build the notification title and body for ``event_type``.

      Two common variables are added to the supplied ones: ``timestamp``
      (current local time, "YYYY-MM-DD HH:MM") and ``app_name``
      ("Bambuddy"). They are added to a copy, so the caller's ``variables``
      dict is left untouched.

      Args:
          db: Session used to look up the template.
          event_type: Template key, e.g. "print_start".
          variables: Event-specific placeholder values.

      Returns:
          ``(title, body)``. When no template exists for the event type, a
          warning is logged and the fallback is the title-cased event type
          plus the string form of the variables dict.
      """
      context = dict(variables)
      context["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M")
      context["app_name"] = "Bambuddy"

      template = await self._get_template(db, event_type)
      if not template:
          # Fallback to simple message
          logger.warning(f"Template not found for event type: {event_type}")
          return event_type.replace("_", " ").title(), str(context)

      title = self._render_template(template.title_template, context)
      body = self._render_template(template.body_template, context)
      return title, body
  109. async def send_test_notification(
  110. self, provider_type: str, config: dict[str, Any], db: AsyncSession | None = None
  111. ) -> tuple[bool, str]:
  112. """Send a test notification to verify configuration."""
  113. if db:
  114. title, message = await self._build_message_from_template(db, "test", {})
  115. else:
  116. title = "Bambuddy Test"
  117. message = "This is a test notification. If you see this, notifications are working!"
  118. try:
  119. if provider_type == "callmebot":
  120. return await self._send_callmebot(config, f"{title}\n{message}")
  121. elif provider_type == "ntfy":
  122. return await self._send_ntfy(config, title, message)
  123. elif provider_type == "pushover":
  124. return await self._send_pushover(config, title, message)
  125. elif provider_type == "telegram":
  126. return await self._send_telegram(config, f"*{title}*\n{message}")
  127. elif provider_type == "email":
  128. return await self._send_email(config, title, message)
  129. elif provider_type == "discord":
  130. return await self._send_discord(config, title, message)
  131. elif provider_type == "webhook":
  132. return await self._send_webhook(config, title, message)
  133. else:
  134. return False, f"Unknown provider type: {provider_type}"
  135. except Exception as e:
  136. logger.exception(f"Error sending test notification via {provider_type}")
  137. return False, str(e)
  138. async def _send_callmebot(self, config: dict, message: str) -> tuple[bool, str]:
  139. """Send notification via CallMeBot (WhatsApp)."""
  140. phone = config.get("phone", "").strip()
  141. apikey = config.get("apikey", "").strip()
  142. if not phone or not apikey:
  143. return False, "Phone number and API key are required"
  144. # URL encode the message
  145. encoded_message = quote(message)
  146. url = f"https://api.callmebot.com/whatsapp.php?phone={phone}&text={encoded_message}&apikey={apikey}"
  147. client = await self._get_client()
  148. response = await client.get(url)
  149. if response.status_code == 200:
  150. return True, "Message sent successfully"
  151. else:
  152. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  153. async def _send_ntfy(self, config: dict, title: str, message: str) -> tuple[bool, str]:
  154. """Send notification via ntfy."""
  155. server = config.get("server", "https://ntfy.sh").rstrip("/")
  156. topic = config.get("topic", "").strip()
  157. auth_token = config.get("auth_token", "").strip()
  158. if not topic:
  159. return False, "Topic is required"
  160. url = f"{server}/{topic}"
  161. headers = {"Title": title}
  162. if auth_token:
  163. headers["Authorization"] = f"Bearer {auth_token}"
  164. client = await self._get_client()
  165. response = await client.post(url, content=message, headers=headers)
  166. if response.status_code in (200, 204):
  167. return True, "Message sent successfully"
  168. else:
  169. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  170. async def _send_pushover(self, config: dict, title: str, message: str) -> tuple[bool, str]:
  171. """Send notification via Pushover."""
  172. user_key = config.get("user_key", "").strip()
  173. app_token = config.get("app_token", "").strip()
  174. priority = config.get("priority", 0)
  175. if not user_key or not app_token:
  176. return False, "User key and app token are required"
  177. url = "https://api.pushover.net/1/messages.json"
  178. data = {
  179. "token": app_token,
  180. "user": user_key,
  181. "title": title,
  182. "message": message,
  183. "priority": priority,
  184. }
  185. client = await self._get_client()
  186. response = await client.post(url, data=data)
  187. if response.status_code == 200:
  188. return True, "Message sent successfully"
  189. else:
  190. try:
  191. error_data = response.json()
  192. errors = error_data.get("errors", [])
  193. return False, f"Pushover error: {', '.join(errors)}"
  194. except Exception:
  195. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  196. async def _send_telegram(self, config: dict, message: str) -> tuple[bool, str]:
  197. """Send notification via Telegram bot."""
  198. bot_token = config.get("bot_token", "").strip()
  199. chat_id = config.get("chat_id", "").strip()
  200. if not bot_token or not chat_id:
  201. return False, "Bot token and chat ID are required"
  202. url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
  203. data = {
  204. "chat_id": chat_id,
  205. "text": message,
  206. "parse_mode": "Markdown",
  207. }
  208. client = await self._get_client()
  209. response = await client.post(url, json=data)
  210. if response.status_code == 200:
  211. result = response.json()
  212. if result.get("ok"):
  213. return True, "Message sent successfully"
  214. else:
  215. return False, f"Telegram error: {result.get('description', 'Unknown error')}"
  216. else:
  217. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  218. async def _send_email(self, config: dict, subject: str, body: str) -> tuple[bool, str]:
  219. """Send notification via email (SMTP)."""
  220. smtp_server = config.get("smtp_server", "").strip()
  221. smtp_port = int(config.get("smtp_port", 587))
  222. username = config.get("username", "").strip()
  223. password = config.get("password", "").strip()
  224. from_email = config.get("from_email", "").strip()
  225. to_email = config.get("to_email", "").strip()
  226. # Security: "starttls" (port 587), "ssl" (port 465), "none" (port 25)
  227. security = config.get("security", "starttls")
  228. # Authentication: "true" or "false"
  229. auth_enabled = config.get("auth_enabled", "true").lower() == "true"
  230. if not all([smtp_server, from_email, to_email]):
  231. return False, "SMTP server, from email, and to email are required"
  232. if auth_enabled and not all([username, password]):
  233. return False, "Username and password are required when authentication is enabled"
  234. try:
  235. msg = MIMEMultipart()
  236. msg["From"] = from_email
  237. msg["To"] = to_email
  238. msg["Subject"] = f"[Bambuddy] {subject}"
  239. msg.attach(MIMEText(body, "plain"))
  240. if security == "ssl":
  241. # Direct SSL connection (typically port 465)
  242. server = smtplib.SMTP_SSL(smtp_server, smtp_port)
  243. elif security == "starttls":
  244. # STARTTLS upgrade (typically port 587)
  245. server = smtplib.SMTP(smtp_server, smtp_port)
  246. server.starttls()
  247. else:
  248. # No encryption (typically port 25) - use with caution
  249. server = smtplib.SMTP(smtp_server, smtp_port)
  250. if auth_enabled:
  251. server.login(username, password)
  252. server.sendmail(from_email, to_email, msg.as_string())
  253. server.quit()
  254. return True, "Email sent successfully"
  255. except smtplib.SMTPAuthenticationError:
  256. return False, "SMTP authentication failed - check username/password"
  257. except smtplib.SMTPException as e:
  258. return False, f"SMTP error: {str(e)}"
  259. except Exception as e:
  260. return False, f"Email error: {str(e)}"
  261. async def _send_discord(self, config: dict, title: str, message: str) -> tuple[bool, str]:
  262. """Send notification via Discord webhook."""
  263. webhook_url = config.get("webhook_url", "").strip()
  264. if not webhook_url:
  265. return False, "Webhook URL is required"
  266. if not webhook_url.startswith("https://discord.com/api/webhooks/"):
  267. return False, "Invalid Discord webhook URL"
  268. # Discord embed format for nicer messages
  269. data = {
  270. "embeds": [{
  271. "title": title,
  272. "description": message,
  273. "color": 0x00AE42, # Bambu green
  274. }]
  275. }
  276. client = await self._get_client()
  277. response = await client.post(webhook_url, json=data)
  278. if response.status_code in (200, 204):
  279. return True, "Message sent successfully"
  280. else:
  281. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  282. async def _send_webhook(self, config: dict, title: str, message: str) -> tuple[bool, str]:
  283. """Send notification via generic webhook (POST JSON)."""
  284. webhook_url = config.get("webhook_url", "").strip()
  285. auth_header = config.get("auth_header", "").strip()
  286. custom_field_title = config.get("field_title", "title").strip() or "title"
  287. custom_field_message = config.get("field_message", "message").strip() or "message"
  288. if not webhook_url:
  289. return False, "Webhook URL is required"
  290. # Build payload with custom field names
  291. data = {
  292. custom_field_title: title,
  293. custom_field_message: message,
  294. "timestamp": datetime.now().isoformat(),
  295. "source": "Bambuddy",
  296. }
  297. headers = {"Content-Type": "application/json"}
  298. if auth_header:
  299. # Support "Bearer token" or just "token" format
  300. if " " in auth_header:
  301. headers["Authorization"] = auth_header
  302. else:
  303. headers["Authorization"] = f"Bearer {auth_header}"
  304. client = await self._get_client()
  305. try:
  306. response = await client.post(webhook_url, json=data, headers=headers)
  307. if response.status_code in (200, 201, 202, 204):
  308. return True, "Webhook delivered successfully"
  309. else:
  310. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  311. except Exception as e:
  312. return False, f"Webhook error: {str(e)}"
  313. async def _send_to_provider(
  314. self, provider: NotificationProvider, title: str, message: str, db: AsyncSession | None = None
  315. ) -> tuple[bool, str]:
  316. """Send notification to a specific provider."""
  317. # Check quiet hours
  318. if self._is_in_quiet_hours(provider):
  319. logger.info(f"Skipping notification to {provider.name} - quiet hours active")
  320. return True, "Skipped - quiet hours"
  321. config = json.loads(provider.config) if isinstance(provider.config, str) else provider.config
  322. try:
  323. if provider.provider_type == "callmebot":
  324. return await self._send_callmebot(config, f"{title}\n{message}")
  325. elif provider.provider_type == "ntfy":
  326. return await self._send_ntfy(config, title, message)
  327. elif provider.provider_type == "pushover":
  328. return await self._send_pushover(config, title, message)
  329. elif provider.provider_type == "telegram":
  330. return await self._send_telegram(config, f"*{title}*\n{message}")
  331. elif provider.provider_type == "email":
  332. return await self._send_email(config, title, message)
  333. elif provider.provider_type == "discord":
  334. return await self._send_discord(config, title, message)
  335. elif provider.provider_type == "webhook":
  336. return await self._send_webhook(config, title, message)
  337. else:
  338. return False, f"Unknown provider type: {provider.provider_type}"
  339. except Exception as e:
  340. logger.exception(f"Error sending notification via {provider.provider_type}")
  341. return False, str(e)
  async def _update_provider_status(
      self, db: AsyncSession, provider_id: int, success: bool, error: str | None = None
  ):
      """Record the outcome of a send attempt on the provider row and commit.

      On success ``last_success`` is set to the current UTC time; on failure
      ``last_error`` and ``last_error_at`` are set. Nothing happens when no
      provider with ``provider_id`` exists.

      Args:
          db: Session used to load and update the provider.
          provider_id: Primary key of the provider.
          success: Whether the send attempt succeeded.
          error: Error text stored on failure.
      """
      from datetime import timezone  # local import: the module imports only `datetime`

      result = await db.execute(
          select(NotificationProvider).where(NotificationProvider.id == provider_id)
      )
      provider = result.scalar_one_or_none()
      if not provider:
          return

      # Naive UTC timestamp, the same value the deprecated datetime.utcnow()
      # produced, obtained without the deprecated call.
      now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
      if success:
          provider.last_success = now_utc
      else:
          provider.last_error = error
          provider.last_error_at = now_utc
      await db.commit()
  async def _get_providers_for_event(
      self,
      db: AsyncSession,
      event_field: str,
      printer_id: int | None = None,
  ) -> list[NotificationProvider]:
      """List the enabled providers that have a given event flag switched on.

      Args:
          db: Session used to run the query.
          event_field: Name of the boolean ``NotificationProvider`` column
              for the event (e.g. "on_print_start"); looked up with getattr.
          printer_id: When given, only providers bound to this printer or
              to no printer (NULL ``printer_id``) are returned.

      Returns:
          The matching providers as a list (possibly empty).
      """
      event_flag = getattr(NotificationProvider, event_field)
      # "== True" / "== None" build SQL expressions here; they are not Python comparisons.
      query = select(NotificationProvider).where(
          NotificationProvider.enabled == True,  # noqa: E712
          event_flag == True,  # noqa: E712
      )

      if printer_id is not None:
          for_any_printer = NotificationProvider.printer_id == None  # noqa: E711
          for_this_printer = NotificationProvider.printer_id == printer_id
          query = query.where(for_any_printer | for_this_printer)

      rows = await db.execute(query)
      return list(rows.scalars().all())
  async def _log_notification(
      self,
      db: AsyncSession,
      provider_id: int,
      event_type: str,
      title: str,
      message: str,
      success: bool,
      error_message: str | None = None,
      printer_id: int | None = None,
      printer_name: str | None = None,
  ):
      """Store a ``NotificationLog`` row describing one send attempt.

      Logging is best-effort: any failure is reported as a warning and never
      propagates, so a logging problem cannot fail the notification itself.
      After a failure the session is rolled back so that later operations on
      the same session are not rejected because of the failed transaction.
      """
      try:
          log = NotificationLog(
              provider_id=provider_id,
              event_type=event_type,
              title=title,
              message=message,
              success=success,
              error_message=error_message,
              printer_id=printer_id,
              printer_name=printer_name,
          )
          db.add(log)
          await db.commit()
      except Exception as e:
          logger.warning(f"Failed to log notification: {e}")
          # Don't fail the notification just because logging failed, but do
          # return the session to a usable state.
          try:
              await db.rollback()
          except Exception as rollback_error:
              logger.warning(f"Failed to roll back after notification log error: {rollback_error}")
  async def _send_to_providers(
      self,
      providers: list[NotificationProvider],
      title: str,
      message: str,
      db: AsyncSession,
      event_type: str = "unknown",
      printer_id: int | None = None,
      printer_name: str | None = None,
      force_immediate: bool = False,
  ):
      """Send a notification to each provider and record the outcome.

      Every notification is sent immediately. When a provider has the daily
      digest enabled (and a digest time set), the notification is ALSO
      queued for the digest summary. After each attempt the provider's
      status is updated and a log entry is written.

      A failure for one provider, including a failure while recording that
      failure, never prevents the remaining providers from being tried.

      Args:
          providers: Providers to notify.
          title: Notification title.
          message: Notification body.
          db: Session used for queueing, status updates and logging.
          event_type: Event name stored with the queue and log entries.
          printer_id: Printer the event belongs to, if any.
          printer_name: Display name of that printer, if any.
          force_immediate: Accepted for backward compatibility. It is not
              consulted here: sending is always immediate and digest
              queueing depends only on the provider's settings.
      """
      for provider in providers:
          try:
              # Always send notification immediately
              success, error = await self._send_to_provider(provider, title, message, db)

              # Also queue for digest if enabled (digest is a summary, not a queue)
              if provider.daily_digest_enabled and provider.daily_digest_time:
                  await self._queue_for_digest(
                      provider=provider,
                      event_type=event_type,
                      title=title,
                      message=message,
                      db=db,
                      printer_id=printer_id,
                      printer_name=printer_name,
                  )

              failure_detail = None if success else error
              await self._update_provider_status(db, provider.id, success, failure_detail)
              await self._log_notification(
                  db=db,
                  provider_id=provider.id,
                  event_type=event_type,
                  title=title,
                  message=message,
                  success=success,
                  error_message=failure_detail,
                  printer_id=printer_id,
                  printer_name=printer_name,
              )
              if success:
                  logger.info(f"Sent notification via {provider.name}")
              else:
                  logger.warning(f"Failed to send notification via {provider.name}: {error}")
          except Exception as e:
              logger.exception(f"Error sending notification via {provider.name}")
              try:
                  await self._update_provider_status(db, provider.id, False, str(e))
              except Exception:
                  # Recording the failure must not stop the remaining providers.
                  logger.exception(f"Failed to record error status for provider {provider.name}")
              await self._log_notification(
                  db=db,
                  provider_id=provider.id,
                  event_type=event_type,
                  title=title,
                  message=message,
                  success=False,
                  error_message=str(e),
                  printer_id=printer_id,
                  printer_name=printer_name,
              )
  async def on_print_start(
      self, printer_id: int, printer_name: str, data: dict, db: AsyncSession
  ):
      """Notify interested providers that a print has started.

      Args:
          printer_id: Printer the event belongs to.
          printer_name: Display name used in the message.
          data: Event payload. "filename" is read (default "Unknown"), and
              ``raw_data.print.mc_remaining_time`` is multiplied by 60
              before being formatted as a duration in seconds.
          db: Session used for provider lookup, templates and logging.
      """
      logger.info(f"on_print_start called for printer {printer_id} ({printer_name})")

      providers = await self._get_providers_for_event(db, "on_print_start", printer_id)
      if not providers:
          logger.info(f"No notification providers configured for print_start event on printer {printer_id}")
          return

      clean_name = self._clean_filename(data.get("filename", "Unknown"))
      print_section = data.get("raw_data", {}).get("print", {})
      remaining = print_section.get("mc_remaining_time")
      # A missing or zero value is shown as "Unknown".
      remaining_seconds = remaining * 60 if remaining else None

      variables = {
          "printer": printer_name,
          "filename": clean_name,
          "estimated_time": self._format_duration(remaining_seconds),
      }

      logger.info(f"Found {len(providers)} providers for print_start: {[p.name for p in providers]}")
      title, message = await self._build_message_from_template(db, "print_start", variables)
      await self._send_to_providers(providers, title, message, db, "print_start", printer_id, printer_name)
  async def on_print_complete(
      self,
      printer_id: int,
      printer_name: str,
      status: str,
      data: dict,
      db: AsyncSession,
      archive_data: dict | None = None,
  ):
      """Notify interested providers that a print has ended.

      The status selects the event: "completed" -> print_complete,
      "failed" -> print_failed, "aborted"/"stopped"/"cancelled" ->
      print_stopped. Any other status is logged as a warning and treated as
      print_complete.

      Args:
          printer_id: Printer the event belongs to.
          printer_name: Display name used in the message.
          status: Final print status.
          data: Event payload; "filename" is read (default "Unknown").
          db: Session used for provider lookup, templates and logging.
          archive_data: Optional details. "print_time_seconds" and
              "actual_filament_grams" fill the duration and filament
              variables; "failure_reason" is used only when status is
              "failed".
      """
      logger.info(f"on_print_complete called for printer {printer_id} ({printer_name}), status={status}")

      # status -> (provider flag column, template/event name)
      events_by_status = {
          "completed": ("on_print_complete", "print_complete"),
          "failed": ("on_print_failed", "print_failed"),
          "aborted": ("on_print_stopped", "print_stopped"),
          "stopped": ("on_print_stopped", "print_stopped"),
          "cancelled": ("on_print_stopped", "print_stopped"),
      }
      if status in events_by_status:
          event_field, event_type = events_by_status[status]
      else:
          logger.warning(f"Unknown print status '{status}', defaulting to on_print_complete")
          event_field, event_type = "on_print_complete", "print_complete"

      providers = await self._get_providers_for_event(db, event_field, printer_id)
      if not providers:
          logger.info(f"No notification providers configured for {event_field} event on printer {printer_id}")
          return

      variables = {
          "printer": printer_name,
          "filename": self._clean_filename(data.get("filename", "Unknown")),
          "duration": "",
          "filament_grams": "",
          "reason": "",
      }

      if archive_data:
          # Each detail is filled only when present and truthy.
          if archive_data.get("print_time_seconds"):
              variables["duration"] = self._format_duration(archive_data["print_time_seconds"])
          if archive_data.get("actual_filament_grams"):
              variables["filament_grams"] = f"{archive_data['actual_filament_grams']:.1f}"
          if status == "failed" and archive_data.get("failure_reason"):
              variables["reason"] = archive_data["failure_reason"]

      logger.info(f"Found {len(providers)} providers for {event_field}: {[p.name for p in providers]}")
      title, message = await self._build_message_from_template(db, event_type, variables)
      await self._send_to_providers(providers, title, message, db, event_type, printer_id, printer_name)
  async def on_print_progress(
      self,
      printer_id: int,
      printer_name: str,
      filename: str,
      progress: int,
      db: AsyncSession,
      remaining_time: int | None = None,
  ):
      """Notify interested providers about a print progress milestone.

      Args:
          printer_id: Printer the event belongs to.
          printer_name: Display name used in the message.
          filename: Name of the file being printed; known 3MF suffixes are
              stripped.
          progress: Progress value, inserted into the message as text.
          db: Session used for provider lookup, templates and logging.
          remaining_time: Remaining time passed to the duration formatter;
              left blank in the message when missing or zero.
      """
      providers = await self._get_providers_for_event(db, "on_print_progress", printer_id)
      if not providers:
          return

      variables = {
          "printer": printer_name,
          "filename": self._clean_filename(filename),
          "progress": str(progress),
          "remaining_time": "",
      }
      if remaining_time:
          variables["remaining_time"] = self._format_duration(remaining_time)

      title, message = await self._build_message_from_template(db, "print_progress", variables)
      await self._send_to_providers(providers, title, message, db, "print_progress", printer_id, printer_name)
  async def on_printer_offline(
      self, printer_id: int, printer_name: str, db: AsyncSession
  ):
      """Notify interested providers that a printer went offline.

      Does nothing when no enabled provider has the "on_printer_offline"
      flag set for this printer.
      """
      event_type = "printer_offline"
      providers = await self._get_providers_for_event(db, "on_printer_offline", printer_id)
      if not providers:
          return

      title, message = await self._build_message_from_template(db, event_type, {"printer": printer_name})
      await self._send_to_providers(providers, title, message, db, event_type, printer_id, printer_name)
  async def on_printer_error(
      self,
      printer_id: int,
      printer_name: str,
      error_type: str,
      db: AsyncSession,
      error_detail: str | None = None,
  ):
      """Notify interested providers about a printer error.

      Args:
          printer_id: Printer the event belongs to.
          printer_name: Display name used in the message.
          error_type: Short error category inserted into the message.
          db: Session used for provider lookup, templates and logging.
          error_detail: Optional extra text; blank when not given.
      """
      providers = await self._get_providers_for_event(db, "on_printer_error", printer_id)
      if not providers:
          return

      detail_text = error_detail or ""
      variables = {
          "printer": printer_name,
          "error_type": error_type,
          "error_detail": detail_text,
      }
      title, message = await self._build_message_from_template(db, "printer_error", variables)
      await self._send_to_providers(providers, title, message, db, "printer_error", printer_id, printer_name)
  async def on_filament_low(
      self,
      printer_id: int,
      printer_name: str,
      slot: int,
      remaining_percent: int,
      db: AsyncSession,
      color: str | None = None,
  ):
      """Notify interested providers that a filament slot is running low.

      Args:
          printer_id: Printer the event belongs to.
          printer_name: Display name used in the message.
          slot: Slot identifier, inserted into the message as text.
          remaining_percent: Remaining amount, inserted as text.
          db: Session used for provider lookup, templates and logging.
          color: Optional filament colour; blank when not given.
      """
      providers = await self._get_providers_for_event(db, "on_filament_low", printer_id)
      if not providers:
          return

      variables = {
          "printer": printer_name,
          "slot": str(slot),
          "remaining_percent": str(remaining_percent),
          "color": color if color else "",
      }
      title, message = await self._build_message_from_template(db, "filament_low", variables)
      await self._send_to_providers(providers, title, message, db, "filament_low", printer_id, printer_name)
  async def on_maintenance_due(
      self,
      printer_id: int,
      printer_name: str,
      maintenance_items: list[dict],
      db: AsyncSession,
  ):
      """Notify interested providers about maintenance that is due or coming up.

      Args:
          printer_id: Printer the event belongs to.
          printer_name: Display name used in the message.
          maintenance_items: Items to report; each needs a "name" and may
              carry "is_due" (truthy -> "OVERDUE", otherwise "Soon").
              Nothing is sent for an empty list.
          db: Session used for provider lookup, templates and logging.
      """
      if not maintenance_items:
          return

      providers = await self._get_providers_for_event(db, "on_maintenance_due", printer_id)
      if not providers:
          logger.info(f"No notification providers configured for maintenance_due event on printer {printer_id}")
          return

      def describe(item: dict) -> str:
          # One bullet line per maintenance item.
          urgency = "OVERDUE" if item.get("is_due") else "Soon"
          return f"- {item['name']} ({urgency})"

      variables = {
          "printer": printer_name,
          "items": "\n".join([describe(item) for item in maintenance_items]),
      }

      logger.info(f"Found {len(providers)} providers for maintenance_due: {[p.name for p in providers]}")
      title, message = await self._build_message_from_template(db, "maintenance_due", variables)
      await self._send_to_providers(providers, title, message, db, "maintenance_due", printer_id, printer_name)
  async def on_ams_humidity_high(
      self,
      printer_id: int,
      printer_name: str,
      ams_label: str,
      humidity: float,
      threshold: float,
      db: AsyncSession,
  ):
      """Notify interested providers of an AMS high-humidity alarm.

      Humidity and threshold are rendered without decimals. The alarm is
      handed to ``_send_to_providers`` with ``force_immediate=True``.

      Args:
          printer_id: Printer the event belongs to.
          printer_name: Display name used in the message.
          ams_label: Label of the affected AMS unit.
          humidity: Measured humidity.
          threshold: Configured alarm threshold.
          db: Session used for provider lookup, templates and logging.
      """
      event_type = "ams_humidity_high"
      providers = await self._get_providers_for_event(db, "on_ams_humidity_high", printer_id)
      if not providers:
          return

      variables = {
          "printer": printer_name,
          "ams_label": ams_label,
          "humidity": f"{humidity:.0f}",
          "threshold": f"{threshold:.0f}",
      }
      title, message = await self._build_message_from_template(db, event_type, variables)
      # Alarms are flagged for immediate delivery.
      await self._send_to_providers(
          providers, title, message, db, event_type, printer_id, printer_name, force_immediate=True
      )
  async def on_ams_temperature_high(
      self,
      printer_id: int,
      printer_name: str,
      ams_label: str,
      temperature: float,
      threshold: float,
      db: AsyncSession,
  ):
      """Notify interested providers of an AMS high-temperature alarm.

      Temperature and threshold are rendered with one decimal place. The
      alarm is handed to ``_send_to_providers`` with ``force_immediate=True``.

      Args:
          printer_id: Printer the event belongs to.
          printer_name: Display name used in the message.
          ams_label: Label of the affected AMS unit.
          temperature: Measured temperature.
          threshold: Configured alarm threshold.
          db: Session used for provider lookup, templates and logging.
      """
      event_type = "ams_temperature_high"
      providers = await self._get_providers_for_event(db, "on_ams_temperature_high", printer_id)
      if not providers:
          return

      variables = {
          "printer": printer_name,
          "ams_label": ams_label,
          "temperature": f"{temperature:.1f}",
          "threshold": f"{threshold:.1f}",
      }
      title, message = await self._build_message_from_template(db, event_type, variables)
      # Alarms are flagged for immediate delivery.
      await self._send_to_providers(
          providers, title, message, db, event_type, printer_id, printer_name, force_immediate=True
      )
  def clear_template_cache(self):
      """Empty the in-memory template cache in place.

      Meant to be called after notification templates have been updated.
      """
      cached_templates = self._template_cache
      cached_templates.clear()
  async def _queue_for_digest(
      self,
      provider: NotificationProvider,
      event_type: str,
      title: str,
      message: str,
      db: AsyncSession,
      printer_id: int | None = None,
      printer_name: str | None = None,
  ):
      """Queue a notification for later delivery in the daily digest.

      Args:
          provider: Provider the queued notification belongs to.
          event_type: Event identifier stored with the queue entry.
          title: Notification title to store.
          message: Notification body to store.
          db: Caller-owned session used to add and commit the queue entry.
          printer_id: Optional id of the printer the event relates to.
          printer_name: Optional display name of that printer.

      This is best effort: any failure is logged as a warning and swallowed,
      nothing is raised to the caller.
      """
      try:
          queue_entry = NotificationDigestQueue(
              provider_id=provider.id,
              event_type=event_type,
              title=title,
              message=message,
              printer_id=printer_id,
              printer_name=printer_name,
          )
          db.add(queue_entry)
          await db.commit()
          logger.info(f"Queued notification for digest: {event_type} for provider {provider.name}")
      except Exception as e:
          logger.warning(f"Failed to queue notification for digest: {e}")
          # The error is swallowed, so the caller will keep using `db`. After a
          # failed flush/commit the session must be rolled back before it can
          # be used again; otherwise the failure resurfaces on the caller's
          # next database operation.
          try:
              await db.rollback()
          except Exception as rollback_error:
              logger.warning(f"Failed to roll back session after digest queue error: {rollback_error}")
  async def send_digest(self, provider_id: int):
      """Deliver all queued notifications of one provider as a single digest.

      Opens its own database session, loads the provider and its queue
      entries (ordered by creation time), groups the entries by event type,
      and sends one combined message. The attempt is recorded through
      ``_log_notification`` under the ``daily_digest`` event type.

      Nothing is sent when the provider is missing or disabled, or when its
      queue is empty. The queue entries that were loaded are deleted after the
      send attempt whether or not delivery succeeded.
      """
      from backend.app.core.database import async_session

      async with async_session() as db:
          # Look up the provider; bail out when it is missing or disabled.
          provider_query = select(NotificationProvider).where(NotificationProvider.id == provider_id)
          provider = (await db.execute(provider_query)).scalar_one_or_none()
          if not (provider and provider.enabled):
              return

          # Load this provider's pending queue entries, oldest first.
          queue_query = (
              select(NotificationDigestQueue)
              .where(NotificationDigestQueue.provider_id == provider_id)
              .order_by(NotificationDigestQueue.created_at)
          )
          pending = list((await db.execute(queue_query)).scalars().all())
          if not pending:
              logger.debug(f"No queued notifications for provider {provider.name}")
              return

          title = f"Daily Digest - {len(pending)} Events"

          # Bucket entries by event type, keeping first-seen order of the types.
          grouped: dict[str, list] = {}
          for item in pending:
              grouped.setdefault(item.event_type, []).append(item)

          # One heading per event type, one line per entry, blank line after each group.
          lines = []
          for kind, items in grouped.items():
              heading = kind.replace("_", " ").title()
              lines.append(f"== {heading} ({len(items)}) ==")
              for item in items:
                  stamp = item.created_at.strftime("%H:%M")
                  prefix = f"[{item.printer_name}] " if item.printer_name else ""
                  lines.append(f" {stamp} {prefix}{item.title}")
              lines.append("")
          body = "\n".join(lines)

          success, error = await self._send_to_provider(provider, title, body, db)

          # Record the attempt; the error text is only kept for failures.
          await self._log_notification(
              db=db,
              provider_id=provider.id,
              event_type="daily_digest",
              title=title,
              message=body,
              success=success,
              error_message=None if success else error,
          )

          # Remove exactly the entries that went into this digest.
          for item in pending:
              await db.delete(item)
          await db.commit()

          if not success:
              logger.warning(f"Failed to send daily digest to {provider.name}: {error}")
              return
          logger.info(f"Sent daily digest with {len(pending)} events to {provider.name}")
  async def check_and_send_digests(self):
      """Send the digest of every provider whose scheduled time is the current minute.

      The current time is formatted as ``HH:MM`` and compared for equality
      with each provider's ``daily_digest_time``. Only providers that are
      enabled and have the daily digest enabled are considered. Repeated calls
      within the same minute return without doing anything. A failure for one
      provider is logged and does not stop the remaining providers.
      """
      from backend.app.core.database import async_session

      now_hhmm = datetime.now().strftime("%H:%M")

      # This minute was already handled by an earlier call.
      if self._last_digest_check == now_hhmm:
          return
      self._last_digest_check = now_hhmm

      async with async_session() as db:
          # `== True` is intentional: it builds the SQL comparison on the column.
          due_query = select(NotificationProvider).where(
              NotificationProvider.enabled == True,
              NotificationProvider.daily_digest_enabled == True,
              NotificationProvider.daily_digest_time == now_hhmm,
          )
          due_providers = (await db.execute(due_query)).scalars().all()

          for provider in due_providers:
              try:
                  await self.send_digest(provider.id)
              except Exception as e:
                  logger.error(f"Error sending digest for provider {provider.id}: {e}")
  def start_digest_scheduler(self):
      """Launch the digest scheduler loop as an asyncio task, unless one is already stored."""
      if self._digest_scheduler_task is not None:
          # A scheduler task is already registered; leave it alone.
          return

      self._digest_scheduler_task = asyncio.create_task(self._digest_scheduler_loop())
      logger.info("Notification digest scheduler started")
  def stop_digest_scheduler(self):
      """Cancel the digest scheduler task, if one is stored, and forget it."""
      task = self._digest_scheduler_task
      if not task:
          # Nothing is running, so there is nothing to cancel.
          return

      task.cancel()
      self._digest_scheduler_task = None
      logger.info("Notification digest scheduler stopped")
  async def _digest_scheduler_loop(self):
      """Background loop that runs the digest check once per wall-clock minute.

      Runs until the task is cancelled. Errors raised by a check are logged
      and the loop keeps going.
      """
      while True:
          try:
              await self.check_and_send_digests()
          except Exception as e:
              logger.error(f"Error in digest scheduler: {e}")

          # Sleep until the start of the next minute rather than a fixed 60 s.
          # A fixed sleep makes each cycle last 60 s plus the time the check
          # took, so the wake-up drifts and eventually skips a whole HH:MM
          # value. Digest times are matched by exact HH:MM equality, so a
          # skipped minute would mean a missed digest. If the timer fires
          # marginally early, the check's same-minute guard makes that pass a
          # no-op and the next sleep is only the remaining fraction.
          now = datetime.now()
          seconds_into_minute = now.second + now.microsecond / 1_000_000
          await asyncio.sleep(60 - seconds_into_minute)
  # Module-level NotificationService instance, created once when this module is imported.
  notification_service = NotificationService()