| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816 |
- """Notification service for sending push notifications via various providers."""
- import asyncio
- import json
- import logging
- import re
- import smtplib
- from datetime import datetime, timedelta, timezone
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- from typing import Any
- from urllib.parse import quote
- import httpx
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.models.notification import NotificationDigestQueue, NotificationLog, NotificationProvider
- from backend.app.models.notification_template import NotificationTemplate
- logger = logging.getLogger(__name__)
- class NotificationService:
- """Service for sending notifications through various providers."""
- def __init__(self):
- self._http_client: httpx.AsyncClient | None = None
- self._template_cache: dict[str, NotificationTemplate] = {}
- self._digest_scheduler_task: asyncio.Task | None = None
- self._last_digest_check: str = "" # "HH:MM" to avoid duplicate checks
- async def _get_client(self) -> httpx.AsyncClient:
- """Get or create HTTP client."""
- if self._http_client is None or self._http_client.is_closed:
- self._http_client = httpx.AsyncClient(timeout=30.0)
- return self._http_client
- async def close(self):
- """Close HTTP client."""
- if self._http_client and not self._http_client.is_closed:
- await self._http_client.aclose()
- def _is_in_quiet_hours(self, provider: NotificationProvider) -> bool:
- """Check if current time is within provider's quiet hours."""
- if not provider.quiet_hours_enabled:
- return False
- if not provider.quiet_hours_start or not provider.quiet_hours_end:
- return False
- try:
- now = datetime.now()
- current_time = now.hour * 60 + now.minute
- start_parts = provider.quiet_hours_start.split(":")
- end_parts = provider.quiet_hours_end.split(":")
- start_minutes = int(start_parts[0]) * 60 + int(start_parts[1])
- end_minutes = int(end_parts[0]) * 60 + int(end_parts[1])
- # Handle overnight quiet hours (e.g., 22:00 to 07:00)
- if start_minutes > end_minutes:
- # Quiet hours span midnight
- return current_time >= start_minutes or current_time < end_minutes
- else:
- # Same day quiet hours
- return start_minutes <= current_time < end_minutes
- except (ValueError, TypeError, AttributeError):
- logger.warning("Invalid quiet hours format for provider %s", provider.name)
- return False
- async def _get_template(self, db: AsyncSession, event_type: str) -> NotificationTemplate | None:
- """Get a notification template by event type."""
- # Check cache first
- if event_type in self._template_cache:
- return self._template_cache[event_type]
- result = await db.execute(select(NotificationTemplate).where(NotificationTemplate.event_type == event_type))
- template = result.scalar_one_or_none()
- if template:
- self._template_cache[event_type] = template
- return template
- def _render_template(self, template_str: str, variables: dict[str, Any]) -> str:
- """Render a template string with variables. Missing variables become empty."""
- result = template_str
- for key, value in variables.items():
- result = result.replace("{" + key + "}", str(value) if value is not None else "")
- # Remove any remaining unreplaced placeholders
- result = re.sub(r"\{[a-z_]+\}", "", result)
- return result
- async def _format_eta(self, seconds: int | None, db: AsyncSession) -> str:
- """Format ETA as wall-clock time, respecting user's time_format setting."""
- if not seconds or seconds <= 0:
- return "Unknown"
- from backend.app.api.routes.settings import get_setting
- eta_time = datetime.now() + timedelta(seconds=seconds)
- time_format = await get_setting(db, "time_format")
- if time_format == "12h":
- return eta_time.strftime("%I:%M %p").lstrip("0")
- # Default to 24h for "24h", "system", or unset
- return eta_time.strftime("%H:%M")
- def _format_duration(self, seconds: int | None) -> str:
- """Format duration in seconds to human-readable string."""
- if seconds is None:
- return "Unknown"
- hours = seconds // 3600
- minutes = (seconds % 3600) // 60
- if hours > 0:
- return f"{hours}h {minutes}m"
- return f"{minutes}m"
- def _clean_filename(self, filename: str) -> str:
- """Extract filename and remove file extensions."""
- import os
- # Strip path prefix (e.g., /data/Metadata/plate_5.gcode -> plate_5.gcode)
- filename = os.path.basename(filename)
- # Remove common extensions
- if filename.endswith(".gcode.3mf"):
- return filename[:-10]
- elif filename.endswith(".gcode"):
- return filename[:-6]
- elif filename.endswith(".3mf"):
- return filename[:-4]
- return filename
- async def _build_message_from_template(
- self, db: AsyncSession, event_type: str, variables: dict[str, Any]
- ) -> tuple[str, str]:
- """Build notification title and body from template."""
- # Add common variables
- variables["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M")
- variables["app_name"] = "Bambuddy"
- template = await self._get_template(db, event_type)
- if not template:
- # Fallback to simple message
- logger.warning("Template not found for event type: %s", event_type)
- return event_type.replace("_", " ").title(), str(variables)
- title = self._render_template(template.title_template, variables)
- body = self._render_template(template.body_template, variables)
- return title, body
- async def send_test_notification(
- self, provider_type: str, config: dict[str, Any], db: AsyncSession | None = None
- ) -> tuple[bool, str]:
- """Send a test notification to verify configuration."""
- if db:
- title, message = await self._build_message_from_template(db, "test", {})
- else:
- title = "Bambuddy Test"
- message = "This is a test notification. If you see this, notifications are working!"
- try:
- if provider_type == "callmebot":
- return await self._send_callmebot(config, f"{title}\n{message}")
- elif provider_type == "ntfy":
- return await self._send_ntfy(config, title, message)
- elif provider_type == "pushover":
- return await self._send_pushover(config, title, message)
- elif provider_type == "telegram":
- return await self._send_telegram(config, f"*{title}*\n{message}")
- elif provider_type == "email":
- return await self._send_email(config, title, message)
- elif provider_type == "discord":
- return await self._send_discord(config, title, message)
- elif provider_type == "webhook":
- return await self._send_webhook(config, title, message)
- elif provider_type == "homeassistant":
- return await self._send_homeassistant(config, title, message, db=db)
- else:
- return False, f"Unknown provider type: {provider_type}"
- except Exception as e:
- logger.exception("Error sending test notification via %s", provider_type)
- return False, str(e)
- async def _send_callmebot(self, config: dict, message: str) -> tuple[bool, str]:
- """Send notification via CallMeBot (WhatsApp)."""
- phone = config.get("phone", "").strip()
- apikey = config.get("apikey", "").strip()
- if not phone or not apikey:
- return False, "Phone number and API key are required"
- # URL encode the message
- encoded_message = quote(message)
- url = f"https://api.callmebot.com/whatsapp.php?phone={phone}&text={encoded_message}&apikey={apikey}"
- client = await self._get_client()
- response = await client.get(url)
- if response.status_code == 200:
- return True, "Message sent successfully"
- else:
- return False, f"HTTP {response.status_code}: {response.text[:200]}"
- async def _send_ntfy(
- self,
- config: dict,
- title: str,
- message: str,
- image_data: bytes | None = None,
- event_type: str | None = None,
- ) -> tuple[bool, str]:
- """Send notification via ntfy."""
- server = config.get("server", "https://ntfy.sh").rstrip("/")
- topic = config.get("topic", "").strip()
- auth_token = config.get("auth_token", "").strip()
- if not topic:
- return False, "Topic is required"
- url = f"{server}/{topic}"
- # ntfy reads Title/Message from HTTP headers. httpx enforces ASCII
- # for str header values, but printer names and filenames can contain
- # non-ASCII characters (e.g. accented letters, CJK). Passing bytes
- # bypasses the ASCII check — ntfy handles UTF-8 headers correctly.
- headers: dict[str, str | bytes] = {"Title": title.encode("utf-8")}
- # Per-event Priority header (#990). Only set when the user has
- # explicitly mapped this event to a 1-5 value; otherwise fall through
- # to the ntfy server's default so existing setups stay unchanged.
- event_priorities = config.get("event_priorities") or {}
- if event_type and isinstance(event_priorities, dict):
- raw = event_priorities.get(event_type)
- try:
- priority = int(raw) if raw is not None else None
- except (TypeError, ValueError):
- priority = None
- if priority is not None and 1 <= priority <= 5:
- headers["Priority"] = str(priority)
- if auth_token:
- headers["Authorization"] = f"Bearer {auth_token}"
- client = await self._get_client()
- if image_data:
- # ntfy supports image attachments via multipart form-data.
- # HTTP headers cannot contain newlines, but ntfy interprets
- # literal \n (backslash-n) as newlines in the Message header.
- headers["Filename"] = "photo.jpg"
- headers["Message"] = message.replace("\n", "\\n").encode("utf-8")
- response = await client.put(url, content=image_data, headers=headers)
- if response.status_code == 400 and "attachments not allowed" in response.text:
- # Server has attachments disabled — retry without the image
- headers.pop("Filename", None)
- headers.pop("Message", None)
- response = await client.post(url, content=message.encode("utf-8"), headers=headers)
- else:
- response = await client.post(url, content=message.encode("utf-8"), headers=headers)
- if response.status_code in (200, 204):
- return True, "Message sent successfully"
- else:
- return False, f"HTTP {response.status_code}: {response.text[:200]}"
- async def _send_pushover(
- self, config: dict, title: str, message: str, image_data: bytes | None = None
- ) -> tuple[bool, str]:
- """Send notification via Pushover.
- Args:
- config: Provider configuration with user_key, app_token, priority
- title: Notification title
- message: Notification body
- image_data: Optional JPEG image bytes to attach (max 2.5MB)
- """
- user_key = config.get("user_key", "").strip()
- app_token = config.get("app_token", "").strip()
- priority = config.get("priority", 0)
- if not user_key or not app_token:
- return False, "User key and app token are required"
- url = "https://api.pushover.net/1/messages.json"
- data = {
- "token": app_token,
- "user": user_key,
- "title": title,
- "message": message,
- "priority": priority,
- }
- client = await self._get_client()
- if image_data:
- # Pushover supports image attachments via multipart form-data
- files = {"attachment": ("photo.jpg", image_data, "image/jpeg")}
- response = await client.post(url, data=data, files=files)
- else:
- response = await client.post(url, data=data)
- if response.status_code == 200:
- return True, "Message sent successfully"
- else:
- try:
- error_data = response.json()
- errors = error_data.get("errors", [])
- return False, f"Pushover error: {', '.join(errors)}"
- except Exception:
- return False, f"HTTP {response.status_code}: {response.text[:200]}"
- async def _send_telegram(self, config: dict, message: str, image_data: bytes | None = None) -> tuple[bool, str]:
- """Send notification via Telegram bot."""
- bot_token = config.get("bot_token", "").strip()
- chat_id = config.get("chat_id", "").strip()
- if not bot_token or not chat_id:
- return False, "Bot token and chat ID are required"
- # Escape underscores in the message body so Telegram Markdown
- # parsing doesn't break on job names like "A1_plate_8" or error
- # codes like "0300_0001". The title is already wrapped in *bold*
- # markers, so only escape after the first newline.
- if "\n" in message:
- title_part, body_part = message.split("\n", 1)
- body_part = body_part.replace("_", "\\_")
- message = f"{title_part}\n{body_part}"
- client = await self._get_client()
- if image_data:
- # Use sendPhoto to attach the thumbnail with the caption
- url = f"https://api.telegram.org/bot{bot_token}/sendPhoto"
- response = await client.post(
- url,
- data={"chat_id": chat_id, "caption": message, "parse_mode": "Markdown"},
- files={"photo": ("photo.jpg", image_data, "image/jpeg")},
- )
- else:
- url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
- data = {
- "chat_id": chat_id,
- "text": message,
- "parse_mode": "Markdown",
- }
- response = await client.post(url, json=data)
- if response.status_code == 200:
- result = response.json()
- if result.get("ok"):
- return True, "Message sent successfully"
- else:
- return False, f"Telegram error: {result.get('description', 'Unknown error')}"
- else:
- return False, f"HTTP {response.status_code}: {response.text[:200]}"
- async def _send_email(self, config: dict, subject: str, body: str) -> tuple[bool, str]:
- """Send notification via email (SMTP)."""
- smtp_server = config.get("smtp_server", "").strip()
- smtp_port = int(config.get("smtp_port", 587))
- username = config.get("username", "").strip()
- password = config.get("password", "").strip()
- from_email = config.get("from_email", "").strip()
- to_email = config.get("to_email", "").strip()
- # Security: "starttls" (port 587), "ssl" (port 465), "none" (port 25)
- security = config.get("security", "starttls")
- # Authentication: "true" or "false"
- auth_enabled = config.get("auth_enabled", "true").lower() == "true"
- if not all([smtp_server, from_email, to_email]):
- return False, "SMTP server, from email, and to email are required"
- if auth_enabled and not all([username, password]):
- return False, "Username and password are required when authentication is enabled"
- try:
- msg = MIMEMultipart()
- msg["From"] = from_email
- msg["To"] = to_email
- msg["Subject"] = f"[Bambuddy] {subject}"
- msg.attach(MIMEText(body, "plain"))
- if security == "ssl":
- # Direct SSL connection (typically port 465)
- server = smtplib.SMTP_SSL(smtp_server, smtp_port)
- elif security == "starttls":
- # STARTTLS upgrade (typically port 587)
- server = smtplib.SMTP(smtp_server, smtp_port)
- server.starttls()
- else:
- # No encryption (typically port 25) - use with caution
- server = smtplib.SMTP(smtp_server, smtp_port)
- if auth_enabled:
- server.login(username, password)
- server.sendmail(from_email, to_email, msg.as_string())
- server.quit()
- return True, "Email sent successfully"
- except smtplib.SMTPAuthenticationError:
- return False, "SMTP authentication failed - check username/password"
- except smtplib.SMTPException as e:
- return False, f"SMTP error: {str(e)}"
- except Exception as e:
- return False, f"Email error: {str(e)}"
- async def _send_discord(
- self, config: dict, title: str, message: str, image_data: bytes | None = None
- ) -> tuple[bool, str]:
- """Send notification via Discord webhook."""
- webhook_url = config.get("webhook_url", "").strip()
- if not webhook_url:
- return False, "Webhook URL is required"
- if not webhook_url.startswith("https://discord.com/api/webhooks/"):
- return False, "Invalid Discord webhook URL"
- # Discord embed format for nicer messages
- embed = {
- "title": title,
- "description": message,
- "color": 0x00AE42, # Bambu green
- }
- client = await self._get_client()
- if image_data:
- # Attach image via multipart form-data and reference in embed
- embed["image"] = {"url": "attachment://photo.jpg"}
- payload = {"embeds": [embed]}
- response = await client.post(
- webhook_url,
- data={"payload_json": json.dumps(payload)},
- files={"files[0]": ("photo.jpg", image_data, "image/jpeg")},
- )
- else:
- response = await client.post(webhook_url, json={"embeds": [embed]})
- if response.status_code in (200, 204):
- return True, "Message sent successfully"
- else:
- return False, f"HTTP {response.status_code}: {response.text[:200]}"
- async def _send_webhook(
- self,
- config: dict,
- title: str,
- message: str,
- image_data: bytes | None = None,
- event_type: str | None = None,
- variables: dict | None = None,
- ) -> tuple[bool, str]:
- """Send notification via generic webhook (POST JSON).
- Supports two payload formats:
- - generic: Custom field names with timestamp/source metadata + structured event data
- - slack: Slack/Mattermost compatible format (just {"text": "..."})
- """
- webhook_url = config.get("webhook_url", "").strip()
- auth_header = config.get("auth_header", "").strip()
- payload_format = config.get("payload_format", "generic").strip()
- if not webhook_url:
- return False, "Webhook URL is required"
- # Build payload based on format
- if payload_format == "slack":
- # Slack/Mattermost format - just text field
- data = {"text": f"*{title}*\n{message}"}
- else:
- # Generic format with custom field names
- custom_field_title = config.get("field_title", "title").strip() or "title"
- custom_field_message = config.get("field_message", "message").strip() or "message"
- data = {
- custom_field_title: title,
- custom_field_message: message,
- "timestamp": datetime.now().isoformat(),
- "source": "Bambuddy",
- }
- # For generic format, include structured event data for automation tools
- if payload_format != "slack":
- if event_type:
- data["event"] = event_type
- if variables:
- for key, value in variables.items():
- if key not in data: # Don't overwrite title/message/timestamp/source
- data[key] = value
- # Attach base64-encoded image when available (generic format only)
- if image_data and payload_format != "slack":
- import base64
- data["image"] = base64.b64encode(image_data).decode("ascii")
- headers = {"Content-Type": "application/json"}
- if auth_header:
- # Support "Bearer token" or just "token" format
- if " " in auth_header:
- headers["Authorization"] = auth_header
- else:
- headers["Authorization"] = f"Bearer {auth_header}"
- client = await self._get_client()
- try:
- response = await client.post(webhook_url, json=data, headers=headers)
- if response.status_code in (200, 201, 202, 204):
- return True, "Webhook delivered successfully"
- else:
- return False, f"HTTP {response.status_code}: {response.text[:200]}"
- except Exception as e:
- return False, f"Webhook error: {str(e)}"
- async def _send_homeassistant(
- self, config: dict, title: str, message: str, db: AsyncSession | None = None
- ) -> tuple[bool, str]:
- """Send notification via Home Assistant.
- Uses the globally configured HA URL/token from settings.
- Defaults to persistent_notification/create, but supports
- custom services via config["service"] (e.g. notify.mobile_app_myphone).
- """
- # Get HA connection settings from global config
- ha_url = ""
- ha_token = ""
- if db:
- from backend.app.api.routes.settings import get_homeassistant_settings
- try:
- ha_settings = await get_homeassistant_settings(db)
- ha_url = ha_settings.get("ha_url", "")
- ha_token = ha_settings.get("ha_token", "")
- except Exception as e:
- logger.warning("Failed to read HA settings from database: %s", e)
- else:
- # Fallback: read directly from environment if no DB session
- import os
- ha_url = os.environ.get("HA_URL", "")
- ha_token = os.environ.get("HA_TOKEN", "")
- if not ha_url or not ha_token:
- return False, (
- "Home Assistant is not configured. Please set HA URL and token in Settings → Network → Home Assistant."
- )
- # Determine which HA service to call - Default: persistent_notification.create
- service = (config.get("service") or "").strip()
- if service:
- # Allow in different forms:
- # - notify.mobile_app_<device>
- # - notify/mobile_app_<device>
- # - api/services/notify/mobile_app_<device>
- service_str = service.lstrip("/")
- if service_str.startswith("api/services/"):
- endpoint = service_str
- elif "/" in service_str:
- endpoint = f"api/services/{service_str}"
- elif "." in service_str:
- domain, svc = service_str.split(".", 1)
- endpoint = f"api/services/{domain}/{svc}"
- else:
- return False, (
- "Invalid Home Assistant service name. Use e.g. 'notify.mobile_app_yourdevice' or 'notify/your_service'."
- )
- if not re.match(r"^api/services/[a-zA-Z0-9_]+/[a-zA-Z0-9_]+$", endpoint):
- return False, (
- "Invalid Home Assistant service name. Domain and service must only contain letters, numbers, and underscores."
- )
- else:
- endpoint = "api/services/persistent_notification/create"
- url = f"{ha_url.rstrip('/')}/{endpoint}"
- headers = {
- "Authorization": f"Bearer {ha_token}",
- "Content-Type": "application/json",
- }
- payload = {
- "title": title,
- "message": message,
- }
- client = await self._get_client()
- response = await client.post(url, json=payload, headers=headers)
- if response.status_code in (200, 201):
- return True, "Notification sent via Home Assistant"
- elif response.status_code == 401:
- return False, "Home Assistant authentication failed - check your token"
- else:
- return False, f"HTTP {response.status_code}: {response.text[:200]}"
- async def _send_to_provider(
- self,
- provider: NotificationProvider,
- title: str,
- message: str,
- db: AsyncSession | None = None,
- image_data: bytes | None = None,
- event_type: str | None = None,
- variables: dict | None = None,
- ) -> tuple[bool, str]:
- """Send notification to a specific provider."""
- # Check quiet hours
- if self._is_in_quiet_hours(provider):
- logger.info("Skipping notification to %s - quiet hours active", provider.name)
- return True, "Skipped - quiet hours"
- config = json.loads(provider.config) if isinstance(provider.config, str) else provider.config
- try:
- if provider.provider_type == "callmebot":
- return await self._send_callmebot(config, f"{title}\n{message}")
- elif provider.provider_type == "ntfy":
- return await self._send_ntfy(config, title, message, image_data=image_data, event_type=event_type)
- elif provider.provider_type == "pushover":
- return await self._send_pushover(config, title, message, image_data=image_data)
- elif provider.provider_type == "telegram":
- return await self._send_telegram(config, f"*{title}*\n{message}", image_data=image_data)
- elif provider.provider_type == "email":
- return await self._send_email(config, title, message)
- elif provider.provider_type == "discord":
- return await self._send_discord(config, title, message, image_data=image_data)
- elif provider.provider_type == "webhook":
- return await self._send_webhook(
- config, title, message, image_data=image_data, event_type=event_type, variables=variables
- )
- elif provider.provider_type == "homeassistant":
- return await self._send_homeassistant(config, title, message, db=db)
- else:
- return False, f"Unknown provider type: {provider.provider_type}"
- except Exception as e:
- logger.exception("Error sending notification via %s", provider.provider_type)
- return False, str(e)
- async def _update_provider_status(
- self, db: AsyncSession, provider_id: int, success: bool, error: str | None = None
- ):
- """Update provider status after sending notification."""
- result = await db.execute(select(NotificationProvider).where(NotificationProvider.id == provider_id))
- provider = result.scalar_one_or_none()
- if provider:
- if success:
- provider.last_success = datetime.now(timezone.utc)
- else:
- provider.last_error = error
- provider.last_error_at = datetime.now(timezone.utc)
- await db.commit()
- async def _get_providers_for_event(
- self,
- db: AsyncSession,
- event_field: str,
- printer_id: int | None = None,
- ) -> list[NotificationProvider]:
- """Get all enabled providers that want a specific event type."""
- # Build the query dynamically based on event field
- query = select(NotificationProvider).where(
- NotificationProvider.enabled.is_(True),
- getattr(NotificationProvider, event_field).is_(True),
- )
- if printer_id is not None:
- query = query.where(
- (NotificationProvider.printer_id.is_(None)) | (NotificationProvider.printer_id == printer_id)
- )
- result = await db.execute(query)
- return list(result.scalars().all())
- async def _log_notification(
- self,
- db: AsyncSession,
- provider_id: int,
- event_type: str,
- title: str,
- message: str,
- success: bool,
- error_message: str | None = None,
- printer_id: int | None = None,
- printer_name: str | None = None,
- ):
- """Create a log entry for a sent notification."""
- try:
- log = NotificationLog(
- provider_id=provider_id,
- event_type=event_type,
- title=title,
- message=message,
- success=success,
- error_message=error_message,
- printer_id=printer_id,
- printer_name=printer_name,
- )
- db.add(log)
- await db.commit()
- except Exception as e:
- logger.warning("Failed to log notification: %s", e)
- # Don't fail the notification just because logging failed
- async def _send_to_providers(
- self,
- providers: list[NotificationProvider],
- title: str,
- message: str,
- db: AsyncSession,
- event_type: str = "unknown",
- printer_id: int | None = None,
- printer_name: str | None = None,
- force_immediate: bool = False,
- image_data: bytes | None = None,
- variables: dict | None = None,
- ):
- """Send notification to multiple providers and log the results.
- All notifications are always sent immediately. If digest mode is enabled,
- the notification is ALSO queued for the daily digest summary.
- """
- for provider in providers:
- try:
- # Always send notification immediately
- success, error = await self._send_to_provider(
- provider, title, message, db, image_data=image_data, event_type=event_type, variables=variables
- )
- # Also queue for digest if enabled (digest is a summary, not a queue)
- if provider.daily_digest_enabled and provider.daily_digest_time:
- await self._queue_for_digest(
- provider=provider,
- event_type=event_type,
- title=title,
- message=message,
- db=db,
- printer_id=printer_id,
- printer_name=printer_name,
- )
- await self._update_provider_status(db, provider.id, success, error if not success else None)
- await self._log_notification(
- db=db,
- provider_id=provider.id,
- event_type=event_type,
- title=title,
- message=message,
- success=success,
- error_message=error if not success else None,
- printer_id=printer_id,
- printer_name=printer_name,
- )
- if success:
- logger.info("Sent notification via %s", provider.name)
- else:
- logger.warning("Failed to send notification via %s: %s", provider.name, error)
- except Exception as e:
- logger.exception("Error sending notification via %s", provider.name)
- await self._update_provider_status(db, provider.id, False, str(e))
- await self._log_notification(
- db=db,
- provider_id=provider.id,
- event_type=event_type,
- title=title,
- message=message,
- success=False,
- error_message=str(e),
- printer_id=printer_id,
- printer_name=printer_name,
- )
- async def on_print_start(
- self,
- printer_id: int,
- printer_name: str,
- data: dict,
- db: AsyncSession,
- archive_data: dict | None = None,
- ):
- """Handle print start event - send notifications to relevant providers.
- Args:
- printer_id: The printer ID
- printer_name: The printer name
- data: MQTT event data with filename, subtask_name, remaining_time, raw_data
- db: Database session
- archive_data: Optional archive data with print_time_seconds from 3MF parsing
- """
- logger.info("on_print_start called for printer %s (%s)", printer_id, printer_name)
- providers = await self._get_providers_for_event(db, "on_print_start", printer_id)
- if not providers:
- logger.info("No notification providers configured for print_start event on printer %s", printer_id)
- return
- # Use subtask_name (project name) if available, otherwise use filename
- subtask_name = data.get("subtask_name")
- if subtask_name:
- # Replace underscores with spaces for readability
- filename = subtask_name.replace("_", " ")
- else:
- filename = self._clean_filename(data.get("filename", "Unknown"))
- # Priority for estimated_time:
- # 1. Archive's print_time_seconds from 3MF parsing (most reliable)
- # 2. MQTT remaining_time (may be 0 at print start)
- # 3. raw_data mc_remaining_time
- estimated_time = None
- # Try archive data first (from 3MF parsing - most reliable)
- if archive_data and archive_data.get("print_time_seconds"):
- estimated_time = archive_data["print_time_seconds"]
- logger.debug("Using print_time_seconds from archive: %s", estimated_time)
- # Fall back to MQTT remaining_time
- if estimated_time is None:
- estimated_time = data.get("remaining_time")
- if estimated_time:
- logger.debug("Using remaining_time from MQTT: %s", estimated_time)
- # Last resort: raw_data mc_remaining_time (in minutes, convert to seconds)
- if estimated_time is None:
- raw_time = data.get("raw_data", {}).get("mc_remaining_time")
- if raw_time:
- estimated_time = raw_time * 60
- logger.debug("Using mc_remaining_time from raw_data: %s", estimated_time)
- time_str = self._format_duration(estimated_time)
- eta_str = await self._format_eta(estimated_time, db)
- variables = {
- "printer": printer_name,
- "filename": filename,
- "estimated_time": time_str,
- "eta": eta_str,
- }
- # Extract image data for providers that support attachments (e.g. Pushover)
- image_data = None
- if archive_data:
- image_data = archive_data.get("image_data")
- logger.info("Found %s providers for print_start: %s", len(providers), [p.name for p in providers])
- title, message = await self._build_message_from_template(db, "print_start", variables)
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "print_start",
- printer_id,
- printer_name,
- image_data=image_data,
- variables=variables,
- )
- async def on_print_complete(
- self,
- printer_id: int,
- printer_name: str,
- status: str,
- data: dict,
- db: AsyncSession,
- archive_data: dict | None = None,
- ):
- """Handle print complete event - send notifications to relevant providers."""
- logger.info("on_print_complete called for printer %s (%s), status=%s", printer_id, printer_name, status)
- # Determine event type based on status
- if status == "completed":
- event_field = "on_print_complete"
- event_type = "print_complete"
- elif status in ("failed",):
- event_field = "on_print_failed"
- event_type = "print_failed"
- elif status in ("aborted", "stopped", "cancelled"):
- event_field = "on_print_stopped"
- event_type = "print_stopped"
- else:
- logger.warning("Unknown print status '%s', defaulting to on_print_complete", status)
- event_field = "on_print_complete"
- event_type = "print_complete"
- providers = await self._get_providers_for_event(db, event_field, printer_id)
- if not providers:
- logger.info("No notification providers configured for %s event on printer %s", event_field, printer_id)
- return
- # Use subtask_name (project name) if available, otherwise use filename
- subtask_name = data.get("subtask_name")
- if subtask_name:
- filename = subtask_name.replace("_", " ")
- else:
- filename = self._clean_filename(data.get("filename", "Unknown"))
- variables = {
- "printer": printer_name,
- "filename": filename,
- "duration": "Unknown",
- "filament_grams": "Unknown",
- "reason": "Unknown",
- }
- if archive_data:
- # {{duration}} on completion / failure / stopped events is the *actual*
- # elapsed time (#1198). Slicer-estimated print_time_seconds is only used
- # as a last-resort fallback when timestamps weren't recorded.
- duration_seconds = archive_data.get("actual_time_seconds") or archive_data.get("print_time_seconds")
- if duration_seconds:
- variables["duration"] = self._format_duration(duration_seconds)
- if archive_data.get("actual_filament_grams"):
- variables["filament_grams"] = f"{archive_data['actual_filament_grams']:.1f}"
- if status == "failed" and archive_data.get("failure_reason"):
- variables["reason"] = archive_data["failure_reason"]
- if archive_data.get("finish_photo_url"):
- variables["finish_photo_url"] = archive_data["finish_photo_url"]
- # Build per-slot breakdown string with AMS info when available
- if archive_data.get("usage_results"):
- parts = []
- for u in archive_data["usage_results"]:
- ams_id = u.get("ams_id", 0)
- tray_id = u.get("tray_id", 0)
- material = u.get("material", "Unknown") or "Unknown"
- used = u.get("weight_used", 0)
- if ams_id >= 128:
- slot_label = "Ext"
- else:
- slot_label = f"AMS-{chr(65 + ams_id)} T{tray_id + 1}"
- parts.append(f"{slot_label} {material}: {used:.1f}g")
- variables["filament_details"] = " | ".join(parts)
- elif archive_data.get("filament_slots"):
- parts = []
- for slot in archive_data["filament_slots"]:
- ftype = slot.get("type", "Unknown") or "Unknown"
- used = slot.get("used_g", 0)
- parts.append(f"{ftype}: {used:.1f}g")
- variables["filament_details"] = " | ".join(parts)
- # Add progress for partial prints
- if archive_data.get("progress") is not None:
- variables["progress"] = str(archive_data["progress"])
- # Extract image data for providers that support attachments (e.g. Pushover)
- image_data = None
- if archive_data:
- image_data = archive_data.get("image_data")
- logger.info("Found %s providers for %s: %s", len(providers), event_field, [p.name for p in providers])
- title, message = await self._build_message_from_template(db, event_type, variables)
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- event_type,
- printer_id,
- printer_name,
- image_data=image_data,
- variables=variables,
- )
- async def on_print_progress(
- self,
- printer_id: int,
- printer_name: str,
- filename: str,
- progress: int,
- db: AsyncSession,
- remaining_time: int | None = None,
- image_data: bytes | None = None,
- ):
- """Handle print progress milestone (25%, 50%, 75%)."""
- providers = await self._get_providers_for_event(db, "on_print_progress", printer_id)
- if not providers:
- return
- eta_str = await self._format_eta(remaining_time, db)
- variables = {
- "printer": printer_name,
- "filename": self._clean_filename(filename),
- "progress": str(progress),
- "remaining_time": self._format_duration(remaining_time) if remaining_time else "Unknown",
- "eta": eta_str,
- }
- title, message = await self._build_message_from_template(db, "print_progress", variables)
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "print_progress",
- printer_id,
- printer_name,
- image_data=image_data,
- variables=variables,
- )
- async def on_print_missing_spool_assignment(
- self,
- printer_id: int,
- printer_name: str,
- missing_slots: list[dict[str, str]],
- db: AsyncSession,
- ):
- """Handle print-start event when required trays are missing spool assignments."""
- if not missing_slots:
- return
- providers = await self._get_providers_for_event(db, "on_print_missing_spool_assignment", printer_id)
- if not providers:
- return
- missing_slot_names = ", ".join(slot.get("slot", "Unknown") for slot in missing_slots)
- detail_lines = []
- for slot in missing_slots:
- slot_name = slot.get("slot", "Unknown")
- profile = slot.get("profile", "Unknown")
- detail_lines.append(f"- {slot_name}: {profile}")
- missing_profile_details = "\n".join(detail_lines)
- variables = {
- "printer": printer_name,
- "missing_slots": missing_slot_names,
- "missing_slot_details": missing_profile_details,
- }
- title, message = await self._build_message_from_template(db, "print_missing_spool_assignment", variables)
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "print_missing_spool_assignment",
- printer_id,
- printer_name,
- force_immediate=True,
- variables=variables,
- )
- async def on_printer_offline(self, printer_id: int, printer_name: str, db: AsyncSession):
- """Handle printer offline event."""
- providers = await self._get_providers_for_event(db, "on_printer_offline", printer_id)
- if not providers:
- return
- variables = {"printer": printer_name}
- title, message = await self._build_message_from_template(db, "printer_offline", variables)
- await self._send_to_providers(
- providers, title, message, db, "printer_offline", printer_id, printer_name, variables=variables
- )
- async def on_printer_error(
- self,
- printer_id: int,
- printer_name: str,
- error_type: str,
- db: AsyncSession,
- error_detail: str | None = None,
- image_data: bytes | None = None,
- ):
- """Handle printer error event (AMS issues, etc.)."""
- providers = await self._get_providers_for_event(db, "on_printer_error", printer_id)
- if not providers:
- return
- variables = {
- "printer": printer_name,
- "error_type": error_type,
- "error_detail": error_detail or "No details available",
- }
- title, message = await self._build_message_from_template(db, "printer_error", variables)
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "printer_error",
- printer_id,
- printer_name,
- image_data=image_data,
- variables=variables,
- )
- async def on_plate_not_empty(
- self,
- printer_id: int,
- printer_name: str,
- db: AsyncSession,
- difference_percent: float | None = None,
- ):
- """Handle plate not empty event - objects detected on build plate before print."""
- providers = await self._get_providers_for_event(db, "on_plate_not_empty", printer_id)
- if not providers:
- return
- variables = {
- "printer": printer_name,
- "difference_percent": f"{difference_percent:.1f}" if difference_percent else "N/A",
- }
- title, message = await self._build_message_from_template(db, "plate_not_empty", variables)
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "plate_not_empty",
- printer_id,
- printer_name,
- force_immediate=True,
- variables=variables,
- )
- async def on_filament_low(
- self,
- printer_id: int,
- printer_name: str,
- slot: int,
- remaining_percent: int,
- db: AsyncSession,
- color: str | None = None,
- ):
- """Handle low filament event."""
- providers = await self._get_providers_for_event(db, "on_filament_low", printer_id)
- if not providers:
- return
- variables = {
- "printer": printer_name,
- "slot": str(slot),
- "remaining_percent": str(remaining_percent),
- "color": color or "",
- }
- title, message = await self._build_message_from_template(db, "filament_low", variables)
- await self._send_to_providers(
- providers, title, message, db, "filament_low", printer_id, printer_name, variables=variables
- )
- async def on_maintenance_due(
- self,
- printer_id: int,
- printer_name: str,
- maintenance_items: list[dict],
- db: AsyncSession,
- ):
- """Handle maintenance due event - sends notification when maintenance is due or warning."""
- if not maintenance_items:
- return
- providers = await self._get_providers_for_event(db, "on_maintenance_due", printer_id)
- if not providers:
- logger.info("No notification providers configured for maintenance_due event on printer %s", printer_id)
- return
- # Format maintenance items list
- items_list = []
- for item in maintenance_items:
- status = "OVERDUE" if item.get("is_due") else "Soon"
- items_list.append(f"- {item['name']} ({status})")
- items_str = "\n".join(items_list)
- variables = {
- "printer": printer_name,
- "items": items_str,
- }
- logger.info("Found %s providers for maintenance_due: %s", len(providers), [p.name for p in providers])
- title, message = await self._build_message_from_template(db, "maintenance_due", variables)
- await self._send_to_providers(
- providers, title, message, db, "maintenance_due", printer_id, printer_name, variables=variables
- )
- async def on_ams_humidity_high(
- self,
- printer_id: int,
- printer_name: str,
- ams_label: str,
- humidity: float,
- threshold: float,
- db: AsyncSession,
- ):
- """Handle AMS high humidity alarm event. Always sends immediately (bypasses digest)."""
- providers = await self._get_providers_for_event(db, "on_ams_humidity_high", printer_id)
- if not providers:
- return
- variables = {
- "printer": printer_name,
- "ams_label": ams_label,
- "humidity": f"{humidity:.0f}",
- "threshold": f"{threshold:.0f}",
- }
- title, message = await self._build_message_from_template(db, "ams_humidity_high", variables)
- # Alarms always send immediately, bypassing digest mode
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "ams_humidity_high",
- printer_id,
- printer_name,
- force_immediate=True,
- variables=variables,
- )
- async def on_ams_temperature_high(
- self,
- printer_id: int,
- printer_name: str,
- ams_label: str,
- temperature: float,
- threshold: float,
- db: AsyncSession,
- ):
- """Handle AMS high temperature alarm event. Always sends immediately (bypasses digest)."""
- providers = await self._get_providers_for_event(db, "on_ams_temperature_high", printer_id)
- if not providers:
- return
- variables = {
- "printer": printer_name,
- "ams_label": ams_label,
- "temperature": f"{temperature:.1f}",
- "threshold": f"{threshold:.1f}",
- }
- title, message = await self._build_message_from_template(db, "ams_temperature_high", variables)
- # Alarms always send immediately, bypassing digest mode
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "ams_temperature_high",
- printer_id,
- printer_name,
- force_immediate=True,
- variables=variables,
- )
- async def on_ams_ht_humidity_high(
- self,
- printer_id: int,
- printer_name: str,
- ams_label: str,
- humidity: float,
- threshold: float,
- db: AsyncSession,
- ):
- """Handle AMS-HT high humidity alarm event. Always sends immediately (bypasses digest)."""
- providers = await self._get_providers_for_event(db, "on_ams_ht_humidity_high", printer_id)
- if not providers:
- return
- variables = {
- "printer": printer_name,
- "ams_label": ams_label,
- "humidity": f"{humidity:.0f}",
- "threshold": f"{threshold:.0f}",
- }
- # Use the same template as regular AMS (can create separate templates later if needed)
- title, message = await self._build_message_from_template(db, "ams_humidity_high", variables)
- # Alarms always send immediately, bypassing digest mode
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "ams_ht_humidity_high",
- printer_id,
- printer_name,
- force_immediate=True,
- variables=variables,
- )
- async def on_ams_ht_temperature_high(
- self,
- printer_id: int,
- printer_name: str,
- ams_label: str,
- temperature: float,
- threshold: float,
- db: AsyncSession,
- ):
- """Handle AMS-HT high temperature alarm event. Always sends immediately (bypasses digest)."""
- providers = await self._get_providers_for_event(db, "on_ams_ht_temperature_high", printer_id)
- if not providers:
- return
- variables = {
- "printer": printer_name,
- "ams_label": ams_label,
- "temperature": f"{temperature:.1f}",
- "threshold": f"{threshold:.1f}",
- }
- # Use the same template as regular AMS (can create separate templates later if needed)
- title, message = await self._build_message_from_template(db, "ams_temperature_high", variables)
- # Alarms always send immediately, bypassing digest mode
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "ams_ht_temperature_high",
- printer_id,
- printer_name,
- force_immediate=True,
- variables=variables,
- )
- async def on_bed_cooled(
- self,
- printer_id: int,
- printer_name: str,
- bed_temp: float,
- threshold: float,
- filename: str,
- db: AsyncSession,
- ):
- """Handle bed cooled event - bed temperature dropped below threshold after print."""
- providers = await self._get_providers_for_event(db, "on_bed_cooled", printer_id)
- if not providers:
- return
- variables = {
- "printer": printer_name,
- "bed_temp": f"{bed_temp:.0f}",
- "threshold": f"{threshold:.0f}",
- "filename": self._clean_filename(filename) if filename else "Unknown",
- }
- title, message = await self._build_message_from_template(db, "bed_cooled", variables)
- await self._send_to_providers(
- providers, title, message, db, "bed_cooled", printer_id, printer_name, variables=variables
- )
- async def on_first_layer_complete(
- self,
- printer_id: int,
- printer_name: str,
- filename: str,
- total_layers: int,
- db: AsyncSession,
- image_data: bytes | None = None,
- ):
- """Handle first layer complete event."""
- providers = await self._get_providers_for_event(db, "on_first_layer_complete", printer_id)
- if not providers:
- return
- variables = {
- "printer": printer_name,
- "filename": self._clean_filename(filename),
- "total_layers": str(total_layers),
- }
- title, message = await self._build_message_from_template(db, "first_layer_complete", variables)
- await self._send_to_providers(
- providers,
- title,
- message,
- db,
- "first_layer_complete",
- printer_id,
- printer_name,
- image_data=image_data,
- variables=variables,
- )
- def clear_template_cache(self):
- """Clear the template cache. Call this when templates are updated."""
- self._template_cache.clear()
- async def send_user_print_email(
- self,
- event_type: str,
- created_by_id: int | None,
- printer_name: str,
- filename: str,
- db: AsyncSession,
- ) -> None:
- """Send a print event email notification to the user who submitted the job.
- Args:
- event_type: 'user_print_start', 'user_print_complete', 'user_print_failed', or 'user_print_stopped'
- created_by_id: User ID who submitted the print job (from archive)
- printer_name: Name of the printer
- filename: Raw filename or subtask name
- db: Database session
- """
- if created_by_id is None:
- logger.debug("[EMAIL] Skipping user print email (%s): no created_by_id", event_type)
- return
- try:
- # Check if advanced auth is enabled - required for user email notifications
- from backend.app.models.settings import Settings
- result = await db.execute(select(Settings).where(Settings.key == "advanced_auth_enabled"))
- setting = result.scalar_one_or_none()
- if not setting or setting.value.lower() != "true":
- logger.debug("[EMAIL] Skipping user print email (%s): advanced_auth not enabled", event_type)
- return
- # Check if user notifications are enabled (admin-controlled toggle)
- notif_enabled_result = await db.execute(
- select(Settings).where(Settings.key == "user_notifications_enabled")
- )
- notif_enabled_setting = notif_enabled_result.scalar_one_or_none()
- if notif_enabled_setting and notif_enabled_setting.value.lower() == "false":
- logger.debug("[EMAIL] Skipping user print email (%s): user_notifications_enabled is false", event_type)
- return
- # Check SMTP settings are configured - required for sending emails
- from backend.app.services.email_service import get_smtp_settings, send_user_print_notification
- smtp_settings = await get_smtp_settings(db)
- if not smtp_settings:
- logger.debug("[EMAIL] Skipping user print email (%s): SMTP settings not configured", event_type)
- return
- # Load user preferences
- from backend.app.models.user import User
- from backend.app.models.user_email_pref import UserEmailPreference
- user_result = await db.execute(select(User).where(User.id == created_by_id))
- user = user_result.scalar_one_or_none()
- if user is None or not user.email:
- logger.debug(
- "[EMAIL] Skipping user print email (%s): user %s not found or has no email address",
- event_type,
- created_by_id,
- )
- return
- # Load user's notification preferences
- pref_result = await db.execute(
- select(UserEmailPreference).where(UserEmailPreference.user_id == created_by_id)
- )
- pref = pref_result.scalar_one_or_none()
- # Determine if this event type should be sent
- should_send = False
- if event_type == "user_print_start":
- should_send = pref is None or pref.notify_print_start
- elif event_type == "user_print_complete":
- should_send = pref is None or pref.notify_print_complete
- elif event_type == "user_print_failed":
- should_send = pref is None or pref.notify_print_failed
- elif event_type == "user_print_stopped":
- should_send = pref is None or pref.notify_print_stopped
- if not should_send:
- logger.debug(
- "[EMAIL] Skipping user print email (%s): user %s has notifications disabled for this event",
- event_type,
- created_by_id,
- )
- return
- logger.info(
- "[EMAIL] Sending user print email: event=%s, user=%s (%s), printer=%s, file=%s",
- event_type,
- user.username,
- user.email,
- printer_name,
- filename,
- )
- # Build variables
- variables = {
- "printer": printer_name,
- "filename": self._clean_filename(filename),
- }
- # Send the email
- await send_user_print_notification(
- db=db,
- event_type=event_type,
- user_email=user.email,
- username=user.username,
- variables=variables,
- )
- logger.info("[EMAIL] User print email sent: event=%s → %s", event_type, user.email)
- except Exception as e:
- logger.warning("Failed to send user print email notification: %s", e, exc_info=True)
- # ==================== Queue Notifications ====================
- async def on_queue_job_added(
- self,
- job_name: str,
- target: str,
- db: AsyncSession,
- printer_id: int | None = None,
- printer_name: str | None = None,
- ):
- """Handle queue job added event."""
- providers = await self._get_providers_for_event(db, "on_queue_job_added", printer_id)
- if not providers:
- return
- variables = {
- "job_name": job_name,
- "target": target, # e.g., "Printer1" or "Any X1C"
- "printer": printer_name or target,
- }
- title, message = await self._build_message_from_template(db, "queue_job_added", variables)
- await self._send_to_providers(
- providers, title, message, db, "queue_job_added", printer_id, printer_name, variables=variables
- )
- async def on_queue_job_assigned(
- self,
- job_name: str,
- printer_id: int,
- printer_name: str,
- target_model: str,
- db: AsyncSession,
- ):
- """Handle model-based job assigned to printer event."""
- providers = await self._get_providers_for_event(db, "on_queue_job_assigned", printer_id)
- if not providers:
- return
- variables = {
- "job_name": job_name,
- "printer": printer_name,
- "target_model": target_model,
- }
- title, message = await self._build_message_from_template(db, "queue_job_assigned", variables)
- await self._send_to_providers(
- providers, title, message, db, "queue_job_assigned", printer_id, printer_name, variables=variables
- )
- async def on_queue_job_started(
- self,
- job_name: str,
- printer_id: int,
- printer_name: str,
- db: AsyncSession,
- estimated_time: int | None = None,
- ):
- """Handle queue job started printing event."""
- providers = await self._get_providers_for_event(db, "on_queue_job_started", printer_id)
- if not providers:
- return
- eta_str = await self._format_eta(estimated_time, db)
- variables = {
- "job_name": job_name,
- "printer": printer_name,
- "estimated_time": self._format_duration(estimated_time),
- "eta": eta_str,
- }
- title, message = await self._build_message_from_template(db, "queue_job_started", variables)
- await self._send_to_providers(
- providers, title, message, db, "queue_job_started", printer_id, printer_name, variables=variables
- )
- async def on_queue_job_waiting(
- self,
- job_name: str,
- target_model: str,
- waiting_reason: str,
- db: AsyncSession,
- ):
- """Handle job waiting for filament event."""
- providers = await self._get_providers_for_event(db, "on_queue_job_waiting", None)
- if not providers:
- return
- variables = {
- "job_name": job_name,
- "target_model": target_model,
- "waiting_reason": waiting_reason,
- }
- title, message = await self._build_message_from_template(db, "queue_job_waiting", variables)
- await self._send_to_providers(providers, title, message, db, "queue_job_waiting", variables=variables)
- async def on_queue_job_skipped(
- self,
- job_name: str,
- printer_id: int,
- printer_name: str,
- reason: str,
- db: AsyncSession,
- ):
- """Handle job skipped event (e.g., previous print failed)."""
- providers = await self._get_providers_for_event(db, "on_queue_job_skipped", printer_id)
- if not providers:
- return
- variables = {
- "job_name": job_name,
- "printer": printer_name,
- "reason": reason,
- }
- title, message = await self._build_message_from_template(db, "queue_job_skipped", variables)
- await self._send_to_providers(
- providers, title, message, db, "queue_job_skipped", printer_id, printer_name, variables=variables
- )
- async def on_queue_job_failed(
- self,
- job_name: str,
- printer_id: int | None,
- printer_name: str | None,
- reason: str,
- db: AsyncSession,
- ):
- """Handle job failed to start event (upload error, etc.)."""
- providers = await self._get_providers_for_event(db, "on_queue_job_failed", printer_id)
- if not providers:
- return
- variables = {
- "job_name": job_name,
- "printer": printer_name or "Unknown",
- "reason": reason,
- }
- title, message = await self._build_message_from_template(db, "queue_job_failed", variables)
- await self._send_to_providers(
- providers, title, message, db, "queue_job_failed", printer_id, printer_name, variables=variables
- )
- async def on_queue_completed(
- self,
- completed_count: int,
- db: AsyncSession,
- ):
- """Handle all queue jobs completed event."""
- providers = await self._get_providers_for_event(db, "on_queue_completed", None)
- if not providers:
- return
- variables = {
- "completed_count": str(completed_count),
- }
- title, message = await self._build_message_from_template(db, "queue_completed", variables)
- await self._send_to_providers(providers, title, message, db, "queue_completed", variables=variables)
- async def _queue_for_digest(
- self,
- provider: NotificationProvider,
- event_type: str,
- title: str,
- message: str,
- db: AsyncSession,
- printer_id: int | None = None,
- printer_name: str | None = None,
- ):
- """Queue a notification for later delivery in the daily digest."""
- try:
- queue_entry = NotificationDigestQueue(
- provider_id=provider.id,
- event_type=event_type,
- title=title,
- message=message,
- printer_id=printer_id,
- printer_name=printer_name,
- )
- db.add(queue_entry)
- await db.commit()
- logger.info("Queued notification for digest: %s for provider %s", event_type, provider.name)
- except Exception as e:
- logger.warning("Failed to queue notification for digest: %s", e)
- async def send_digest(self, provider_id: int):
- """Send all queued notifications as a single digest for a provider."""
- from backend.app.core.database import async_session
- async with async_session() as db:
- # Get the provider
- result = await db.execute(select(NotificationProvider).where(NotificationProvider.id == provider_id))
- provider = result.scalar_one_or_none()
- if not provider or not provider.enabled:
- return
- # Get all queued notifications for this provider
- result = await db.execute(
- select(NotificationDigestQueue)
- .where(NotificationDigestQueue.provider_id == provider_id)
- .order_by(NotificationDigestQueue.created_at)
- )
- queue_entries = list(result.scalars().all())
- if not queue_entries:
- logger.debug("No queued notifications for provider %s", provider.name)
- return
- # Build digest message
- title = f"Daily Digest - {len(queue_entries)} Events"
- # Group by event type
- events_by_type: dict[str, list] = {}
- for entry in queue_entries:
- if entry.event_type not in events_by_type:
- events_by_type[entry.event_type] = []
- events_by_type[entry.event_type].append(entry)
- # Format the digest body
- body_parts = []
- for event_type, entries in events_by_type.items():
- event_label = event_type.replace("_", " ").title()
- body_parts.append(f"== {event_label} ({len(entries)}) ==")
- for entry in entries:
- time_str = entry.created_at.strftime("%H:%M")
- printer_info = f"[{entry.printer_name}] " if entry.printer_name else ""
- body_parts.append(f" {time_str} {printer_info}{entry.title}")
- body_parts.append("")
- body = "\n".join(body_parts)
- # Send the digest
- success, error = await self._send_to_provider(provider, title, body, db)
- # Log the digest
- await self._log_notification(
- db=db,
- provider_id=provider.id,
- event_type="daily_digest",
- title=title,
- message=body,
- success=success,
- error_message=error if not success else None,
- )
- # Clear the queue
- for entry in queue_entries:
- await db.delete(entry)
- await db.commit()
- if success:
- logger.info("Sent daily digest with %s events to %s", len(queue_entries), provider.name)
- else:
- logger.warning("Failed to send daily digest to %s: %s", provider.name, error)
- async def check_and_send_digests(self):
- """Check all providers and send digests if it's their scheduled time."""
- from backend.app.core.database import async_session
- current_time = datetime.now().strftime("%H:%M")
- # Avoid duplicate checks within the same minute
- if current_time == self._last_digest_check:
- return
- self._last_digest_check = current_time
- async with async_session() as db:
- # Find all providers with digest enabled at this time
- result = await db.execute(
- select(NotificationProvider).where(
- NotificationProvider.enabled.is_(True),
- NotificationProvider.daily_digest_enabled.is_(True),
- NotificationProvider.daily_digest_time == current_time,
- )
- )
- providers = result.scalars().all()
- for provider in providers:
- try:
- await self.send_digest(provider.id)
- except Exception as e:
- logger.error("Error sending digest for provider %s: %s", provider.id, e)
- def start_digest_scheduler(self):
- """Start the background scheduler for daily digest notifications."""
- if self._digest_scheduler_task is None:
- self._digest_scheduler_task = asyncio.create_task(self._digest_scheduler_loop())
- logger.info("Notification digest scheduler started")
- def stop_digest_scheduler(self):
- """Stop the background scheduler for daily digests."""
- if self._digest_scheduler_task:
- self._digest_scheduler_task.cancel()
- self._digest_scheduler_task = None
- logger.info("Notification digest scheduler stopped")
- async def _digest_scheduler_loop(self):
- """Background loop that checks for scheduled digests every minute."""
- while True:
- try:
- await self.check_and_send_digests()
- except Exception as e:
- logger.error("Error in digest scheduler: %s", e)
- # Wait until the next minute
- await asyncio.sleep(60)
- # Global instance
- notification_service = NotificationService()
|