smart_plug_manager.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. """Manager for smart plug automation and delayed turn-off."""
  2. import asyncio
  3. import logging
  4. from datetime import datetime, timezone
  5. from typing import TYPE_CHECKING
  6. from sqlalchemy import select
  7. from sqlalchemy.ext.asyncio import AsyncSession
  8. from backend.app.services.homeassistant import homeassistant_service
  9. from backend.app.services.printer_manager import printer_manager
  10. from backend.app.services.rest_smart_plug import rest_smart_plug_service
  11. from backend.app.services.tasmota import tasmota_service
  12. if TYPE_CHECKING:
  13. from backend.app.models.smart_plug import SmartPlug
  14. logger = logging.getLogger(__name__)
  15. class SmartPlugManager:
  16. """Manages smart plug automation and delayed turn-off."""
  17. def __init__(self):
  18. self._pending_off: dict[int, asyncio.Task] = {} # plug_id -> task
  19. self._loop: asyncio.AbstractEventLoop | None = None
  20. self._scheduler_task: asyncio.Task | None = None
  21. self._snapshot_task: asyncio.Task | None = None
  22. self._last_schedule_check: dict[int, str] = {} # plug_id -> "HH:MM" last executed
  23. async def get_service_for_plug(self, plug: "SmartPlug", db: AsyncSession | None = None):
  24. """Get the appropriate service for the plug type.
  25. For HA plugs, configures the service with current settings from DB.
  26. """
  27. if plug.plug_type == "homeassistant":
  28. # Configure HA service with current settings
  29. await self._configure_ha_service(db)
  30. return homeassistant_service
  31. if plug.plug_type == "rest":
  32. return rest_smart_plug_service
  33. return tasmota_service
  34. async def _configure_ha_service(self, db: AsyncSession | None = None):
  35. """Configure the HA service with URL and token from settings."""
  36. from backend.app.api.routes.settings import get_homeassistant_settings
  37. try:
  38. if db:
  39. # Use provided session
  40. ha_settings = await get_homeassistant_settings(db)
  41. else:
  42. # Create new session
  43. from backend.app.core.database import async_session
  44. async with async_session() as session:
  45. ha_settings = await get_homeassistant_settings(session)
  46. homeassistant_service.configure(ha_settings["ha_url"], ha_settings["ha_token"])
  47. except Exception as e:
  48. logger.warning("Failed to configure HA service: %s", e)
  49. def set_event_loop(self, loop: asyncio.AbstractEventLoop):
  50. """Set the event loop for async operations."""
  51. self._loop = loop
  52. def start_scheduler(self):
  53. """Start the background scheduler for time-based plug control."""
  54. if self._scheduler_task is None:
  55. self._scheduler_task = asyncio.create_task(self._schedule_loop())
  56. logger.info("Smart plug scheduler started")
  57. if self._snapshot_task is None:
  58. self._snapshot_task = asyncio.create_task(self._snapshot_loop())
  59. logger.info("Smart plug energy snapshot loop started")
  60. def stop_scheduler(self):
  61. """Stop the background scheduler."""
  62. if self._scheduler_task:
  63. self._scheduler_task.cancel()
  64. self._scheduler_task = None
  65. logger.info("Smart plug scheduler stopped")
  66. if self._snapshot_task:
  67. self._snapshot_task.cancel()
  68. self._snapshot_task = None
  69. logger.info("Smart plug energy snapshot loop stopped")
  70. async def _schedule_loop(self):
  71. """Background loop that checks scheduled on/off times every minute."""
  72. while True:
  73. try:
  74. await self._check_schedules()
  75. except Exception as e:
  76. logger.error("Error in schedule check: %s", e)
  77. # Wait until the next minute
  78. await asyncio.sleep(60)
  79. async def _snapshot_loop(self):
  80. """Background loop that captures each plug's lifetime energy counter hourly.
  81. Powers date-range queries in "total consumption" energy mode (#941). Takes
  82. a snapshot shortly after startup so the first bucket isn't empty, then
  83. every hour.
  84. """
  85. # Short warm-up delay so other services finish booting; still gives us
  86. # an initial snapshot well before the first hour mark.
  87. await asyncio.sleep(30)
  88. while True:
  89. try:
  90. await self._capture_energy_snapshots()
  91. except Exception as e:
  92. logger.error("Error in energy snapshot capture: %s", e)
  93. await asyncio.sleep(3600) # 1 hour
  94. async def _capture_energy_snapshots(self):
  95. """Capture one energy snapshot row per plug with a usable lifetime counter."""
  96. from datetime import timezone
  97. from backend.app.core.database import async_session
  98. from backend.app.models.smart_plug import SmartPlug
  99. from backend.app.models.smart_plug_energy_snapshot import SmartPlugEnergySnapshot
  100. async with async_session() as db:
  101. plugs_result = await db.execute(select(SmartPlug).where(SmartPlug.enabled.is_(True)))
  102. plugs = list(plugs_result.scalars().all())
  103. if not plugs:
  104. return
  105. now = datetime.now(timezone.utc)
  106. captured = 0
  107. for plug in plugs:
  108. # MQTT plugs only publish a "today" counter that resets at midnight —
  109. # they can never feed cumulative snapshots, so skip them outright to
  110. # avoid a noisy tasmota-service fallback attempt on an IP-less plug.
  111. if plug.plug_type == "mqtt":
  112. continue
  113. try:
  114. service = await self.get_service_for_plug(plug, db)
  115. energy = await service.get_energy(plug)
  116. except Exception as e:
  117. logger.debug("Snapshot: failed to read energy from plug %s: %s", plug.id, e)
  118. continue
  119. if not energy:
  120. continue
  121. lifetime = energy.get("total")
  122. if lifetime is None:
  123. # MQTT / REST plugs that only expose "today" can't be used for
  124. # cumulative snapshots — skip them.
  125. continue
  126. db.add(
  127. SmartPlugEnergySnapshot(
  128. plug_id=plug.id,
  129. recorded_at=now,
  130. lifetime_kwh=float(lifetime),
  131. )
  132. )
  133. captured += 1
  134. if captured:
  135. await db.commit()
  136. logger.info("Captured %d energy snapshot(s)", captured)
  137. async def _check_schedules(self):
  138. """Check all plugs for scheduled on/off times."""
  139. from backend.app.core.database import async_session
  140. from backend.app.models.smart_plug import SmartPlug
  141. current_time = datetime.now().strftime("%H:%M")
  142. async with async_session() as db:
  143. result = await db.execute(
  144. select(SmartPlug).where(
  145. SmartPlug.enabled.is_(True),
  146. SmartPlug.schedule_enabled.is_(True),
  147. )
  148. )
  149. plugs = result.scalars().all()
  150. for plug in plugs:
  151. service = await self.get_service_for_plug(plug, db)
  152. # Check if we should turn on
  153. if plug.schedule_on_time == current_time:
  154. last_check = self._last_schedule_check.get(plug.id)
  155. if last_check != f"on:{current_time}":
  156. logger.info("Schedule: Turning on plug '%s' at %s", plug.name, current_time)
  157. success = await service.turn_on(plug)
  158. if success:
  159. plug.last_state = "ON"
  160. plug.last_checked = datetime.now(timezone.utc)
  161. self._last_schedule_check[plug.id] = f"on:{current_time}"
  162. # Check if we should turn off
  163. if plug.schedule_off_time == current_time:
  164. last_check = self._last_schedule_check.get(plug.id)
  165. if last_check != f"off:{current_time}":
  166. logger.info("Schedule: Turning off plug '%s' at %s", plug.name, current_time)
  167. success = await service.turn_off(plug)
  168. if success:
  169. plug.last_state = "OFF"
  170. plug.last_checked = datetime.now(timezone.utc)
  171. self._last_schedule_check[plug.id] = f"off:{current_time}"
  172. # Mark printer offline if linked
  173. if plug.printer_id:
  174. printer_manager.mark_printer_offline(plug.printer_id)
  175. await db.commit()
  176. async def _get_plugs_for_printer(self, printer_id: int, db: AsyncSession) -> list["SmartPlug"]:
  177. """Get all smart plugs linked to a printer for automation control."""
  178. from backend.app.models.smart_plug import SmartPlug
  179. result = await db.execute(select(SmartPlug).where(SmartPlug.printer_id == printer_id))
  180. return list(result.scalars().all())
  181. async def on_print_start(self, printer_id: int, db: AsyncSession):
  182. """Called when a print starts - turn on all plugs linked to this printer."""
  183. plugs = await self._get_plugs_for_printer(printer_id, db)
  184. if not plugs:
  185. return
  186. for plug in plugs:
  187. if not plug.enabled:
  188. logger.debug("Smart plug '%s' is disabled, skipping auto-on", plug.name)
  189. continue
  190. if not plug.auto_on:
  191. logger.debug("Smart plug '%s' auto_on is disabled", plug.name)
  192. continue
  193. # Cancel any pending off task
  194. self._cancel_pending_off(plug.id)
  195. # Turn on the plug
  196. logger.info("Print started on printer %s, turning on plug '%s'", printer_id, plug.name)
  197. try:
  198. service = await self.get_service_for_plug(plug, db)
  199. success = await service.turn_on(plug)
  200. if success:
  201. plug.last_state = "ON"
  202. plug.last_checked = datetime.now(timezone.utc)
  203. plug.auto_off_executed = False # Reset flag when turning on
  204. except Exception as e:
  205. logger.warning("Failed to turn on plug '%s' for printer %s: %s", plug.name, printer_id, e)
  206. await db.commit()
  207. async def on_print_complete(self, printer_id: int, status: str, db: AsyncSession):
  208. """Called when a print completes - schedule turn off for all plugs linked to this printer.
  209. Only triggers auto-off on successful completion (status='completed').
  210. Failed prints keep the printer powered on for user investigation.
  211. """
  212. # Only auto-off on successful completion, not on failures
  213. if status != "completed":
  214. logger.info(
  215. "Print on printer %s ended with status '%s', skipping auto-off to allow investigation",
  216. printer_id,
  217. status,
  218. )
  219. return
  220. plugs = await self._get_plugs_for_printer(printer_id, db)
  221. if not plugs:
  222. return
  223. for plug in plugs:
  224. if not plug.enabled:
  225. logger.debug("Smart plug '%s' is disabled, skipping auto-off", plug.name)
  226. continue
  227. if not plug.auto_off:
  228. logger.debug("Smart plug '%s' auto_off is disabled", plug.name)
  229. continue
  230. # Skip auto-off for HA script entities (scripts can only be triggered, not turned off)
  231. if plug.plug_type == "homeassistant" and plug.ha_entity_id and plug.ha_entity_id.startswith("script."):
  232. logger.debug("Smart plug '%s' is a HA script entity, skipping auto-off", plug.name)
  233. continue
  234. logger.info(
  235. "Print completed successfully on printer %s, scheduling turn-off for plug '%s'",
  236. printer_id,
  237. plug.name,
  238. )
  239. if plug.off_delay_mode == "time":
  240. self._schedule_delayed_off(plug, printer_id, plug.off_delay_minutes * 60)
  241. elif plug.off_delay_mode == "temperature":
  242. self._schedule_temp_based_off(plug, printer_id, plug.off_temp_threshold)
  243. def _schedule_delayed_off(self, plug: "SmartPlug", printer_id: int, delay_seconds: int):
  244. """Schedule turn-off after delay."""
  245. # Cancel any existing task for this plug
  246. self._cancel_pending_off(plug.id)
  247. logger.info("Scheduling turn-off for plug '%s' in %s seconds", plug.name, delay_seconds)
  248. # Mark as pending in database (survives restarts)
  249. asyncio.create_task(self._mark_auto_off_pending(plug.id, True))
  250. task = asyncio.create_task(
  251. self._delayed_off(
  252. plug.id,
  253. plug.plug_type,
  254. plug.ip_address,
  255. plug.ha_entity_id,
  256. plug.username,
  257. plug.password,
  258. printer_id,
  259. delay_seconds,
  260. rest_off_url=plug.rest_off_url if plug.plug_type == "rest" else None,
  261. rest_off_body=plug.rest_off_body if plug.plug_type == "rest" else None,
  262. rest_method=plug.rest_method if plug.plug_type == "rest" else None,
  263. rest_headers=plug.rest_headers if plug.plug_type == "rest" else None,
  264. )
  265. )
  266. self._pending_off[plug.id] = task
  267. async def _delayed_off(
  268. self,
  269. plug_id: int,
  270. plug_type: str,
  271. ip_address: str | None,
  272. ha_entity_id: str | None,
  273. username: str | None,
  274. password: str | None,
  275. printer_id: int,
  276. delay_seconds: int,
  277. *,
  278. rest_off_url: str | None = None,
  279. rest_off_body: str | None = None,
  280. rest_method: str | None = None,
  281. rest_headers: str | None = None,
  282. ):
  283. """Wait and turn off."""
  284. try:
  285. await asyncio.sleep(delay_seconds)
  286. # Create a minimal plug-like object for the service
  287. class PlugInfo:
  288. def __init__(self):
  289. self.plug_type = plug_type
  290. self.ip_address = ip_address
  291. self.ha_entity_id = ha_entity_id
  292. self.username = username
  293. self.password = password
  294. self.name = f"plug_{plug_id}"
  295. # REST fields
  296. self.rest_off_url = rest_off_url
  297. self.rest_off_body = rest_off_body
  298. self.rest_method = rest_method
  299. self.rest_headers = rest_headers
  300. plug_info = PlugInfo()
  301. service = await self.get_service_for_plug(plug_info)
  302. success = await service.turn_off(plug_info)
  303. logger.info("Turned off plug %s after time delay", plug_id)
  304. # Mark auto_off_executed in database and update printer status
  305. if success:
  306. await self._mark_auto_off_executed(plug_id)
  307. # Mark the printer as offline immediately
  308. printer_manager.mark_printer_offline(printer_id)
  309. except asyncio.CancelledError:
  310. logger.debug("Delayed turn-off cancelled for plug %s", plug_id)
  311. finally:
  312. self._pending_off.pop(plug_id, None)
  313. def _schedule_temp_based_off(self, plug: "SmartPlug", printer_id: int, temp_threshold: int):
  314. """Monitor temperature and turn off when below threshold."""
  315. # Cancel any existing task for this plug
  316. self._cancel_pending_off(plug.id)
  317. logger.info("Scheduling temperature-based turn-off for plug '%s' (threshold: %s°C)", plug.name, temp_threshold)
  318. # Mark as pending in database (survives restarts)
  319. asyncio.create_task(self._mark_auto_off_pending(plug.id, True))
  320. task = asyncio.create_task(
  321. self._temp_based_off(
  322. plug.id,
  323. plug.plug_type,
  324. plug.ip_address,
  325. plug.ha_entity_id,
  326. plug.username,
  327. plug.password,
  328. printer_id,
  329. temp_threshold,
  330. rest_off_url=plug.rest_off_url if plug.plug_type == "rest" else None,
  331. rest_off_body=plug.rest_off_body if plug.plug_type == "rest" else None,
  332. rest_method=plug.rest_method if plug.plug_type == "rest" else None,
  333. rest_headers=plug.rest_headers if plug.plug_type == "rest" else None,
  334. )
  335. )
  336. self._pending_off[plug.id] = task
  337. async def _temp_based_off(
  338. self,
  339. plug_id: int,
  340. plug_type: str,
  341. ip_address: str | None,
  342. ha_entity_id: str | None,
  343. username: str | None,
  344. password: str | None,
  345. printer_id: int,
  346. temp_threshold: int,
  347. *,
  348. rest_off_url: str | None = None,
  349. rest_off_body: str | None = None,
  350. rest_method: str | None = None,
  351. rest_headers: str | None = None,
  352. ):
  353. """Poll temperature until below threshold, then turn off.
  354. For dual-extruder printers (H2 series), checks both nozzles.
  355. """
  356. try:
  357. check_interval = 10 # seconds
  358. max_wait = 3600 # 1 hour max
  359. elapsed = 0
  360. while elapsed < max_wait:
  361. status = printer_manager.get_status(printer_id)
  362. if status:
  363. temps = status.temperatures or {}
  364. nozzle_temp = temps.get("nozzle", 999)
  365. # Check second nozzle for dual-extruder printers (H2 series)
  366. nozzle_2_temp = temps.get("nozzle_2")
  367. # Get the maximum temperature across all nozzles
  368. max_nozzle_temp = nozzle_temp
  369. if nozzle_2_temp is not None:
  370. max_nozzle_temp = max(nozzle_temp, nozzle_2_temp)
  371. logger.info(
  372. f"Temp check plug {plug_id}: nozzle1={nozzle_temp}°C, "
  373. f"nozzle2={nozzle_2_temp}°C, max={max_nozzle_temp}°C, "
  374. f"threshold={temp_threshold}°C"
  375. )
  376. else:
  377. logger.info(
  378. "Temp check plug %s: nozzle=%s°C, threshold=%s°C", plug_id, nozzle_temp, temp_threshold
  379. )
  380. if max_nozzle_temp < temp_threshold:
  381. # All nozzles are below threshold, turn off
  382. class PlugInfo:
  383. def __init__(self):
  384. self.plug_type = plug_type
  385. self.ip_address = ip_address
  386. self.ha_entity_id = ha_entity_id
  387. self.username = username
  388. self.password = password
  389. self.name = f"plug_{plug_id}"
  390. # REST fields
  391. self.rest_off_url = rest_off_url
  392. self.rest_off_body = rest_off_body
  393. self.rest_method = rest_method
  394. self.rest_headers = rest_headers
  395. plug_info = PlugInfo()
  396. service = await self.get_service_for_plug(plug_info)
  397. success = await service.turn_off(plug_info)
  398. logger.info(
  399. f"Turned off plug {plug_id} after nozzle temp dropped to "
  400. f"{max_nozzle_temp}°C (threshold: {temp_threshold}°C)"
  401. )
  402. # Mark auto_off_executed in database and update printer status
  403. if success:
  404. await self._mark_auto_off_executed(plug_id)
  405. # Mark the printer as offline immediately
  406. printer_manager.mark_printer_offline(printer_id)
  407. break
  408. await asyncio.sleep(check_interval)
  409. elapsed += check_interval
  410. if elapsed >= max_wait:
  411. logger.warning("Temperature-based turn-off timed out for plug %s after %ss", plug_id, max_wait)
  412. except asyncio.CancelledError:
  413. logger.debug("Temperature-based turn-off cancelled for plug %s", plug_id)
  414. finally:
  415. self._pending_off.pop(plug_id, None)
  416. async def _mark_auto_off_pending(self, plug_id: int, pending: bool):
  417. """Mark a plug as having a pending auto-off (survives restarts)."""
  418. try:
  419. from backend.app.core.database import async_session
  420. from backend.app.models.smart_plug import SmartPlug
  421. async with async_session() as db:
  422. result = await db.execute(select(SmartPlug).where(SmartPlug.id == plug_id))
  423. plug = result.scalar_one_or_none()
  424. if plug:
  425. plug.auto_off_pending = pending
  426. plug.auto_off_pending_since = datetime.now(timezone.utc) if pending else None
  427. await db.commit()
  428. logger.debug("Marked plug %s auto_off_pending=%s", plug_id, pending)
  429. except Exception as e:
  430. logger.warning("Failed to update plug %s pending state: %s", plug_id, e)
  431. async def _mark_auto_off_executed(self, plug_id: int):
  432. """Disable auto-off after it was executed (one-shot behavior unless persistent)."""
  433. try:
  434. from backend.app.core.database import async_session
  435. from backend.app.models.smart_plug import SmartPlug
  436. async with async_session() as db:
  437. result = await db.execute(select(SmartPlug).where(SmartPlug.id == plug_id))
  438. plug = result.scalar_one_or_none()
  439. if plug:
  440. if not plug.auto_off_persistent:
  441. plug.auto_off = False # Disable auto-off (one-shot behavior)
  442. plug.auto_off_executed = False # Reset the flag
  443. plug.auto_off_pending = False # Clear pending state
  444. plug.auto_off_pending_since = None
  445. plug.last_state = "OFF"
  446. plug.last_checked = datetime.now(timezone.utc)
  447. await db.commit()
  448. if plug.auto_off_persistent:
  449. logger.info("Auto-off executed for plug %s (persistent, stays enabled)", plug_id)
  450. else:
  451. logger.info("Auto-off executed and disabled for plug %s", plug_id)
  452. except Exception as e:
  453. logger.warning("Failed to update plug %s after auto-off: %s", plug_id, e)
  454. def _cancel_pending_off(self, plug_id: int):
  455. """Cancel any pending off task for this plug."""
  456. if plug_id in self._pending_off:
  457. logger.debug("Cancelling pending turn-off for plug %s", plug_id)
  458. self._pending_off[plug_id].cancel()
  459. del self._pending_off[plug_id]
  460. # Clear pending state in database
  461. asyncio.create_task(self._mark_auto_off_pending(plug_id, False))
  462. def cancel_all_pending(self):
  463. """Cancel all pending turn-off tasks."""
  464. for plug_id in list(self._pending_off.keys()):
  465. self._cancel_pending_off(plug_id)
  466. async def resume_pending_auto_offs(self):
  467. """Resume any pending auto-offs that were interrupted by a restart.
  468. Called on startup to check for plugs that had auto-off pending but
  469. never completed (e.g., due to service restart).
  470. """
  471. try:
  472. from backend.app.core.database import async_session
  473. from backend.app.models.smart_plug import SmartPlug
  474. async with async_session() as db:
  475. # Find all plugs with pending auto-off
  476. result = await db.execute(
  477. select(SmartPlug).where(
  478. SmartPlug.auto_off_pending.is_(True),
  479. SmartPlug.printer_id.isnot(None),
  480. )
  481. )
  482. pending_plugs = result.scalars().all()
  483. for plug in pending_plugs:
  484. # Check how long it's been pending (timeout after 2 hours)
  485. if plug.auto_off_pending_since:
  486. pending_since = plug.auto_off_pending_since
  487. if pending_since.tzinfo is None:
  488. pending_since = pending_since.replace(tzinfo=timezone.utc)
  489. elapsed = (datetime.now(timezone.utc) - pending_since).total_seconds()
  490. if elapsed > 7200: # 2 hours
  491. logger.warning(
  492. f"Auto-off for plug '{plug.name}' was pending for {elapsed / 60:.0f} minutes, "
  493. f"clearing stale pending state"
  494. )
  495. plug.auto_off_pending = False
  496. plug.auto_off_pending_since = None
  497. await db.commit()
  498. continue
  499. logger.info("Resuming pending auto-off for plug '%s' (printer %s)", plug.name, plug.printer_id)
  500. # Resume the appropriate off mode
  501. if plug.off_delay_mode == "temperature":
  502. self._schedule_temp_based_off(plug, plug.printer_id, plug.off_temp_threshold)
  503. else:
  504. # For time mode, just turn off immediately since delay already passed
  505. logger.info("Time-based auto-off was pending, turning off plug '%s' now", plug.name)
  506. service = await self.get_service_for_plug(plug, db)
  507. success = await service.turn_off(plug)
  508. if success:
  509. await self._mark_auto_off_executed(plug.id)
  510. printer_manager.mark_printer_offline(plug.printer_id)
  511. if pending_plugs:
  512. logger.info("Resumed %s pending auto-off(s)", len(pending_plugs))
  513. except Exception as e:
  514. logger.warning("Failed to resume pending auto-offs: %s", e)
  515. # Global singleton
  516. smart_plug_manager = SmartPlugManager()