| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- """Timelapse video processing service using FFmpeg."""
- import asyncio
- import json
- import logging
- import tempfile
- from pathlib import Path
- from backend.app.services.camera import get_ffmpeg_path
- logger = logging.getLogger(__name__)
- class TimelapseProcessor:
- """Service for processing timelapse videos with FFmpeg."""
- def __init__(self, input_path: Path):
- self.input_path = input_path
- self.ffmpeg = get_ffmpeg_path()
- if not self.ffmpeg:
- raise RuntimeError("FFmpeg not found")
- # Derive ffprobe path from ffmpeg path
- self.ffprobe = self.ffmpeg.replace("ffmpeg", "ffprobe")
- async def get_info(self) -> dict:
- """Get video metadata using ffprobe."""
- cmd = [
- self.ffprobe,
- "-v",
- "quiet",
- "-print_format",
- "json",
- "-show_format",
- "-show_streams",
- str(self.input_path),
- ]
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- stdout, stderr = await process.communicate()
- if process.returncode != 0:
- logger.error("ffprobe failed: %s", stderr.decode())
- raise RuntimeError(f"ffprobe failed: {stderr.decode()}")
- data = json.loads(stdout.decode())
- video_stream = next(
- (s for s in data.get("streams", []) if s.get("codec_type") == "video"),
- {},
- )
- audio_stream = next(
- (s for s in data.get("streams", []) if s.get("codec_type") == "audio"),
- None,
- )
- # Parse frame rate (can be "30/1" or "29.97")
- fps = 30.0
- r_frame_rate = video_stream.get("r_frame_rate", "30/1")
- try:
- if "/" in r_frame_rate:
- num, den = r_frame_rate.split("/")
- fps = float(num) / float(den)
- else:
- fps = float(r_frame_rate)
- except (ValueError, ZeroDivisionError):
- pass
- return {
- "duration": float(data.get("format", {}).get("duration", 0)),
- "width": video_stream.get("width", 0),
- "height": video_stream.get("height", 0),
- "fps": fps,
- "codec": video_stream.get("codec_name", "unknown"),
- "file_size": int(data.get("format", {}).get("size", 0)),
- "has_audio": audio_stream is not None,
- }
- async def generate_thumbnails(
- self,
- count: int = 10,
- width: int = 160,
- ) -> list[tuple[float, bytes]]:
- """Generate evenly-spaced thumbnail frames."""
- info = await self.get_info()
- duration = info["duration"]
- if duration <= 0:
- return []
- interval = duration / max(count, 1)
- thumbnails = []
- with tempfile.TemporaryDirectory() as tmpdir:
- for i in range(count):
- timestamp = i * interval
- output_path = Path(tmpdir) / f"thumb_{i:03d}.jpg"
- cmd = [
- self.ffmpeg,
- "-y",
- "-ss",
- str(timestamp),
- "-i",
- str(self.input_path),
- "-vframes",
- "1",
- "-vf",
- f"scale={width}:-1",
- "-q:v",
- "5",
- str(output_path),
- ]
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- await process.communicate()
- if output_path.exists():
- thumbnails.append((timestamp, output_path.read_bytes()))
- return thumbnails
- async def process(
- self,
- output_path: Path,
- trim_start: float = 0,
- trim_end: float | None = None,
- speed: float = 1.0,
- audio_path: Path | None = None,
- audio_volume: float = 1.0,
- ) -> bool:
- """Process video with trim, speed, and optional audio overlay.
- Args:
- output_path: Where to save the processed video
- trim_start: Start time in seconds
- trim_end: End time in seconds (None = full duration)
- speed: Speed multiplier (0.25 to 4.0)
- audio_path: Optional music file to overlay
- audio_volume: Volume for audio overlay (0.0 to 1.0)
- Returns:
- True if processing succeeded, False otherwise
- """
- # Build FFmpeg command
- cmd = [self.ffmpeg, "-y"]
- # Input seeking (fast seek before input)
- if trim_start > 0:
- cmd.extend(["-ss", str(trim_start)])
- cmd.extend(["-i", str(self.input_path)])
- # Add audio input if provided
- if audio_path:
- cmd.extend(["-i", str(audio_path)])
- # Duration limit
- if trim_end is not None and trim_end > trim_start:
- duration = trim_end - trim_start
- cmd.extend(["-t", str(duration)])
- # Build filters - use filter_complex when we have audio overlay
- video_filter = ""
- if speed != 1.0:
- # setpts changes video speed: PTS/speed = faster, PTS*speed = slower
- setpts_value = 1.0 / speed
- video_filter = f"setpts={setpts_value}*PTS"
- if audio_path:
- # Use filter_complex for audio overlay (can't mix with -vf/-af)
- filter_parts = []
- # Video filter
- if video_filter:
- filter_parts.append(f"[0:v]{video_filter}[v]")
- video_out = "[v]"
- else:
- video_out = "0:v"
- # Audio filter with volume
- filter_parts.append(f"[1:a]volume={audio_volume}[a]")
- cmd.extend(["-filter_complex", ";".join(filter_parts)])
- cmd.extend(["-map", video_out, "-map", "[a]"])
- cmd.extend(["-shortest"])
- elif speed != 1.0:
- # No audio overlay - use simple -vf and -af
- if video_filter:
- cmd.extend(["-vf", video_filter])
- # Adjust original audio speed with atempo
- atempo_chain = self._build_atempo_chain(speed)
- if atempo_chain:
- cmd.extend(["-af", atempo_chain])
- # Output settings
- cmd.extend(
- [
- "-c:v",
- "libx264",
- "-preset",
- "fast",
- "-crf",
- "23",
- "-c:a",
- "aac",
- "-b:a",
- "128k",
- "-movflags",
- "+faststart", # Enable streaming
- str(output_path),
- ]
- )
- logger.info("Processing timelapse: %s", " ".join(cmd))
- # Run FFmpeg
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- _, stderr = await process.communicate()
- if process.returncode != 0:
- logger.error("FFmpeg processing failed: %s", stderr.decode())
- return False
- return output_path.exists()
- def _build_atempo_chain(self, speed: float) -> str:
- """Build atempo filter chain.
- atempo filter only supports values between 0.5 and 2.0,
- so we chain multiple filters for extreme speeds.
- """
- if speed == 1.0:
- return ""
- filters = []
- remaining_speed = speed
- # Handle speeds > 2.0 by chaining atempo=2.0
- while remaining_speed > 2.0:
- filters.append("atempo=2.0")
- remaining_speed /= 2.0
- # Handle speeds < 0.5 by chaining atempo=0.5
- while remaining_speed < 0.5:
- filters.append("atempo=0.5")
- remaining_speed *= 2.0
- # Add final atempo for remaining adjustment
- if 0.5 <= remaining_speed <= 2.0 and remaining_speed != 1.0:
- filters.append(f"atempo={remaining_speed:.4f}")
- return ",".join(filters)
|