notification_service.py 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925
  1. """Notification service for sending push notifications via various providers."""
  2. import asyncio
  3. import json
  4. import logging
  5. import re
  6. import smtplib
  7. from datetime import datetime, timedelta, timezone
  8. from email.mime.multipart import MIMEMultipart
  9. from email.mime.text import MIMEText
  10. from typing import Any
  11. from urllib.parse import quote
  12. import httpx
  13. from sqlalchemy import select
  14. from sqlalchemy.ext.asyncio import AsyncSession
  15. from backend.app.models.notification import NotificationDigestQueue, NotificationLog, NotificationProvider
  16. from backend.app.models.notification_template import NotificationTemplate
  17. logger = logging.getLogger(__name__)
  18. # Honest User-Agent — matches the convention used by every other outbound
  19. # httpx client in the codebase (bambu_cloud, makerworld, firmware_check,
  20. # inventory). Previously this client leaked python-httpx/<version>, which
  21. # was both inconsistent with the rest of the project and a more obvious
  22. # bot signature for upstream WAFs.
  23. _USER_AGENT = "Bambuddy/1.0 (+https://github.com/maziggy/bambuddy)"
  24. def _looks_like_cloudflare_challenge(response: httpx.Response) -> bool:
  25. """Return True if ``response`` looks like a Cloudflare mitigation
  26. interstitial (JS challenge / managed challenge / block page) rather
  27. than a legitimate response passed through Cloudflare.
  28. Self-hosted servers behind Cloudflare (Tunnel, "Bot Fight Mode", or
  29. "Under Attack" mode) intercept non-browser clients at the edge and
  30. return a challenge HTML page instead of forwarding to the origin —
  31. so we never reach the user's actual ntfy / webhook backend.
  32. Cloudflare cannot be defeated from a Python client; the user has to
  33. add a security-skip rule on their side. We detect the shape so the
  34. UI can tell them that, instead of dumping the raw HTML.
  35. Detection deliberately does NOT rely on ``Server: cloudflare`` alone
  36. — Cloudflare adds that header to every response it proxies (success
  37. AND legitimate origin errors), so a real 401 "wrong token" from a
  38. CF-fronted ntfy would false-positive into a misleading "your CF is
  39. blocking" message. Reliable signals: the ``cf-mitigated`` header
  40. (set only when CF actively mitigates) and the challenge body shape.
  41. """
  42. if response.headers.get("cf-mitigated"):
  43. return True
  44. content_type = (response.headers.get("content-type") or "").lower()
  45. if "html" not in content_type:
  46. return False
  47. body = (response.text or "")[:1024].lower()
  48. # "Just a moment..." is Cloudflare's universal challenge-page title
  49. # (managed challenge, JS challenge, Under Attack mode). Combined with
  50. # an HTML content-type this is unambiguous — no legitimate ntfy or
  51. # webhook backend returns HTML with that title. ``cf-chl-*`` and
  52. # ``challenge-platform`` cover newer / non-default CF templates.
  53. return "just a moment" in body or "cf-chl-bypass" in body or "cf-chl-opt" in body or "challenge-platform" in body
  54. class NotificationService:
  55. """Service for sending notifications through various providers."""
  56. def __init__(self):
  57. self._http_client: httpx.AsyncClient | None = None
  58. self._template_cache: dict[str, NotificationTemplate] = {}
  59. self._digest_scheduler_task: asyncio.Task | None = None
  60. self._last_digest_check: str = "" # "HH:MM" to avoid duplicate checks
  61. async def _get_client(self) -> httpx.AsyncClient:
  62. """Get or create HTTP client."""
  63. if self._http_client is None or self._http_client.is_closed:
  64. self._http_client = httpx.AsyncClient(
  65. timeout=30.0,
  66. headers={"User-Agent": _USER_AGENT},
  67. )
  68. return self._http_client
  69. async def close(self):
  70. """Close HTTP client."""
  71. if self._http_client and not self._http_client.is_closed:
  72. await self._http_client.aclose()
  73. def _is_in_quiet_hours(self, provider: NotificationProvider) -> bool:
  74. """Check if current time is within provider's quiet hours."""
  75. if not provider.quiet_hours_enabled:
  76. return False
  77. if not provider.quiet_hours_start or not provider.quiet_hours_end:
  78. return False
  79. try:
  80. now = datetime.now()
  81. current_time = now.hour * 60 + now.minute
  82. start_parts = provider.quiet_hours_start.split(":")
  83. end_parts = provider.quiet_hours_end.split(":")
  84. start_minutes = int(start_parts[0]) * 60 + int(start_parts[1])
  85. end_minutes = int(end_parts[0]) * 60 + int(end_parts[1])
  86. # Handle overnight quiet hours (e.g., 22:00 to 07:00)
  87. if start_minutes > end_minutes:
  88. # Quiet hours span midnight
  89. return current_time >= start_minutes or current_time < end_minutes
  90. else:
  91. # Same day quiet hours
  92. return start_minutes <= current_time < end_minutes
  93. except (ValueError, TypeError, AttributeError):
  94. logger.warning("Invalid quiet hours format for provider %s", provider.name)
  95. return False
  96. async def _get_template(self, db: AsyncSession, event_type: str) -> NotificationTemplate | None:
  97. """Get a notification template by event type."""
  98. # Check cache first
  99. if event_type in self._template_cache:
  100. return self._template_cache[event_type]
  101. result = await db.execute(select(NotificationTemplate).where(NotificationTemplate.event_type == event_type))
  102. template = result.scalar_one_or_none()
  103. if template:
  104. self._template_cache[event_type] = template
  105. return template
  106. def _render_template(self, template_str: str, variables: dict[str, Any]) -> str:
  107. """Render a template string with variables. Missing variables become empty."""
  108. result = template_str
  109. for key, value in variables.items():
  110. result = result.replace("{" + key + "}", str(value) if value is not None else "")
  111. # Remove any remaining unreplaced placeholders
  112. result = re.sub(r"\{[a-z_]+\}", "", result)
  113. return result
  114. async def _format_eta(self, seconds: int | None, db: AsyncSession) -> str:
  115. """Format ETA as wall-clock time, respecting user's time_format setting."""
  116. if not seconds or seconds <= 0:
  117. return "Unknown"
  118. from backend.app.api.routes.settings import get_setting
  119. eta_time = datetime.now() + timedelta(seconds=seconds)
  120. time_format = await get_setting(db, "time_format")
  121. if time_format == "12h":
  122. return eta_time.strftime("%I:%M %p").lstrip("0")
  123. # Default to 24h for "24h", "system", or unset
  124. return eta_time.strftime("%H:%M")
  125. def _format_duration(self, seconds: int | None) -> str:
  126. """Format duration in seconds to human-readable string."""
  127. if seconds is None:
  128. return "Unknown"
  129. hours = seconds // 3600
  130. minutes = (seconds % 3600) // 60
  131. if hours > 0:
  132. return f"{hours}h {minutes}m"
  133. return f"{minutes}m"
  134. def _clean_filename(self, filename: str) -> str:
  135. """Extract filename and remove file extensions."""
  136. import os
  137. # Strip path prefix (e.g., /data/Metadata/plate_5.gcode -> plate_5.gcode)
  138. filename = os.path.basename(filename)
  139. # Remove common extensions
  140. if filename.endswith(".gcode.3mf"):
  141. return filename[:-10]
  142. elif filename.endswith(".gcode"):
  143. return filename[:-6]
  144. elif filename.endswith(".3mf"):
  145. return filename[:-4]
  146. return filename
  147. async def _build_message_from_template(
  148. self, db: AsyncSession, event_type: str, variables: dict[str, Any]
  149. ) -> tuple[str, str]:
  150. """Build notification title and body from template."""
  151. # Add common variables
  152. variables["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M")
  153. variables["app_name"] = "Bambuddy"
  154. template = await self._get_template(db, event_type)
  155. if not template:
  156. # Fallback to simple message
  157. logger.warning("Template not found for event type: %s", event_type)
  158. return event_type.replace("_", " ").title(), str(variables)
  159. title = self._render_template(template.title_template, variables)
  160. body = self._render_template(template.body_template, variables)
  161. return title, body
  162. async def send_test_notification(
  163. self, provider_type: str, config: dict[str, Any], db: AsyncSession | None = None
  164. ) -> tuple[bool, str]:
  165. """Send a test notification to verify configuration."""
  166. if db:
  167. title, message = await self._build_message_from_template(db, "test", {})
  168. else:
  169. title = "Bambuddy Test"
  170. message = "This is a test notification. If you see this, notifications are working!"
  171. try:
  172. if provider_type == "callmebot":
  173. return await self._send_callmebot(config, f"{title}\n{message}")
  174. elif provider_type == "ntfy":
  175. return await self._send_ntfy(config, title, message)
  176. elif provider_type == "pushover":
  177. return await self._send_pushover(config, title, message)
  178. elif provider_type == "telegram":
  179. return await self._send_telegram(config, f"*{title}*\n{message}")
  180. elif provider_type == "email":
  181. return await self._send_email(config, title, message)
  182. elif provider_type == "discord":
  183. return await self._send_discord(config, title, message)
  184. elif provider_type == "webhook":
  185. return await self._send_webhook(config, title, message)
  186. elif provider_type == "homeassistant":
  187. return await self._send_homeassistant(config, title, message, db=db)
  188. else:
  189. return False, f"Unknown provider type: {provider_type}"
  190. except Exception as e:
  191. logger.exception("Error sending test notification via %s", provider_type)
  192. return False, str(e)
  193. async def _send_callmebot(self, config: dict, message: str) -> tuple[bool, str]:
  194. """Send notification via CallMeBot (WhatsApp)."""
  195. phone = config.get("phone", "").strip()
  196. apikey = config.get("apikey", "").strip()
  197. if not phone or not apikey:
  198. return False, "Phone number and API key are required"
  199. # URL encode the message
  200. encoded_message = quote(message)
  201. url = f"https://api.callmebot.com/whatsapp.php?phone={phone}&text={encoded_message}&apikey={apikey}"
  202. client = await self._get_client()
  203. response = await client.get(url)
  204. if response.status_code == 200:
  205. return True, "Message sent successfully"
  206. else:
  207. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  208. async def _send_ntfy(
  209. self,
  210. config: dict,
  211. title: str,
  212. message: str,
  213. image_data: bytes | None = None,
  214. event_type: str | None = None,
  215. ) -> tuple[bool, str]:
  216. """Send notification via ntfy."""
  217. server = config.get("server", "https://ntfy.sh").rstrip("/")
  218. topic = config.get("topic", "").strip()
  219. auth_token = config.get("auth_token", "").strip()
  220. if not topic:
  221. return False, "Topic is required"
  222. url = f"{server}/{topic}"
  223. # ntfy reads Title/Message from HTTP headers. httpx enforces ASCII
  224. # for str header values, but printer names and filenames can contain
  225. # non-ASCII characters (e.g. accented letters, CJK). Passing bytes
  226. # bypasses the ASCII check — ntfy handles UTF-8 headers correctly.
  227. headers: dict[str, str | bytes] = {"Title": title.encode("utf-8")}
  228. # Per-event Priority header (#990). Only set when the user has
  229. # explicitly mapped this event to a 1-5 value; otherwise fall through
  230. # to the ntfy server's default so existing setups stay unchanged.
  231. event_priorities = config.get("event_priorities") or {}
  232. if event_type and isinstance(event_priorities, dict):
  233. raw = event_priorities.get(event_type)
  234. try:
  235. priority = int(raw) if raw is not None else None
  236. except (TypeError, ValueError):
  237. priority = None
  238. if priority is not None and 1 <= priority <= 5:
  239. headers["Priority"] = str(priority)
  240. if auth_token:
  241. headers["Authorization"] = f"Bearer {auth_token}"
  242. client = await self._get_client()
  243. if image_data:
  244. # ntfy supports image attachments via multipart form-data.
  245. # HTTP headers cannot contain newlines, but ntfy interprets
  246. # literal \n (backslash-n) as newlines in the Message header.
  247. headers["Filename"] = "photo.jpg"
  248. headers["Message"] = message.replace("\n", "\\n").encode("utf-8")
  249. response = await client.put(url, content=image_data, headers=headers)
  250. if response.status_code == 400 and "attachments not allowed" in response.text:
  251. # Server has attachments disabled — retry without the image
  252. headers.pop("Filename", None)
  253. headers.pop("Message", None)
  254. response = await client.post(url, content=message.encode("utf-8"), headers=headers)
  255. else:
  256. response = await client.post(url, content=message.encode("utf-8"), headers=headers)
  257. if response.status_code in (200, 204):
  258. return True, "Message sent successfully"
  259. if _looks_like_cloudflare_challenge(response):
  260. return False, (
  261. f"HTTP {response.status_code} — ntfy server is behind a Cloudflare "
  262. "challenge. Bambuddy was served the JS challenge page instead of "
  263. "reaching ntfy. Cloudflare cannot be solved from a backend; add a "
  264. "Cloudflare security-skip rule for this hostname, disable Bot "
  265. "Fight Mode, or front the server with Cloudflare Access using a "
  266. "service token. (#1534)"
  267. )
  268. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  269. async def _send_pushover(
  270. self, config: dict, title: str, message: str, image_data: bytes | None = None
  271. ) -> tuple[bool, str]:
  272. """Send notification via Pushover.
  273. Args:
  274. config: Provider configuration with user_key, app_token, priority
  275. title: Notification title
  276. message: Notification body
  277. image_data: Optional JPEG image bytes to attach (max 2.5MB)
  278. """
  279. user_key = config.get("user_key", "").strip()
  280. app_token = config.get("app_token", "").strip()
  281. priority = config.get("priority", 0)
  282. if not user_key or not app_token:
  283. return False, "User key and app token are required"
  284. url = "https://api.pushover.net/1/messages.json"
  285. data = {
  286. "token": app_token,
  287. "user": user_key,
  288. "title": title,
  289. "message": message,
  290. "priority": priority,
  291. }
  292. client = await self._get_client()
  293. if image_data:
  294. # Pushover supports image attachments via multipart form-data
  295. files = {"attachment": ("photo.jpg", image_data, "image/jpeg")}
  296. response = await client.post(url, data=data, files=files)
  297. else:
  298. response = await client.post(url, data=data)
  299. if response.status_code == 200:
  300. return True, "Message sent successfully"
  301. else:
  302. try:
  303. error_data = response.json()
  304. errors = error_data.get("errors", [])
  305. return False, f"Pushover error: {', '.join(errors)}"
  306. except Exception:
  307. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  308. async def _send_telegram(self, config: dict, message: str, image_data: bytes | None = None) -> tuple[bool, str]:
  309. """Send notification via Telegram bot."""
  310. bot_token = config.get("bot_token", "").strip()
  311. chat_id = config.get("chat_id", "").strip()
  312. if not bot_token or not chat_id:
  313. return False, "Bot token and chat ID are required"
  314. # Escape underscores in the message body so Telegram Markdown
  315. # parsing doesn't break on job names like "A1_plate_8" or error
  316. # codes like "0300_0001". The title is already wrapped in *bold*
  317. # markers, so only escape after the first newline.
  318. if "\n" in message:
  319. title_part, body_part = message.split("\n", 1)
  320. body_part = body_part.replace("_", "\\_")
  321. message = f"{title_part}\n{body_part}"
  322. client = await self._get_client()
  323. if image_data:
  324. # Use sendPhoto to attach the thumbnail with the caption
  325. url = f"https://api.telegram.org/bot{bot_token}/sendPhoto"
  326. response = await client.post(
  327. url,
  328. data={"chat_id": chat_id, "caption": message, "parse_mode": "Markdown"},
  329. files={"photo": ("photo.jpg", image_data, "image/jpeg")},
  330. )
  331. else:
  332. url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
  333. data = {
  334. "chat_id": chat_id,
  335. "text": message,
  336. "parse_mode": "Markdown",
  337. }
  338. response = await client.post(url, json=data)
  339. if response.status_code == 200:
  340. result = response.json()
  341. if result.get("ok"):
  342. return True, "Message sent successfully"
  343. else:
  344. return False, f"Telegram error: {result.get('description', 'Unknown error')}"
  345. else:
  346. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  347. async def _send_email(self, config: dict, subject: str, body: str) -> tuple[bool, str]:
  348. """Send notification via email (SMTP)."""
  349. smtp_server = config.get("smtp_server", "").strip()
  350. smtp_port = int(config.get("smtp_port", 587))
  351. username = config.get("username", "").strip()
  352. password = config.get("password", "").strip()
  353. from_email = config.get("from_email", "").strip()
  354. to_email = config.get("to_email", "").strip()
  355. # Security: "starttls" (port 587), "ssl" (port 465), "none" (port 25)
  356. security = config.get("security", "starttls")
  357. # Authentication: "true" or "false"
  358. auth_enabled = config.get("auth_enabled", "true").lower() == "true"
  359. if not all([smtp_server, from_email, to_email]):
  360. return False, "SMTP server, from email, and to email are required"
  361. if auth_enabled and not all([username, password]):
  362. return False, "Username and password are required when authentication is enabled"
  363. try:
  364. msg = MIMEMultipart()
  365. msg["From"] = from_email
  366. msg["To"] = to_email
  367. msg["Subject"] = f"[Bambuddy] {subject}"
  368. msg.attach(MIMEText(body, "plain"))
  369. if security == "ssl":
  370. # Direct SSL connection (typically port 465)
  371. server = smtplib.SMTP_SSL(smtp_server, smtp_port)
  372. elif security == "starttls":
  373. # STARTTLS upgrade (typically port 587)
  374. server = smtplib.SMTP(smtp_server, smtp_port)
  375. server.starttls()
  376. else:
  377. # No encryption (typically port 25) - use with caution
  378. server = smtplib.SMTP(smtp_server, smtp_port)
  379. if auth_enabled:
  380. server.login(username, password)
  381. server.sendmail(from_email, to_email, msg.as_string())
  382. server.quit()
  383. return True, "Email sent successfully"
  384. except smtplib.SMTPAuthenticationError:
  385. return False, "SMTP authentication failed - check username/password"
  386. except smtplib.SMTPException as e:
  387. return False, f"SMTP error: {str(e)}"
  388. except Exception as e:
  389. return False, f"Email error: {str(e)}"
  390. async def _send_discord(
  391. self, config: dict, title: str, message: str, image_data: bytes | None = None
  392. ) -> tuple[bool, str]:
  393. """Send notification via Discord webhook."""
  394. webhook_url = config.get("webhook_url", "").strip()
  395. if not webhook_url:
  396. return False, "Webhook URL is required"
  397. if not (
  398. webhook_url.startswith("https://discord.com/api/webhooks/")
  399. or webhook_url.startswith("https://discordapp.com/api/webhooks/")
  400. ):
  401. return False, "Invalid Discord webhook URL"
  402. # Discord embed format for nicer messages
  403. embed = {
  404. "title": title,
  405. "description": message,
  406. "color": 0x00AE42, # Bambu green
  407. }
  408. client = await self._get_client()
  409. if image_data:
  410. # Attach image via multipart form-data and reference in embed
  411. embed["image"] = {"url": "attachment://photo.jpg"}
  412. payload = {"embeds": [embed]}
  413. response = await client.post(
  414. webhook_url,
  415. data={"payload_json": json.dumps(payload)},
  416. files={"files[0]": ("photo.jpg", image_data, "image/jpeg")},
  417. )
  418. else:
  419. response = await client.post(webhook_url, json={"embeds": [embed]})
  420. if response.status_code in (200, 204):
  421. return True, "Message sent successfully"
  422. else:
  423. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  424. async def _send_webhook(
  425. self,
  426. config: dict,
  427. title: str,
  428. message: str,
  429. image_data: bytes | None = None,
  430. event_type: str | None = None,
  431. variables: dict | None = None,
  432. ) -> tuple[bool, str]:
  433. """Send notification via generic webhook (POST JSON).
  434. Supports two payload formats:
  435. - generic: Custom field names with timestamp/source metadata + structured event data
  436. - slack: Slack/Mattermost compatible format (just {"text": "..."})
  437. """
  438. webhook_url = config.get("webhook_url", "").strip()
  439. auth_header = config.get("auth_header", "").strip()
  440. payload_format = config.get("payload_format", "generic").strip()
  441. if not webhook_url:
  442. return False, "Webhook URL is required"
  443. # Build payload based on format
  444. if payload_format == "slack":
  445. # Slack/Mattermost format - just text field
  446. data = {"text": f"*{title}*\n{message}"}
  447. else:
  448. # Generic format with custom field names
  449. custom_field_title = config.get("field_title", "title").strip() or "title"
  450. custom_field_message = config.get("field_message", "message").strip() or "message"
  451. data = {
  452. custom_field_title: title,
  453. custom_field_message: message,
  454. "timestamp": datetime.now().isoformat(),
  455. "source": "Bambuddy",
  456. }
  457. # For generic format, include structured event data for automation tools
  458. if payload_format != "slack":
  459. if event_type:
  460. data["event"] = event_type
  461. if variables:
  462. for key, value in variables.items():
  463. if key not in data: # Don't overwrite title/message/timestamp/source
  464. data[key] = value
  465. # Attach base64-encoded image when available (generic format only)
  466. if image_data and payload_format != "slack":
  467. import base64
  468. data["image"] = base64.b64encode(image_data).decode("ascii")
  469. headers = {"Content-Type": "application/json"}
  470. if auth_header:
  471. # Support "Bearer token" or just "token" format
  472. if " " in auth_header:
  473. headers["Authorization"] = auth_header
  474. else:
  475. headers["Authorization"] = f"Bearer {auth_header}"
  476. client = await self._get_client()
  477. try:
  478. response = await client.post(webhook_url, json=data, headers=headers)
  479. if response.status_code in (200, 201, 202, 204):
  480. return True, "Webhook delivered successfully"
  481. else:
  482. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  483. except Exception as e:
  484. return False, f"Webhook error: {str(e)}"
  485. async def _send_homeassistant(
  486. self, config: dict, title: str, message: str, db: AsyncSession | None = None
  487. ) -> tuple[bool, str]:
  488. """Send notification via Home Assistant.
  489. Uses the globally configured HA URL/token from settings.
  490. Defaults to persistent_notification/create, but supports
  491. custom services via config["service"] (e.g. notify.mobile_app_myphone).
  492. """
  493. # Get HA connection settings from global config
  494. ha_url = ""
  495. ha_token = ""
  496. if db:
  497. from backend.app.api.routes.settings import get_homeassistant_settings
  498. try:
  499. ha_settings = await get_homeassistant_settings(db)
  500. ha_url = ha_settings.get("ha_url", "")
  501. ha_token = ha_settings.get("ha_token", "")
  502. except Exception as e:
  503. logger.warning("Failed to read HA settings from database: %s", e)
  504. else:
  505. # Fallback: read directly from environment if no DB session
  506. import os
  507. ha_url = os.environ.get("HA_URL", "")
  508. ha_token = os.environ.get("HA_TOKEN", "")
  509. if not ha_url or not ha_token:
  510. return False, (
  511. "Home Assistant is not configured. Please set HA URL and token in Settings → Network → Home Assistant."
  512. )
  513. # Determine which HA service to call - Default: persistent_notification.create
  514. service = (config.get("service") or "").strip()
  515. if service:
  516. # Allow in different forms:
  517. # - notify.mobile_app_<device>
  518. # - notify/mobile_app_<device>
  519. # - api/services/notify/mobile_app_<device>
  520. service_str = service.lstrip("/")
  521. if service_str.startswith("api/services/"):
  522. endpoint = service_str
  523. elif "/" in service_str:
  524. endpoint = f"api/services/{service_str}"
  525. elif "." in service_str:
  526. domain, svc = service_str.split(".", 1)
  527. endpoint = f"api/services/{domain}/{svc}"
  528. else:
  529. return False, (
  530. "Invalid Home Assistant service name. Use e.g. 'notify.mobile_app_yourdevice' or 'notify/your_service'."
  531. )
  532. if not re.match(r"^api/services/[a-zA-Z0-9_]+/[a-zA-Z0-9_]+$", endpoint):
  533. return False, (
  534. "Invalid Home Assistant service name. Domain and service must only contain letters, numbers, and underscores."
  535. )
  536. else:
  537. endpoint = "api/services/persistent_notification/create"
  538. url = f"{ha_url.rstrip('/')}/{endpoint}"
  539. headers = {
  540. "Authorization": f"Bearer {ha_token}",
  541. "Content-Type": "application/json",
  542. }
  543. payload = {
  544. "title": title,
  545. "message": message,
  546. }
  547. client = await self._get_client()
  548. response = await client.post(url, json=payload, headers=headers)
  549. if response.status_code in (200, 201):
  550. return True, "Notification sent via Home Assistant"
  551. elif response.status_code == 401:
  552. return False, "Home Assistant authentication failed - check your token"
  553. else:
  554. return False, f"HTTP {response.status_code}: {response.text[:200]}"
  555. async def _send_to_provider(
  556. self,
  557. provider: NotificationProvider,
  558. title: str,
  559. message: str,
  560. db: AsyncSession | None = None,
  561. image_data: bytes | None = None,
  562. event_type: str | None = None,
  563. variables: dict | None = None,
  564. ) -> tuple[bool, str]:
  565. """Send notification to a specific provider."""
  566. # Check quiet hours
  567. if self._is_in_quiet_hours(provider):
  568. logger.info("Skipping notification to %s - quiet hours active", provider.name)
  569. return True, "Skipped - quiet hours"
  570. config = json.loads(provider.config) if isinstance(provider.config, str) else provider.config
  571. try:
  572. if provider.provider_type == "callmebot":
  573. return await self._send_callmebot(config, f"{title}\n{message}")
  574. elif provider.provider_type == "ntfy":
  575. return await self._send_ntfy(config, title, message, image_data=image_data, event_type=event_type)
  576. elif provider.provider_type == "pushover":
  577. return await self._send_pushover(config, title, message, image_data=image_data)
  578. elif provider.provider_type == "telegram":
  579. return await self._send_telegram(config, f"*{title}*\n{message}", image_data=image_data)
  580. elif provider.provider_type == "email":
  581. return await self._send_email(config, title, message)
  582. elif provider.provider_type == "discord":
  583. return await self._send_discord(config, title, message, image_data=image_data)
  584. elif provider.provider_type == "webhook":
  585. return await self._send_webhook(
  586. config, title, message, image_data=image_data, event_type=event_type, variables=variables
  587. )
  588. elif provider.provider_type == "homeassistant":
  589. return await self._send_homeassistant(config, title, message, db=db)
  590. else:
  591. return False, f"Unknown provider type: {provider.provider_type}"
  592. except Exception as e:
  593. logger.exception("Error sending notification via %s", provider.provider_type)
  594. return False, str(e)
  595. async def _update_provider_status(
  596. self, db: AsyncSession, provider_id: int, success: bool, error: str | None = None
  597. ):
  598. """Update provider status after sending notification."""
  599. result = await db.execute(select(NotificationProvider).where(NotificationProvider.id == provider_id))
  600. provider = result.scalar_one_or_none()
  601. if provider:
  602. if success:
  603. provider.last_success = datetime.now(timezone.utc)
  604. else:
  605. provider.last_error = error
  606. provider.last_error_at = datetime.now(timezone.utc)
  607. await db.commit()
  608. async def _get_providers_for_event(
  609. self,
  610. db: AsyncSession,
  611. event_field: str,
  612. printer_id: int | None = None,
  613. ) -> list[NotificationProvider]:
  614. """Get all enabled providers that want a specific event type."""
  615. # Build the query dynamically based on event field
  616. query = select(NotificationProvider).where(
  617. NotificationProvider.enabled.is_(True),
  618. getattr(NotificationProvider, event_field).is_(True),
  619. )
  620. if printer_id is not None:
  621. query = query.where(
  622. (NotificationProvider.printer_id.is_(None)) | (NotificationProvider.printer_id == printer_id)
  623. )
  624. result = await db.execute(query)
  625. return list(result.scalars().all())
  626. async def _log_notification(
  627. self,
  628. db: AsyncSession,
  629. provider_id: int,
  630. event_type: str,
  631. title: str,
  632. message: str,
  633. success: bool,
  634. error_message: str | None = None,
  635. printer_id: int | None = None,
  636. printer_name: str | None = None,
  637. ):
  638. """Create a log entry for a sent notification."""
  639. try:
  640. log = NotificationLog(
  641. provider_id=provider_id,
  642. event_type=event_type,
  643. title=title,
  644. message=message,
  645. success=success,
  646. error_message=error_message,
  647. printer_id=printer_id,
  648. printer_name=printer_name,
  649. )
  650. db.add(log)
  651. await db.commit()
  652. except Exception as e:
  653. logger.warning("Failed to log notification: %s", e)
  654. # Don't fail the notification just because logging failed
  655. async def _send_to_providers(
  656. self,
  657. providers: list[NotificationProvider],
  658. title: str,
  659. message: str,
  660. db: AsyncSession,
  661. event_type: str = "unknown",
  662. printer_id: int | None = None,
  663. printer_name: str | None = None,
  664. force_immediate: bool = False,
  665. image_data: bytes | None = None,
  666. variables: dict | None = None,
  667. ):
  668. """Send notification to multiple providers and log the results.
  669. All notifications are always sent immediately. If digest mode is enabled,
  670. the notification is ALSO queued for the daily digest summary.
  671. """
  672. for provider in providers:
  673. try:
  674. # Always send notification immediately
  675. success, error = await self._send_to_provider(
  676. provider, title, message, db, image_data=image_data, event_type=event_type, variables=variables
  677. )
  678. # Also queue for digest if enabled (digest is a summary, not a queue)
  679. if provider.daily_digest_enabled and provider.daily_digest_time:
  680. await self._queue_for_digest(
  681. provider=provider,
  682. event_type=event_type,
  683. title=title,
  684. message=message,
  685. db=db,
  686. printer_id=printer_id,
  687. printer_name=printer_name,
  688. )
  689. await self._update_provider_status(db, provider.id, success, error if not success else None)
  690. await self._log_notification(
  691. db=db,
  692. provider_id=provider.id,
  693. event_type=event_type,
  694. title=title,
  695. message=message,
  696. success=success,
  697. error_message=error if not success else None,
  698. printer_id=printer_id,
  699. printer_name=printer_name,
  700. )
  701. if success:
  702. logger.info("Sent notification via %s", provider.name)
  703. else:
  704. logger.warning("Failed to send notification via %s: %s", provider.name, error)
  705. except Exception as e:
  706. logger.exception("Error sending notification via %s", provider.name)
  707. await self._update_provider_status(db, provider.id, False, str(e))
  708. await self._log_notification(
  709. db=db,
  710. provider_id=provider.id,
  711. event_type=event_type,
  712. title=title,
  713. message=message,
  714. success=False,
  715. error_message=str(e),
  716. printer_id=printer_id,
  717. printer_name=printer_name,
  718. )
  719. async def on_print_start(
  720. self,
  721. printer_id: int,
  722. printer_name: str,
  723. data: dict,
  724. db: AsyncSession,
  725. archive_data: dict | None = None,
  726. ):
  727. """Handle print start event - send notifications to relevant providers.
  728. Args:
  729. printer_id: The printer ID
  730. printer_name: The printer name
  731. data: MQTT event data with filename, subtask_name, remaining_time, raw_data
  732. db: Database session
  733. archive_data: Optional archive data with print_time_seconds from 3MF parsing
  734. """
  735. logger.info("on_print_start called for printer %s (%s)", printer_id, printer_name)
  736. providers = await self._get_providers_for_event(db, "on_print_start", printer_id)
  737. if not providers:
  738. logger.info("No notification providers configured for print_start event on printer %s", printer_id)
  739. return
  740. # Use subtask_name (project name) if available, otherwise use filename
  741. subtask_name = data.get("subtask_name")
  742. if subtask_name:
  743. # Replace underscores with spaces for readability
  744. filename = subtask_name.replace("_", " ")
  745. else:
  746. filename = self._clean_filename(data.get("filename", "Unknown"))
  747. # Priority for estimated_time:
  748. # 1. Archive's print_time_seconds from 3MF parsing (most reliable)
  749. # 2. MQTT remaining_time (may be 0 at print start)
  750. # 3. raw_data mc_remaining_time
  751. estimated_time = None
  752. # Try archive data first (from 3MF parsing - most reliable)
  753. if archive_data and archive_data.get("print_time_seconds"):
  754. estimated_time = archive_data["print_time_seconds"]
  755. logger.debug("Using print_time_seconds from archive: %s", estimated_time)
  756. # Fall back to MQTT remaining_time
  757. if estimated_time is None:
  758. estimated_time = data.get("remaining_time")
  759. if estimated_time:
  760. logger.debug("Using remaining_time from MQTT: %s", estimated_time)
  761. # Last resort: raw_data mc_remaining_time (in minutes, convert to seconds)
  762. if estimated_time is None:
  763. raw_time = data.get("raw_data", {}).get("mc_remaining_time")
  764. if raw_time:
  765. estimated_time = raw_time * 60
  766. logger.debug("Using mc_remaining_time from raw_data: %s", estimated_time)
  767. time_str = self._format_duration(estimated_time)
  768. eta_str = await self._format_eta(estimated_time, db)
  769. variables = {
  770. "printer": printer_name,
  771. "filename": filename,
  772. "estimated_time": time_str,
  773. "eta": eta_str,
  774. }
  775. # Extract image data for providers that support attachments (e.g. Pushover)
  776. image_data = None
  777. if archive_data:
  778. image_data = archive_data.get("image_data")
  779. logger.info("Found %s providers for print_start: %s", len(providers), [p.name for p in providers])
  780. title, message = await self._build_message_from_template(db, "print_start", variables)
  781. await self._send_to_providers(
  782. providers,
  783. title,
  784. message,
  785. db,
  786. "print_start",
  787. printer_id,
  788. printer_name,
  789. image_data=image_data,
  790. variables=variables,
  791. )
  792. async def on_print_complete(
  793. self,
  794. printer_id: int,
  795. printer_name: str,
  796. status: str,
  797. data: dict,
  798. db: AsyncSession,
  799. archive_data: dict | None = None,
  800. ):
  801. """Handle print complete event - send notifications to relevant providers."""
  802. logger.info("on_print_complete called for printer %s (%s), status=%s", printer_id, printer_name, status)
  803. # Determine event type based on status
  804. if status == "completed":
  805. event_field = "on_print_complete"
  806. event_type = "print_complete"
  807. elif status in ("failed",):
  808. event_field = "on_print_failed"
  809. event_type = "print_failed"
  810. elif status in ("aborted", "stopped", "cancelled"):
  811. event_field = "on_print_stopped"
  812. event_type = "print_stopped"
  813. else:
  814. logger.warning("Unknown print status '%s', defaulting to on_print_complete", status)
  815. event_field = "on_print_complete"
  816. event_type = "print_complete"
  817. providers = await self._get_providers_for_event(db, event_field, printer_id)
  818. if not providers:
  819. logger.info("No notification providers configured for %s event on printer %s", event_field, printer_id)
  820. return
  821. # Use subtask_name (project name) if available, otherwise use filename
  822. subtask_name = data.get("subtask_name")
  823. if subtask_name:
  824. filename = subtask_name.replace("_", " ")
  825. else:
  826. filename = self._clean_filename(data.get("filename", "Unknown"))
  827. variables = {
  828. "printer": printer_name,
  829. "filename": filename,
  830. "duration": "Unknown",
  831. "filament_grams": "Unknown",
  832. "reason": "Unknown",
  833. }
  834. if archive_data:
  835. # {{duration}} on completion / failure / stopped events is the *actual*
  836. # elapsed time (#1198). Slicer-estimated print_time_seconds is only used
  837. # as a last-resort fallback when timestamps weren't recorded.
  838. duration_seconds = archive_data.get("actual_time_seconds") or archive_data.get("print_time_seconds")
  839. if duration_seconds:
  840. variables["duration"] = self._format_duration(duration_seconds)
  841. if archive_data.get("actual_filament_grams"):
  842. variables["filament_grams"] = f"{archive_data['actual_filament_grams']:.1f}"
  843. if status == "failed" and archive_data.get("failure_reason"):
  844. variables["reason"] = archive_data["failure_reason"]
  845. if archive_data.get("finish_photo_url"):
  846. variables["finish_photo_url"] = archive_data["finish_photo_url"]
  847. # Build per-slot breakdown string with AMS info when available
  848. if archive_data.get("usage_results"):
  849. parts = []
  850. for u in archive_data["usage_results"]:
  851. ams_id = u.get("ams_id", 0)
  852. tray_id = u.get("tray_id", 0)
  853. material = u.get("material", "Unknown") or "Unknown"
  854. used = u.get("weight_used", 0)
  855. if ams_id >= 128:
  856. slot_label = "Ext"
  857. else:
  858. slot_label = f"AMS-{chr(65 + ams_id)} T{tray_id + 1}"
  859. parts.append(f"{slot_label} {material}: {used:.1f}g")
  860. variables["filament_details"] = " | ".join(parts)
  861. elif archive_data.get("filament_slots"):
  862. parts = []
  863. for slot in archive_data["filament_slots"]:
  864. ftype = slot.get("type", "Unknown") or "Unknown"
  865. used = slot.get("used_g", 0)
  866. parts.append(f"{ftype}: {used:.1f}g")
  867. variables["filament_details"] = " | ".join(parts)
  868. # Add progress for partial prints
  869. if archive_data.get("progress") is not None:
  870. variables["progress"] = str(archive_data["progress"])
  871. # Extract image data for providers that support attachments (e.g. Pushover)
  872. image_data = None
  873. if archive_data:
  874. image_data = archive_data.get("image_data")
  875. logger.info("Found %s providers for %s: %s", len(providers), event_field, [p.name for p in providers])
  876. title, message = await self._build_message_from_template(db, event_type, variables)
  877. await self._send_to_providers(
  878. providers,
  879. title,
  880. message,
  881. db,
  882. event_type,
  883. printer_id,
  884. printer_name,
  885. image_data=image_data,
  886. variables=variables,
  887. )
  888. async def on_print_progress(
  889. self,
  890. printer_id: int,
  891. printer_name: str,
  892. filename: str,
  893. progress: int,
  894. db: AsyncSession,
  895. remaining_time: int | None = None,
  896. image_data: bytes | None = None,
  897. ):
  898. """Handle print progress milestone (25%, 50%, 75%)."""
  899. providers = await self._get_providers_for_event(db, "on_print_progress", printer_id)
  900. if not providers:
  901. return
  902. eta_str = await self._format_eta(remaining_time, db)
  903. variables = {
  904. "printer": printer_name,
  905. "filename": self._clean_filename(filename),
  906. "progress": str(progress),
  907. "remaining_time": self._format_duration(remaining_time) if remaining_time else "Unknown",
  908. "eta": eta_str,
  909. }
  910. title, message = await self._build_message_from_template(db, "print_progress", variables)
  911. await self._send_to_providers(
  912. providers,
  913. title,
  914. message,
  915. db,
  916. "print_progress",
  917. printer_id,
  918. printer_name,
  919. image_data=image_data,
  920. variables=variables,
  921. )
  922. async def on_print_missing_spool_assignment(
  923. self,
  924. printer_id: int,
  925. printer_name: str,
  926. missing_slots: list[dict[str, str]],
  927. db: AsyncSession,
  928. ):
  929. """Handle print-start event when required trays are missing spool assignments."""
  930. if not missing_slots:
  931. return
  932. providers = await self._get_providers_for_event(db, "on_print_missing_spool_assignment", printer_id)
  933. if not providers:
  934. return
  935. missing_slot_names = ", ".join(slot.get("slot", "Unknown") for slot in missing_slots)
  936. detail_lines = []
  937. for slot in missing_slots:
  938. slot_name = slot.get("slot", "Unknown")
  939. profile = slot.get("profile", "Unknown")
  940. detail_lines.append(f"- {slot_name}: {profile}")
  941. missing_profile_details = "\n".join(detail_lines)
  942. variables = {
  943. "printer": printer_name,
  944. "missing_slots": missing_slot_names,
  945. "missing_slot_details": missing_profile_details,
  946. }
  947. title, message = await self._build_message_from_template(db, "print_missing_spool_assignment", variables)
  948. await self._send_to_providers(
  949. providers,
  950. title,
  951. message,
  952. db,
  953. "print_missing_spool_assignment",
  954. printer_id,
  955. printer_name,
  956. force_immediate=True,
  957. variables=variables,
  958. )
  959. async def on_printer_offline(self, printer_id: int, printer_name: str, db: AsyncSession):
  960. """Handle printer offline event."""
  961. providers = await self._get_providers_for_event(db, "on_printer_offline", printer_id)
  962. if not providers:
  963. return
  964. variables = {"printer": printer_name}
  965. title, message = await self._build_message_from_template(db, "printer_offline", variables)
  966. await self._send_to_providers(
  967. providers, title, message, db, "printer_offline", printer_id, printer_name, variables=variables
  968. )
  969. async def on_printer_error(
  970. self,
  971. printer_id: int,
  972. printer_name: str,
  973. error_type: str,
  974. db: AsyncSession,
  975. error_detail: str | None = None,
  976. image_data: bytes | None = None,
  977. ):
  978. """Handle printer error event (AMS issues, etc.)."""
  979. providers = await self._get_providers_for_event(db, "on_printer_error", printer_id)
  980. if not providers:
  981. return
  982. variables = {
  983. "printer": printer_name,
  984. "error_type": error_type,
  985. "error_detail": error_detail or "No details available",
  986. }
  987. title, message = await self._build_message_from_template(db, "printer_error", variables)
  988. await self._send_to_providers(
  989. providers,
  990. title,
  991. message,
  992. db,
  993. "printer_error",
  994. printer_id,
  995. printer_name,
  996. image_data=image_data,
  997. variables=variables,
  998. )
  999. async def on_plate_not_empty(
  1000. self,
  1001. printer_id: int,
  1002. printer_name: str,
  1003. db: AsyncSession,
  1004. difference_percent: float | None = None,
  1005. ):
  1006. """Handle plate not empty event - objects detected on build plate before print."""
  1007. providers = await self._get_providers_for_event(db, "on_plate_not_empty", printer_id)
  1008. if not providers:
  1009. return
  1010. variables = {
  1011. "printer": printer_name,
  1012. "difference_percent": f"{difference_percent:.1f}" if difference_percent else "N/A",
  1013. }
  1014. title, message = await self._build_message_from_template(db, "plate_not_empty", variables)
  1015. await self._send_to_providers(
  1016. providers,
  1017. title,
  1018. message,
  1019. db,
  1020. "plate_not_empty",
  1021. printer_id,
  1022. printer_name,
  1023. force_immediate=True,
  1024. variables=variables,
  1025. )
  1026. async def on_filament_low(
  1027. self,
  1028. printer_id: int,
  1029. printer_name: str,
  1030. slot: int,
  1031. remaining_percent: int,
  1032. db: AsyncSession,
  1033. color: str | None = None,
  1034. ):
  1035. """Handle low filament event."""
  1036. providers = await self._get_providers_for_event(db, "on_filament_low", printer_id)
  1037. if not providers:
  1038. return
  1039. variables = {
  1040. "printer": printer_name,
  1041. "slot": str(slot),
  1042. "remaining_percent": str(remaining_percent),
  1043. "color": color or "",
  1044. }
  1045. title, message = await self._build_message_from_template(db, "filament_low", variables)
  1046. await self._send_to_providers(
  1047. providers, title, message, db, "filament_low", printer_id, printer_name, variables=variables
  1048. )
  1049. async def on_maintenance_due(
  1050. self,
  1051. printer_id: int,
  1052. printer_name: str,
  1053. maintenance_items: list[dict],
  1054. db: AsyncSession,
  1055. ):
  1056. """Handle maintenance due event - sends notification when maintenance is due or warning."""
  1057. if not maintenance_items:
  1058. return
  1059. providers = await self._get_providers_for_event(db, "on_maintenance_due", printer_id)
  1060. if not providers:
  1061. logger.info("No notification providers configured for maintenance_due event on printer %s", printer_id)
  1062. return
  1063. # Format maintenance items list
  1064. items_list = []
  1065. for item in maintenance_items:
  1066. status = "OVERDUE" if item.get("is_due") else "Soon"
  1067. items_list.append(f"- {item['name']} ({status})")
  1068. items_str = "\n".join(items_list)
  1069. variables = {
  1070. "printer": printer_name,
  1071. "items": items_str,
  1072. }
  1073. logger.info("Found %s providers for maintenance_due: %s", len(providers), [p.name for p in providers])
  1074. title, message = await self._build_message_from_template(db, "maintenance_due", variables)
  1075. await self._send_to_providers(
  1076. providers, title, message, db, "maintenance_due", printer_id, printer_name, variables=variables
  1077. )
  1078. async def on_ams_humidity_high(
  1079. self,
  1080. printer_id: int,
  1081. printer_name: str,
  1082. ams_label: str,
  1083. humidity: float,
  1084. threshold: float,
  1085. db: AsyncSession,
  1086. ):
  1087. """Handle AMS high humidity alarm event. Always sends immediately (bypasses digest)."""
  1088. providers = await self._get_providers_for_event(db, "on_ams_humidity_high", printer_id)
  1089. if not providers:
  1090. return
  1091. variables = {
  1092. "printer": printer_name,
  1093. "ams_label": ams_label,
  1094. "humidity": f"{humidity:.0f}",
  1095. "threshold": f"{threshold:.0f}",
  1096. }
  1097. title, message = await self._build_message_from_template(db, "ams_humidity_high", variables)
  1098. # Alarms always send immediately, bypassing digest mode
  1099. await self._send_to_providers(
  1100. providers,
  1101. title,
  1102. message,
  1103. db,
  1104. "ams_humidity_high",
  1105. printer_id,
  1106. printer_name,
  1107. force_immediate=True,
  1108. variables=variables,
  1109. )
  1110. async def on_ams_temperature_high(
  1111. self,
  1112. printer_id: int,
  1113. printer_name: str,
  1114. ams_label: str,
  1115. temperature: float,
  1116. threshold: float,
  1117. db: AsyncSession,
  1118. ):
  1119. """Handle AMS high temperature alarm event. Always sends immediately (bypasses digest)."""
  1120. providers = await self._get_providers_for_event(db, "on_ams_temperature_high", printer_id)
  1121. if not providers:
  1122. return
  1123. variables = {
  1124. "printer": printer_name,
  1125. "ams_label": ams_label,
  1126. "temperature": f"{temperature:.1f}",
  1127. "threshold": f"{threshold:.1f}",
  1128. }
  1129. title, message = await self._build_message_from_template(db, "ams_temperature_high", variables)
  1130. # Alarms always send immediately, bypassing digest mode
  1131. await self._send_to_providers(
  1132. providers,
  1133. title,
  1134. message,
  1135. db,
  1136. "ams_temperature_high",
  1137. printer_id,
  1138. printer_name,
  1139. force_immediate=True,
  1140. variables=variables,
  1141. )
  1142. async def on_ams_ht_humidity_high(
  1143. self,
  1144. printer_id: int,
  1145. printer_name: str,
  1146. ams_label: str,
  1147. humidity: float,
  1148. threshold: float,
  1149. db: AsyncSession,
  1150. ):
  1151. """Handle AMS-HT high humidity alarm event. Always sends immediately (bypasses digest)."""
  1152. providers = await self._get_providers_for_event(db, "on_ams_ht_humidity_high", printer_id)
  1153. if not providers:
  1154. return
  1155. variables = {
  1156. "printer": printer_name,
  1157. "ams_label": ams_label,
  1158. "humidity": f"{humidity:.0f}",
  1159. "threshold": f"{threshold:.0f}",
  1160. }
  1161. # Use the same template as regular AMS (can create separate templates later if needed)
  1162. title, message = await self._build_message_from_template(db, "ams_humidity_high", variables)
  1163. # Alarms always send immediately, bypassing digest mode
  1164. await self._send_to_providers(
  1165. providers,
  1166. title,
  1167. message,
  1168. db,
  1169. "ams_ht_humidity_high",
  1170. printer_id,
  1171. printer_name,
  1172. force_immediate=True,
  1173. variables=variables,
  1174. )
  1175. async def on_ams_ht_temperature_high(
  1176. self,
  1177. printer_id: int,
  1178. printer_name: str,
  1179. ams_label: str,
  1180. temperature: float,
  1181. threshold: float,
  1182. db: AsyncSession,
  1183. ):
  1184. """Handle AMS-HT high temperature alarm event. Always sends immediately (bypasses digest)."""
  1185. providers = await self._get_providers_for_event(db, "on_ams_ht_temperature_high", printer_id)
  1186. if not providers:
  1187. return
  1188. variables = {
  1189. "printer": printer_name,
  1190. "ams_label": ams_label,
  1191. "temperature": f"{temperature:.1f}",
  1192. "threshold": f"{threshold:.1f}",
  1193. }
  1194. # Use the same template as regular AMS (can create separate templates later if needed)
  1195. title, message = await self._build_message_from_template(db, "ams_temperature_high", variables)
  1196. # Alarms always send immediately, bypassing digest mode
  1197. await self._send_to_providers(
  1198. providers,
  1199. title,
  1200. message,
  1201. db,
  1202. "ams_ht_temperature_high",
  1203. printer_id,
  1204. printer_name,
  1205. force_immediate=True,
  1206. variables=variables,
  1207. )
  1208. async def on_bed_cooled(
  1209. self,
  1210. printer_id: int,
  1211. printer_name: str,
  1212. bed_temp: float,
  1213. threshold: float,
  1214. filename: str,
  1215. db: AsyncSession,
  1216. ):
  1217. """Handle bed cooled event - bed temperature dropped below threshold after print."""
  1218. providers = await self._get_providers_for_event(db, "on_bed_cooled", printer_id)
  1219. if not providers:
  1220. return
  1221. variables = {
  1222. "printer": printer_name,
  1223. "bed_temp": f"{bed_temp:.0f}",
  1224. "threshold": f"{threshold:.0f}",
  1225. "filename": self._clean_filename(filename) if filename else "Unknown",
  1226. }
  1227. title, message = await self._build_message_from_template(db, "bed_cooled", variables)
  1228. await self._send_to_providers(
  1229. providers, title, message, db, "bed_cooled", printer_id, printer_name, variables=variables
  1230. )
  1231. async def on_first_layer_complete(
  1232. self,
  1233. printer_id: int,
  1234. printer_name: str,
  1235. filename: str,
  1236. total_layers: int,
  1237. db: AsyncSession,
  1238. image_data: bytes | None = None,
  1239. ):
  1240. """Handle first layer complete event."""
  1241. providers = await self._get_providers_for_event(db, "on_first_layer_complete", printer_id)
  1242. if not providers:
  1243. return
  1244. variables = {
  1245. "printer": printer_name,
  1246. "filename": self._clean_filename(filename),
  1247. "total_layers": str(total_layers),
  1248. }
  1249. title, message = await self._build_message_from_template(db, "first_layer_complete", variables)
  1250. await self._send_to_providers(
  1251. providers,
  1252. title,
  1253. message,
  1254. db,
  1255. "first_layer_complete",
  1256. printer_id,
  1257. printer_name,
  1258. image_data=image_data,
  1259. variables=variables,
  1260. )
  1261. def clear_template_cache(self):
  1262. """Clear the template cache. Call this when templates are updated."""
  1263. self._template_cache.clear()
  1264. async def send_user_print_email(
  1265. self,
  1266. event_type: str,
  1267. created_by_id: int | None,
  1268. printer_name: str,
  1269. filename: str,
  1270. db: AsyncSession,
  1271. ) -> None:
  1272. """Send a print event email notification to the user who submitted the job.
  1273. Args:
  1274. event_type: 'user_print_start', 'user_print_complete', 'user_print_failed', or 'user_print_stopped'
  1275. created_by_id: User ID who submitted the print job (from archive)
  1276. printer_name: Name of the printer
  1277. filename: Raw filename or subtask name
  1278. db: Database session
  1279. """
  1280. if created_by_id is None:
  1281. logger.debug("[EMAIL] Skipping user print email (%s): no created_by_id", event_type)
  1282. return
  1283. try:
  1284. # Check if advanced auth is enabled - required for user email notifications
  1285. from backend.app.models.settings import Settings
  1286. result = await db.execute(select(Settings).where(Settings.key == "advanced_auth_enabled"))
  1287. setting = result.scalar_one_or_none()
  1288. if not setting or setting.value.lower() != "true":
  1289. logger.debug("[EMAIL] Skipping user print email (%s): advanced_auth not enabled", event_type)
  1290. return
  1291. # Check if user notifications are enabled (admin-controlled toggle)
  1292. notif_enabled_result = await db.execute(
  1293. select(Settings).where(Settings.key == "user_notifications_enabled")
  1294. )
  1295. notif_enabled_setting = notif_enabled_result.scalar_one_or_none()
  1296. if notif_enabled_setting and notif_enabled_setting.value.lower() == "false":
  1297. logger.debug("[EMAIL] Skipping user print email (%s): user_notifications_enabled is false", event_type)
  1298. return
  1299. # Check SMTP settings are configured - required for sending emails
  1300. from backend.app.services.email_service import get_smtp_settings, send_user_print_notification
  1301. smtp_settings = await get_smtp_settings(db)
  1302. if not smtp_settings:
  1303. logger.debug("[EMAIL] Skipping user print email (%s): SMTP settings not configured", event_type)
  1304. return
  1305. # Load user preferences
  1306. from backend.app.models.user import User
  1307. from backend.app.models.user_email_pref import UserEmailPreference
  1308. user_result = await db.execute(select(User).where(User.id == created_by_id))
  1309. user = user_result.scalar_one_or_none()
  1310. if user is None or not user.email:
  1311. logger.debug(
  1312. "[EMAIL] Skipping user print email (%s): user %s not found or has no email address",
  1313. event_type,
  1314. created_by_id,
  1315. )
  1316. return
  1317. # Load user's notification preferences
  1318. pref_result = await db.execute(
  1319. select(UserEmailPreference).where(UserEmailPreference.user_id == created_by_id)
  1320. )
  1321. pref = pref_result.scalar_one_or_none()
  1322. # Determine if this event type should be sent
  1323. should_send = False
  1324. if event_type == "user_print_start":
  1325. should_send = pref is None or pref.notify_print_start
  1326. elif event_type == "user_print_complete":
  1327. should_send = pref is None or pref.notify_print_complete
  1328. elif event_type == "user_print_failed":
  1329. should_send = pref is None or pref.notify_print_failed
  1330. elif event_type == "user_print_stopped":
  1331. should_send = pref is None or pref.notify_print_stopped
  1332. if not should_send:
  1333. logger.debug(
  1334. "[EMAIL] Skipping user print email (%s): user %s has notifications disabled for this event",
  1335. event_type,
  1336. created_by_id,
  1337. )
  1338. return
  1339. logger.info(
  1340. "[EMAIL] Sending user print email: event=%s, user=%s (%s), printer=%s, file=%s",
  1341. event_type,
  1342. user.username,
  1343. user.email,
  1344. printer_name,
  1345. filename,
  1346. )
  1347. # Build variables
  1348. variables = {
  1349. "printer": printer_name,
  1350. "filename": self._clean_filename(filename),
  1351. }
  1352. # Send the email
  1353. await send_user_print_notification(
  1354. db=db,
  1355. event_type=event_type,
  1356. user_email=user.email,
  1357. username=user.username,
  1358. variables=variables,
  1359. )
  1360. logger.info("[EMAIL] User print email sent: event=%s → %s", event_type, user.email)
  1361. except Exception as e:
  1362. logger.warning("Failed to send user print email notification: %s", e, exc_info=True)
  1363. # ==================== Queue Notifications ====================
  1364. async def on_queue_job_added(
  1365. self,
  1366. job_name: str,
  1367. target: str,
  1368. db: AsyncSession,
  1369. printer_id: int | None = None,
  1370. printer_name: str | None = None,
  1371. ):
  1372. """Handle queue job added event."""
  1373. providers = await self._get_providers_for_event(db, "on_queue_job_added", printer_id)
  1374. if not providers:
  1375. return
  1376. variables = {
  1377. "job_name": job_name,
  1378. "target": target, # e.g., "Printer1" or "Any X1C"
  1379. "printer": printer_name or target,
  1380. }
  1381. title, message = await self._build_message_from_template(db, "queue_job_added", variables)
  1382. await self._send_to_providers(
  1383. providers, title, message, db, "queue_job_added", printer_id, printer_name, variables=variables
  1384. )
  1385. async def on_queue_job_assigned(
  1386. self,
  1387. job_name: str,
  1388. printer_id: int,
  1389. printer_name: str,
  1390. target_model: str,
  1391. db: AsyncSession,
  1392. ):
  1393. """Handle model-based job assigned to printer event."""
  1394. providers = await self._get_providers_for_event(db, "on_queue_job_assigned", printer_id)
  1395. if not providers:
  1396. return
  1397. variables = {
  1398. "job_name": job_name,
  1399. "printer": printer_name,
  1400. "target_model": target_model,
  1401. }
  1402. title, message = await self._build_message_from_template(db, "queue_job_assigned", variables)
  1403. await self._send_to_providers(
  1404. providers, title, message, db, "queue_job_assigned", printer_id, printer_name, variables=variables
  1405. )
  1406. async def on_queue_job_started(
  1407. self,
  1408. job_name: str,
  1409. printer_id: int,
  1410. printer_name: str,
  1411. db: AsyncSession,
  1412. estimated_time: int | None = None,
  1413. ):
  1414. """Handle queue job started printing event."""
  1415. providers = await self._get_providers_for_event(db, "on_queue_job_started", printer_id)
  1416. if not providers:
  1417. return
  1418. eta_str = await self._format_eta(estimated_time, db)
  1419. variables = {
  1420. "job_name": job_name,
  1421. "printer": printer_name,
  1422. "estimated_time": self._format_duration(estimated_time),
  1423. "eta": eta_str,
  1424. }
  1425. title, message = await self._build_message_from_template(db, "queue_job_started", variables)
  1426. await self._send_to_providers(
  1427. providers, title, message, db, "queue_job_started", printer_id, printer_name, variables=variables
  1428. )
  1429. async def on_queue_job_waiting(
  1430. self,
  1431. job_name: str,
  1432. target_model: str,
  1433. waiting_reason: str,
  1434. db: AsyncSession,
  1435. ):
  1436. """Handle job waiting for filament event."""
  1437. providers = await self._get_providers_for_event(db, "on_queue_job_waiting", None)
  1438. if not providers:
  1439. return
  1440. variables = {
  1441. "job_name": job_name,
  1442. "target_model": target_model,
  1443. "waiting_reason": waiting_reason,
  1444. }
  1445. title, message = await self._build_message_from_template(db, "queue_job_waiting", variables)
  1446. await self._send_to_providers(providers, title, message, db, "queue_job_waiting", variables=variables)
  1447. async def on_queue_job_skipped(
  1448. self,
  1449. job_name: str,
  1450. printer_id: int,
  1451. printer_name: str,
  1452. reason: str,
  1453. db: AsyncSession,
  1454. ):
  1455. """Handle job skipped event (e.g., previous print failed)."""
  1456. providers = await self._get_providers_for_event(db, "on_queue_job_skipped", printer_id)
  1457. if not providers:
  1458. return
  1459. variables = {
  1460. "job_name": job_name,
  1461. "printer": printer_name,
  1462. "reason": reason,
  1463. }
  1464. title, message = await self._build_message_from_template(db, "queue_job_skipped", variables)
  1465. await self._send_to_providers(
  1466. providers, title, message, db, "queue_job_skipped", printer_id, printer_name, variables=variables
  1467. )
  1468. async def on_queue_job_failed(
  1469. self,
  1470. job_name: str,
  1471. printer_id: int | None,
  1472. printer_name: str | None,
  1473. reason: str,
  1474. db: AsyncSession,
  1475. ):
  1476. """Handle job failed to start event (upload error, etc.)."""
  1477. providers = await self._get_providers_for_event(db, "on_queue_job_failed", printer_id)
  1478. if not providers:
  1479. return
  1480. variables = {
  1481. "job_name": job_name,
  1482. "printer": printer_name or "Unknown",
  1483. "reason": reason,
  1484. }
  1485. title, message = await self._build_message_from_template(db, "queue_job_failed", variables)
  1486. await self._send_to_providers(
  1487. providers, title, message, db, "queue_job_failed", printer_id, printer_name, variables=variables
  1488. )
  1489. async def on_queue_completed(
  1490. self,
  1491. completed_count: int,
  1492. db: AsyncSession,
  1493. ):
  1494. """Handle all queue jobs completed event."""
  1495. providers = await self._get_providers_for_event(db, "on_queue_completed", None)
  1496. if not providers:
  1497. return
  1498. variables = {
  1499. "completed_count": str(completed_count),
  1500. }
  1501. title, message = await self._build_message_from_template(db, "queue_completed", variables)
  1502. await self._send_to_providers(providers, title, message, db, "queue_completed", variables=variables)
  1503. # ==================== Inventory Stock Alerts ====================
  1504. async def on_stock_reorder_alert(
  1505. self,
  1506. material: str,
  1507. brand: str | None,
  1508. stock_g: float,
  1509. rate_g_day: float,
  1510. days_left: int,
  1511. db: AsyncSession,
  1512. ):
  1513. """Fire when an inventory SKU reaches its reorder point."""
  1514. providers = await self._get_providers_for_event(db, "on_stock_reorder_alert", None)
  1515. if not providers:
  1516. return
  1517. variables = {
  1518. "material": material,
  1519. "brand": brand or "",
  1520. "stock_g": f"{stock_g:.0f}",
  1521. "rate_g_day": f"{rate_g_day:.1f}",
  1522. "days_left": str(days_left),
  1523. }
  1524. title, message = await self._build_message_from_template(db, "stock_reorder_alert", variables)
  1525. await self._send_to_providers(providers, title, message, db, "stock_reorder_alert", variables=variables)
  1526. async def on_stock_break_alert(
  1527. self,
  1528. material: str,
  1529. brand: str | None,
  1530. stock_g: float,
  1531. rate_g_day: float,
  1532. days_left: int,
  1533. lead_time_days: int,
  1534. db: AsyncSession,
  1535. ):
  1536. """Fire when a stock break is detected (stock runs out before lead time)."""
  1537. providers = await self._get_providers_for_event(db, "on_stock_break_alert", None)
  1538. if not providers:
  1539. return
  1540. variables = {
  1541. "material": material,
  1542. "brand": brand or "",
  1543. "stock_g": f"{stock_g:.0f}",
  1544. "rate_g_day": f"{rate_g_day:.1f}",
  1545. "days_left": str(days_left),
  1546. "lead_time_days": str(lead_time_days),
  1547. }
  1548. title, message = await self._build_message_from_template(db, "stock_break_alert", variables)
  1549. await self._send_to_providers(providers, title, message, db, "stock_break_alert", variables=variables)
  1550. async def _queue_for_digest(
  1551. self,
  1552. provider: NotificationProvider,
  1553. event_type: str,
  1554. title: str,
  1555. message: str,
  1556. db: AsyncSession,
  1557. printer_id: int | None = None,
  1558. printer_name: str | None = None,
  1559. ):
  1560. """Queue a notification for later delivery in the daily digest."""
  1561. try:
  1562. queue_entry = NotificationDigestQueue(
  1563. provider_id=provider.id,
  1564. event_type=event_type,
  1565. title=title,
  1566. message=message,
  1567. printer_id=printer_id,
  1568. printer_name=printer_name,
  1569. )
  1570. db.add(queue_entry)
  1571. await db.commit()
  1572. logger.info("Queued notification for digest: %s for provider %s", event_type, provider.name)
  1573. except Exception as e:
  1574. logger.warning("Failed to queue notification for digest: %s", e)
  1575. async def send_digest(self, provider_id: int):
  1576. """Send all queued notifications as a single digest for a provider."""
  1577. from backend.app.core.database import async_session
  1578. async with async_session() as db:
  1579. # Get the provider
  1580. result = await db.execute(select(NotificationProvider).where(NotificationProvider.id == provider_id))
  1581. provider = result.scalar_one_or_none()
  1582. if not provider or not provider.enabled:
  1583. return
  1584. # Get all queued notifications for this provider
  1585. result = await db.execute(
  1586. select(NotificationDigestQueue)
  1587. .where(NotificationDigestQueue.provider_id == provider_id)
  1588. .order_by(NotificationDigestQueue.created_at)
  1589. )
  1590. queue_entries = list(result.scalars().all())
  1591. if not queue_entries:
  1592. logger.debug("No queued notifications for provider %s", provider.name)
  1593. return
  1594. # Build digest message
  1595. title = f"Daily Digest - {len(queue_entries)} Events"
  1596. # Group by event type
  1597. events_by_type: dict[str, list] = {}
  1598. for entry in queue_entries:
  1599. if entry.event_type not in events_by_type:
  1600. events_by_type[entry.event_type] = []
  1601. events_by_type[entry.event_type].append(entry)
  1602. # Format the digest body
  1603. body_parts = []
  1604. for event_type, entries in events_by_type.items():
  1605. event_label = event_type.replace("_", " ").title()
  1606. body_parts.append(f"== {event_label} ({len(entries)}) ==")
  1607. for entry in entries:
  1608. time_str = entry.created_at.strftime("%H:%M")
  1609. printer_info = f"[{entry.printer_name}] " if entry.printer_name else ""
  1610. body_parts.append(f" {time_str} {printer_info}{entry.title}")
  1611. body_parts.append("")
  1612. body = "\n".join(body_parts)
  1613. # Send the digest
  1614. success, error = await self._send_to_provider(provider, title, body, db)
  1615. # Log the digest
  1616. await self._log_notification(
  1617. db=db,
  1618. provider_id=provider.id,
  1619. event_type="daily_digest",
  1620. title=title,
  1621. message=body,
  1622. success=success,
  1623. error_message=error if not success else None,
  1624. )
  1625. # Clear the queue
  1626. for entry in queue_entries:
  1627. await db.delete(entry)
  1628. await db.commit()
  1629. if success:
  1630. logger.info("Sent daily digest with %s events to %s", len(queue_entries), provider.name)
  1631. else:
  1632. logger.warning("Failed to send daily digest to %s: %s", provider.name, error)
  1633. async def check_and_send_digests(self):
  1634. """Check all providers and send digests if it's their scheduled time."""
  1635. from backend.app.core.database import async_session
  1636. current_time = datetime.now().strftime("%H:%M")
  1637. # Avoid duplicate checks within the same minute
  1638. if current_time == self._last_digest_check:
  1639. return
  1640. self._last_digest_check = current_time
  1641. async with async_session() as db:
  1642. # Find all providers with digest enabled at this time
  1643. result = await db.execute(
  1644. select(NotificationProvider).where(
  1645. NotificationProvider.enabled.is_(True),
  1646. NotificationProvider.daily_digest_enabled.is_(True),
  1647. NotificationProvider.daily_digest_time == current_time,
  1648. )
  1649. )
  1650. providers = result.scalars().all()
  1651. for provider in providers:
  1652. try:
  1653. await self.send_digest(provider.id)
  1654. except Exception as e:
  1655. logger.error("Error sending digest for provider %s: %s", provider.id, e)
  1656. def start_digest_scheduler(self):
  1657. """Start the background scheduler for daily digest notifications."""
  1658. if self._digest_scheduler_task is None:
  1659. self._digest_scheduler_task = asyncio.create_task(self._digest_scheduler_loop())
  1660. logger.info("Notification digest scheduler started")
  1661. def stop_digest_scheduler(self):
  1662. """Stop the background scheduler for daily digests."""
  1663. if self._digest_scheduler_task:
  1664. self._digest_scheduler_task.cancel()
  1665. self._digest_scheduler_task = None
  1666. logger.info("Notification digest scheduler stopped")
  1667. async def _digest_scheduler_loop(self):
  1668. """Background loop that checks for scheduled digests every minute."""
  1669. while True:
  1670. try:
  1671. await self.check_and_send_digests()
  1672. except Exception as e:
  1673. logger.error("Error in digest scheduler: %s", e)
  1674. # Wait until the next minute
  1675. await asyncio.sleep(60)
  1676. # Global instance
  1677. notification_service = NotificationService()