#!/usr/bin/env python3
"""
ENCODERPRO - PHASE 3
====================
High-performance media re-encoding with GPU acceleration, parallel processing,
and advanced encoding profiles.
Features:
- GPU acceleration (NVENC, QSV, VAAPI)
- Automatic encoder detection and fallback
- Parallel processing with worker pools
- Multiple encoding profiles
- Progress tracking with ETA
- Resource management (CPU/GPU limits)
- Advanced encoding rules (resolution, HDR, audio)
- Docker-ready architecture
"""
import argparse
import concurrent.futures
import hashlib
import json
import logging
import multiprocessing
import os
import re
import shutil
import signal
import sqlite3
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, asdict, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
__version__ = "3.0.0"
# =============================================================================
# CONSTANTS & ENUMS
# =============================================================================
class ProcessingState(Enum):
"""File processing states"""
DISCOVERED = "discovered" # Found during scan, not selected yet
PENDING = "pending" # User selected for encoding
PROCESSING = "processing" # Currently being encoded
COMPLETED = "completed" # Successfully encoded
FAILED = "failed" # Encoding failed, can retry
SKIPPED = "skipped" # No subtitles or excluded
class EncoderType(Enum):
"""Hardware encoder types"""
CPU_X265 = "cpu_x265"
CPU_X264 = "cpu_x264"
CPU_AV1 = "cpu_av1"
NVIDIA_NVENC_H265 = "nvenc_h265"
NVIDIA_NVENC_H264 = "nvenc_h264"
NVIDIA_NVENC_AV1 = "nvenc_av1"
INTEL_QSV_H265 = "qsv_h265"
INTEL_QSV_H264 = "qsv_h264"
INTEL_QSV_AV1 = "qsv_av1"
AMD_VAAPI_H265 = "vaapi_h265"
AMD_VAAPI_H264 = "vaapi_h264"
AMD_VAAPI_AV1 = "vaapi_av1"
@dataclass
class EncodingProfile:
"""Encoding profile configuration"""
name: str
encoder: EncoderType
preset: str
quality: int # CRF or QP depending on encoder
audio_codec: str = "copy"
max_resolution: Optional[str] = None # e.g., "1920x1080"
hdr_handling: str = "preserve" # preserve, tonemap, strip
def to_dict(self) -> Dict:
return asdict(self)
@dataclass
class EncoderCapabilities:
"""System encoder capabilities"""
has_nvenc: bool = False
has_qsv: bool = False
has_vaapi: bool = False
has_x265: bool = False
has_x264: bool = False
has_av1: bool = False
has_nvenc_av1: bool = False
has_qsv_av1: bool = False
has_vaapi_av1: bool = False
    nvenc_devices: List[int] = field(default_factory=list)
# =============================================================================
# CONFIGURATION
# =============================================================================
class Config:
"""Configuration container for Phase 3"""
def __init__(self, config_dict: dict):
# Phase 2 settings
self.movies_dir = Path(config_dict.get('movies_dir', '/mnt/user/movies'))
self.archive_dir = Path(config_dict.get('archive_dir', '/mnt/user/archive/movies'))
self.work_dir = Path(config_dict.get('work_dir', '/mnt/user/temp/reencode-work'))
self.state_db = Path(config_dict.get('state_db', '/var/lib/reencode/state.db'))
self.log_dir = Path(config_dict.get('log_dir', '/var/log/reencode'))
# Phase 3: Parallel processing
parallel = config_dict.get('parallel', {})
self.max_workers = parallel.get('max_workers', 1)
self.gpu_slots = parallel.get('gpu_slots', 1) # Concurrent GPU encodes
self.cpu_slots = parallel.get('cpu_slots', multiprocessing.cpu_count())
# Phase 3: Profiles
profiles_config = config_dict.get('profiles', {})
self.default_profile = profiles_config.get('default', 'balanced_gpu')
self.profiles = self._load_profiles(profiles_config.get('definitions', {}))
# Processing settings
processing = config_dict.get('processing', {})
self.file_extensions = processing.get('file_extensions', ['mkv', 'mp4', 'avi', 'm4v'])
self.skip_without_subtitles = processing.get('skip_without_subtitles', True)
self.cleanup_stale_work = processing.get('cleanup_stale_work', True)
# Phase 3: Advanced options
advanced = config_dict.get('advanced', {})
self.auto_detect_encoders = advanced.get('auto_detect_encoders', True)
self.prefer_gpu = advanced.get('prefer_gpu', True)
self.fallback_to_cpu = advanced.get('fallback_to_cpu', True)
self.progress_interval = advanced.get('progress_interval', 10) # seconds
# Ensure directories exist
self.work_dir.mkdir(parents=True, exist_ok=True)
self.archive_dir.mkdir(parents=True, exist_ok=True)
self.state_db.parent.mkdir(parents=True, exist_ok=True)
self.log_dir.mkdir(parents=True, exist_ok=True)
def _load_profiles(self, profiles_dict: Dict) -> Dict[str, EncodingProfile]:
"""Load encoding profiles from configuration"""
profiles = {}
# Default profiles if none specified
if not profiles_dict:
profiles_dict = {
'balanced_gpu': {
'encoder': 'nvenc_h265',
'preset': 'p4',
'quality': 23
},
'fast_gpu': {
'encoder': 'nvenc_h264',
'preset': 'p1',
'quality': 26
},
'quality_cpu': {
'encoder': 'cpu_x265',
'preset': 'slow',
'quality': 20
},
'balanced_cpu': {
'encoder': 'cpu_x265',
'preset': 'medium',
'quality': 23
}
}
for name, profile_config in profiles_dict.items():
encoder_str = profile_config.get('encoder', 'cpu_x265')
# Map string to EncoderType
try:
encoder = EncoderType[encoder_str.upper()]
except KeyError:
encoder = EncoderType.CPU_X265
profiles[name] = EncodingProfile(
name=name,
encoder=encoder,
preset=profile_config.get('preset', 'medium'),
quality=profile_config.get('quality', 23),
audio_codec=profile_config.get('audio_codec', 'copy'),
max_resolution=profile_config.get('max_resolution'),
hdr_handling=profile_config.get('hdr_handling', 'preserve')
)
return profiles
def get_profile(self, name: Optional[str] = None) -> EncodingProfile:
"""Get encoding profile by name"""
profile_name = name or self.default_profile
return self.profiles.get(profile_name, list(self.profiles.values())[0])
# =============================================================================
# ENCODER DETECTION
# =============================================================================
class EncoderDetector:
"""Detects available hardware encoders"""
@staticmethod
def detect_capabilities() -> EncoderCapabilities:
"""Detect available encoders on the system"""
caps = EncoderCapabilities()
# Check FFmpeg encoders
try:
result = subprocess.run(
['ffmpeg', '-hide_banner', '-encoders'],
capture_output=True,
text=True,
timeout=10
)
encoders_output = result.stdout.lower()
# CPU encoders
caps.has_x265 = 'libx265' in encoders_output
caps.has_x264 = 'libx264' in encoders_output
caps.has_av1 = 'libsvtav1' in encoders_output or 'libaom-av1' in encoders_output
# NVIDIA NVENC
caps.has_nvenc = 'hevc_nvenc' in encoders_output or 'h264_nvenc' in encoders_output
caps.has_nvenc_av1 = 'av1_nvenc' in encoders_output
# Intel QSV
caps.has_qsv = 'hevc_qsv' in encoders_output or 'h264_qsv' in encoders_output
caps.has_qsv_av1 = 'av1_qsv' in encoders_output
# AMD VAAPI
caps.has_vaapi = 'hevc_vaapi' in encoders_output or 'h264_vaapi' in encoders_output
caps.has_vaapi_av1 = 'av1_vaapi' in encoders_output
except Exception as e:
logging.warning(f"Failed to detect encoders: {e}")
# Detect NVIDIA GPU devices
if caps.has_nvenc:
caps.nvenc_devices = EncoderDetector._detect_nvidia_gpus()
return caps
@staticmethod
def _detect_nvidia_gpus() -> List[int]:
"""Detect NVIDIA GPU device IDs"""
try:
result = subprocess.run(
['nvidia-smi', '--query-gpu=index', '--format=csv,noheader'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
return [int(line.strip()) for line in result.stdout.strip().split('\n') if line.strip()]
        except (OSError, ValueError, subprocess.SubprocessError):
            pass
return []
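    # To reproduce the detection by hand, the probes above correspond roughly to:
    #   ffmpeg -hide_banner -encoders | grep -E 'nvenc|qsv|vaapi|libx26[45]|av1'
    #   nvidia-smi --query-gpu=index --format=csv,noheader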
@staticmethod
def select_best_encoder(caps: EncoderCapabilities, prefer_gpu: bool = True) -> EncoderType:
"""Select best available encoder"""
if prefer_gpu:
# Priority: NVENC > QSV > VAAPI > CPU
if caps.has_nvenc:
return EncoderType.NVIDIA_NVENC_H265
elif caps.has_qsv:
return EncoderType.INTEL_QSV_H265
elif caps.has_vaapi:
return EncoderType.AMD_VAAPI_H265
# Fallback to CPU
if caps.has_x265:
return EncoderType.CPU_X265
elif caps.has_x264:
return EncoderType.CPU_X264
raise RuntimeError("No suitable encoder found")
# =============================================================================
# DATABASE (Extended from Phase 2)
# =============================================================================
class StateDatabase:
"""Extended database with Phase 3 features"""
def __init__(self, db_path: Path):
self.db_path = db_path
self.conn = None
self._lock = threading.Lock()
self._init_database()
def _init_database(self):
"""Initialize database schema"""
self.conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
self.conn.row_factory = sqlite3.Row
cursor = self.conn.cursor()
# Main files table (Phase 2 schema)
cursor.execute("""
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filepath TEXT UNIQUE NOT NULL,
relative_path TEXT NOT NULL,
state TEXT NOT NULL,
has_subtitles BOOLEAN,
original_size INTEGER,
encoded_size INTEGER,
subtitle_count INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP,
error_message TEXT,
profile_name TEXT,
encoder_used TEXT,
encode_time_seconds REAL,
fps REAL
)
""")
# Phase 3: Processing history
cursor.execute("""
CREATE TABLE IF NOT EXISTS processing_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_id INTEGER NOT NULL,
profile_name TEXT,
encoder_used TEXT,
started_at TIMESTAMP,
completed_at TIMESTAMP,
success BOOLEAN,
error_message TEXT,
original_size INTEGER,
encoded_size INTEGER,
encode_time_seconds REAL,
fps REAL,
FOREIGN KEY (file_id) REFERENCES files (id)
)
""")
# Indices (core columns only)
cursor.execute("CREATE INDEX IF NOT EXISTS idx_state ON files(state)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_filepath ON files(filepath)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_profile ON files(profile_name)")
# Migration: Add new columns if they don't exist
cursor.execute("PRAGMA table_info(files)")
columns = {row[1] for row in cursor.fetchall()}
migrations = [
("video_codec", "ALTER TABLE files ADD COLUMN video_codec TEXT"),
("audio_codec", "ALTER TABLE files ADD COLUMN audio_codec TEXT"),
("audio_channels", "ALTER TABLE files ADD COLUMN audio_channels INTEGER"),
("width", "ALTER TABLE files ADD COLUMN width INTEGER"),
("height", "ALTER TABLE files ADD COLUMN height INTEGER"),
("duration", "ALTER TABLE files ADD COLUMN duration REAL"),
("bitrate", "ALTER TABLE files ADD COLUMN bitrate INTEGER"),
("container_format", "ALTER TABLE files ADD COLUMN container_format TEXT"),
("file_hash", "ALTER TABLE files ADD COLUMN file_hash TEXT"),
]
for column_name, alter_sql in migrations:
if column_name not in columns:
logging.info(f"Adding column '{column_name}' to files table")
cursor.execute(alter_sql)
# Create indices for migrated columns
cursor.execute("CREATE INDEX IF NOT EXISTS idx_file_hash ON files(file_hash)")
self.conn.commit()
def add_file(self, filepath: str, relative_path: str, has_subtitles: bool,
subtitle_count: int, original_size: int, video_codec: str = None,
audio_codec: str = None, audio_channels: int = None,
width: int = None, height: int = None, duration: float = None,
bitrate: int = None, container_format: str = None, file_hash: str = None):
"""Add or update a file in the database"""
with self._lock:
cursor = self.conn.cursor()
# All scanned files are marked as DISCOVERED (found but not selected)
# Users can then filter and select which files to encode
# Only when user selects files do they become PENDING
state = ProcessingState.DISCOVERED.value
cursor.execute("""
INSERT INTO files (filepath, relative_path, state, has_subtitles,
subtitle_count, original_size, video_codec, audio_codec,
audio_channels, width, height, duration, bitrate, container_format, file_hash)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(filepath) DO UPDATE SET
state = excluded.state,
has_subtitles = excluded.has_subtitles,
subtitle_count = excluded.subtitle_count,
original_size = excluded.original_size,
video_codec = excluded.video_codec,
audio_codec = excluded.audio_codec,
audio_channels = excluded.audio_channels,
width = excluded.width,
height = excluded.height,
duration = excluded.duration,
bitrate = excluded.bitrate,
container_format = excluded.container_format,
file_hash = excluded.file_hash,
updated_at = CURRENT_TIMESTAMP
""", (filepath, relative_path, state, has_subtitles, subtitle_count, original_size,
video_codec, audio_codec, audio_channels, width, height, duration, bitrate, container_format, file_hash))
self.conn.commit()
def get_file(self, filepath: str) -> Optional[Dict]:
"""Get file record"""
with self._lock:
cursor = self.conn.cursor()
cursor.execute("SELECT * FROM files WHERE filepath = ?", (filepath,))
row = cursor.fetchone()
return dict(row) if row else None
def find_duplicates_by_hash(self, file_hash: str) -> List[Dict]:
"""Find all files with the same content hash"""
with self._lock:
cursor = self.conn.cursor()
cursor.execute("SELECT * FROM files WHERE file_hash = ?", (file_hash,))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def update_state(self, filepath: str, state: ProcessingState,
error_message: Optional[str] = None):
"""Update file processing state"""
with self._lock:
cursor = self.conn.cursor()
timestamp_field = None
if state == ProcessingState.PROCESSING:
timestamp_field = "started_at"
elif state in (ProcessingState.COMPLETED, ProcessingState.FAILED, ProcessingState.SKIPPED):
timestamp_field = "completed_at"
if timestamp_field:
cursor.execute(f"""
UPDATE files
SET state = ?, error_message = ?, {timestamp_field} = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE filepath = ?
""", (state.value, error_message, filepath))
else:
cursor.execute("""
UPDATE files
SET state = ?, error_message = ?, updated_at = CURRENT_TIMESTAMP
WHERE filepath = ?
""", (state.value, error_message, filepath))
self.conn.commit()
def update_processing_info(self, filepath: str, profile_name: str, encoder: str,
encoded_size: int, encode_time: float, fps: float):
"""Update processing information"""
with self._lock:
cursor = self.conn.cursor()
cursor.execute("""
UPDATE files
SET encoded_size = ?, profile_name = ?, encoder_used = ?,
encode_time_seconds = ?, fps = ?
WHERE filepath = ?
""", (encoded_size, profile_name, encoder, encode_time, fps, filepath))
self.conn.commit()
    def get_files_by_state(self, state: ProcessingState, limit: Optional[int] = None) -> List[Dict]:
        """Get files in a given state"""
        with self._lock:
            cursor = self.conn.cursor()
            query = "SELECT * FROM files WHERE state = ? ORDER BY created_at"
            params = (state.value,)
            if limit:
                # Bind LIMIT as a parameter rather than formatting it into the SQL
                query += " LIMIT ?"
                params = (state.value, limit)
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]
def get_statistics(self) -> Dict:
"""Get processing statistics"""
with self._lock:
cursor = self.conn.cursor()
stats = {}
for state in ProcessingState:
cursor.execute("SELECT COUNT(*) FROM files WHERE state = ?", (state.value,))
stats[state.value] = cursor.fetchone()[0]
# Size statistics
cursor.execute("""
SELECT SUM(original_size), SUM(encoded_size), AVG(fps), AVG(encode_time_seconds)
FROM files WHERE state = ?
""", (ProcessingState.COMPLETED.value,))
row = cursor.fetchone()
stats['original_size_completed'] = row[0] or 0
stats['encoded_size_completed'] = row[1] or 0
stats['avg_fps'] = row[2] or 0
stats['avg_encode_time'] = row[3] or 0
# Encoder usage
cursor.execute("""
SELECT encoder_used, COUNT(*) as count
FROM files
WHERE state = ? AND encoder_used IS NOT NULL
GROUP BY encoder_used
""", (ProcessingState.COMPLETED.value,))
stats['encoder_usage'] = {row[0]: row[1] for row in cursor.fetchall()}
return stats
def reset_processing_files(self):
"""Reset files stuck in 'processing' state"""
with self._lock:
cursor = self.conn.cursor()
cursor.execute("""
UPDATE files
SET state = ?, updated_at = CURRENT_TIMESTAMP
WHERE state = ?
""", (ProcessingState.PENDING.value, ProcessingState.PROCESSING.value))
reset_count = cursor.rowcount
self.conn.commit()
return reset_count
def close(self):
"""Close database connection"""
if self.conn:
self.conn.close()
# =============================================================================
# MEDIA INSPECTION
# =============================================================================
class MediaInspector:
"""Enhanced media inspection for Phase 3"""
@staticmethod
def has_subtitles(filepath: Path) -> Tuple[bool, int]:
"""Check if file has subtitle streams"""
try:
result = subprocess.run(
['ffprobe', '-v', 'error', '-select_streams', 's',
'-show_entries', 'stream=codec_name',
'-of', 'default=noprint_wrappers=1:nokey=1', str(filepath)],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
subtitle_streams = [s for s in result.stdout.strip().split('\n') if s]
count = len(subtitle_streams)
return (count > 0, count)
return (False, 0)
        except (OSError, subprocess.SubprocessError):
            return (False, 0)
@staticmethod
def get_media_info(filepath: Path) -> Dict[str, Any]:
"""Get detailed media information"""
try:
result = subprocess.run(
['ffprobe', '-v', 'quiet', '-print_format', 'json',
'-show_format', '-show_streams', str(filepath)],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return json.loads(result.stdout)
except Exception as e:
logging.error(f"Failed to get media info for {filepath}: {e}")
return {}
@staticmethod
def validate_file(filepath: Path) -> bool:
"""Validate media file"""
try:
result = subprocess.run(
['ffprobe', '-v', 'error', str(filepath)],
capture_output=True,
timeout=30
)
return result.returncode == 0
        except (OSError, subprocess.SubprocessError):
            return False
@staticmethod
    def get_file_hash(filepath: Path, chunk_size: int = 8192) -> Optional[str]:
        """
        Calculate a fast hash of the file using first/last chunks + size.
        This is faster than hashing the entire file for large videos.
        """
        try:
file_size = filepath.stat().st_size
# For small files (<100MB), hash the entire file
if file_size < 100 * 1024 * 1024:
hasher = hashlib.sha256()
with open(filepath, 'rb') as f:
while chunk := f.read(chunk_size):
hasher.update(chunk)
return hasher.hexdigest()
# For large files, hash: size + first 64KB + middle 64KB + last 64KB
hasher = hashlib.sha256()
hasher.update(str(file_size).encode())
with open(filepath, 'rb') as f:
# First chunk
hasher.update(f.read(65536))
# Middle chunk
f.seek(file_size // 2)
hasher.update(f.read(65536))
# Last chunk
f.seek(-65536, 2)
hasher.update(f.read(65536))
return hasher.hexdigest()
except Exception as e:
logging.error(f"Failed to hash file {filepath}: {e}")
return None
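    # Note: the sampled hash above trades exactness for speed; two large files
    # that differ only outside the sampled regions would collide. For spotting
    # duplicate copies of whole video files this is an accepted risk.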
# =============================================================================
# FILE PROCESSOR (Enhanced for Phase 3)
# =============================================================================
class FileProcessor:
"""Enhanced file processor with GPU support and progress tracking"""
def __init__(self, config: Config, db: StateDatabase, caps: EncoderCapabilities):
self.config = config
self.db = db
self.caps = caps
self.logger = logging.getLogger(f'FileProcessor-{id(self)}')
self._stop_flag = threading.Event()
def stop(self):
"""Signal processor to stop"""
self._stop_flag.set()
def process_file(self, filepath: Path, profile_name: Optional[str] = None) -> bool:
"""Process a single file with given profile"""
if self._stop_flag.is_set():
return False
relative_path = filepath.relative_to(self.config.movies_dir)
self.logger.info(f"Processing: {relative_path}")
# Get profile
profile = self.config.get_profile(profile_name)
# Select appropriate encoder (may fallback if GPU not available)
encoder = self._select_encoder(profile.encoder)
self.db.update_state(str(filepath), ProcessingState.PROCESSING)
        start_time = time.time()
        work_path = None  # defined before try so the except block can clean up
        try:
# Create work path
work_path = self.config.work_dir / relative_path
# Change extension to .mp4 if encoding HEVC/AV1 and original is .m4v
# (.m4v doesn't support HEVC or AV1 codecs)
is_modern_codec = encoder in (
EncoderType.INTEL_QSV_H265, EncoderType.NVIDIA_NVENC_H265,
EncoderType.AMD_VAAPI_H265, EncoderType.CPU_X265,
EncoderType.INTEL_QSV_AV1, EncoderType.NVIDIA_NVENC_AV1,
EncoderType.AMD_VAAPI_AV1, EncoderType.CPU_AV1
)
if is_modern_codec and work_path.suffix.lower() == '.m4v':
work_path = work_path.with_suffix('.mp4')
relative_path = relative_path.with_suffix('.mp4')
codec_name = "HEVC" if "H265" in encoder.name else "AV1" if "AV1" in encoder.name else "modern codec"
self.logger.info(f"Changed output extension to .mp4 for {codec_name} compatibility")
work_path.parent.mkdir(parents=True, exist_ok=True)
# Re-encode with progress tracking
if not self._reencode(filepath, work_path, profile, encoder):
raise Exception("Encoding failed")
# Calculate stats
encode_time = time.time() - start_time
# Verify output
if not self._verify_output(work_path):
raise Exception("Output verification failed")
# Get encoding stats
encoded_size = work_path.stat().st_size
original_size = filepath.stat().st_size
            # Approximate encoding FPS: source frames (duration x frame rate)
            # divided by wall-clock encode time
            media_info = MediaInspector.get_media_info(filepath)
            duration = float(media_info.get('format', {}).get('duration', 0) or 0)
            fps = 0.0
            for stream in media_info.get('streams', []):
                if stream.get('codec_type') == 'video':
                    num, _, den = str(stream.get('avg_frame_rate', '0/1')).partition('/')
                    try:
                        fps = duration * float(num) / float(den or '1') / encode_time
                    except (ValueError, ZeroDivisionError):
                        fps = 0.0
                    break
# Archive original
archive_path = self.config.archive_dir / relative_path.with_suffix(filepath.suffix)
archive_path.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(filepath), str(archive_path))
# Move encoded file into place (with potentially new extension)
final_path = self.config.movies_dir / relative_path
shutil.move(str(work_path), str(final_path))
# Update database
self.db.update_processing_info(
str(filepath),
profile.name,
encoder.name,
encoded_size,
encode_time,
fps
)
self.db.update_state(str(filepath), ProcessingState.COMPLETED)
# Log success
savings = original_size - encoded_size
savings_pct = (savings / original_size * 100) if original_size > 0 else 0
self.logger.info(
f"[OK] Completed: {relative_path} | "
f"Encoder: {encoder.name} | "
f"Time: {encode_time:.1f}s | "
f"FPS: {fps:.2f} | "
f"Saved: {savings_pct:.1f}%"
)
return True
        except Exception as e:
            self.logger.error(f"[FAIL] Failed: {relative_path} - {e}")
            self.db.update_state(str(filepath), ProcessingState.FAILED, str(e))
            # Clean up the partial work file, if one was created
            if work_path is not None and work_path.exists():
                work_path.unlink()
            return False
def _select_encoder(self, preferred: EncoderType) -> EncoderType:
"""Select encoder with fallback logic"""
# Check if preferred encoder is available
if self._is_encoder_available(preferred):
return preferred
# Try to find GPU alternative
if self.config.prefer_gpu:
for encoder in [EncoderType.NVIDIA_NVENC_H265, EncoderType.INTEL_QSV_H265,
EncoderType.AMD_VAAPI_H265]:
if self._is_encoder_available(encoder):
self.logger.info(f"Falling back to {encoder.name}")
return encoder
# Fallback to CPU
if self.config.fallback_to_cpu:
if self.caps.has_x265:
self.logger.info("Falling back to CPU H.265")
return EncoderType.CPU_X265
elif self.caps.has_x264:
self.logger.info("Falling back to CPU H.264")
return EncoderType.CPU_X264
raise RuntimeError(f"Encoder {preferred.name} not available and no fallback found")
    def _is_encoder_available(self, encoder: EncoderType) -> bool:
        """Check if encoder is available"""
        if encoder == EncoderType.CPU_X265:
            return self.caps.has_x265
        elif encoder == EncoderType.CPU_X264:
            return self.caps.has_x264
        elif encoder == EncoderType.CPU_AV1:
            return self.caps.has_av1
        elif encoder in (EncoderType.NVIDIA_NVENC_H265, EncoderType.NVIDIA_NVENC_H264):
            return self.caps.has_nvenc
        elif encoder == EncoderType.NVIDIA_NVENC_AV1:
            return self.caps.has_nvenc_av1
        elif encoder in (EncoderType.INTEL_QSV_H265, EncoderType.INTEL_QSV_H264):
            return self.caps.has_qsv
        elif encoder == EncoderType.INTEL_QSV_AV1:
            return self.caps.has_qsv_av1
        elif encoder in (EncoderType.AMD_VAAPI_H265, EncoderType.AMD_VAAPI_H264):
            return self.caps.has_vaapi
        elif encoder == EncoderType.AMD_VAAPI_AV1:
            return self.caps.has_vaapi_av1
        return False
def _build_ffmpeg_command(self, input_path: Path, output_path: Path,
profile: EncodingProfile, encoder: EncoderType) -> List[str]:
"""Build FFmpeg command for given encoder and profile"""
cmd = ['ffmpeg', '-hide_banner', '-loglevel', 'warning', '-stats']
# Input
cmd.extend(['-i', str(input_path)])
# Hardware acceleration setup
if encoder in (EncoderType.NVIDIA_NVENC_H265, EncoderType.NVIDIA_NVENC_H264, EncoderType.NVIDIA_NVENC_AV1):
# NVENC
if encoder == EncoderType.NVIDIA_NVENC_H265:
video_codec = 'hevc_nvenc'
elif encoder == EncoderType.NVIDIA_NVENC_AV1:
video_codec = 'av1_nvenc'
else:
video_codec = 'h264_nvenc'
cmd.extend([
'-map', '0:v', '-map', '0:a',
'-c:v', video_codec,
'-preset', profile.preset,
'-cq', str(profile.quality),
'-c:a', profile.audio_codec,
'-sn'
])
elif encoder in (EncoderType.INTEL_QSV_H265, EncoderType.INTEL_QSV_H264, EncoderType.INTEL_QSV_AV1):
# QSV
if encoder == EncoderType.INTEL_QSV_H265:
video_codec = 'hevc_qsv'
elif encoder == EncoderType.INTEL_QSV_AV1:
video_codec = 'av1_qsv'
else:
video_codec = 'h264_qsv'
cmd.extend([
'-map', '0:v', '-map', '0:a',
'-c:v', video_codec,
'-preset', profile.preset,
'-global_quality', str(profile.quality),
'-c:a', profile.audio_codec,
'-sn'
])
        elif encoder in (EncoderType.AMD_VAAPI_H265, EncoderType.AMD_VAAPI_H264, EncoderType.AMD_VAAPI_AV1):
            # VAAPI
            if encoder == EncoderType.AMD_VAAPI_H265:
                video_codec = 'hevc_vaapi'
            elif encoder == EncoderType.AMD_VAAPI_AV1:
                video_codec = 'av1_vaapi'
            else:
                video_codec = 'h264_vaapi'
            # -vaapi_device is a global option; place it before -i so the
            # device is initialized ahead of the input
            cmd[1:1] = ['-vaapi_device', '/dev/dri/renderD128']
            cmd.extend([
                '-map', '0:v', '-map', '0:a',
                # VAAPI encoders consume hardware frames, so upload the
                # software-decoded frames to the GPU first
                '-vf', 'format=nv12,hwupload',
                '-c:v', video_codec,
                '-qp', str(profile.quality),
                '-c:a', profile.audio_codec,
                '-sn'
            ])
else:
# CPU encoding
if encoder == EncoderType.CPU_X265:
video_codec = 'libx265'
elif encoder == EncoderType.CPU_AV1:
video_codec = 'libsvtav1'
else:
video_codec = 'libx264'
cmd.extend([
'-map', '0:v', '-map', '0:a',
'-c:v', video_codec,
'-preset', profile.preset,
'-crf', str(profile.quality),
'-c:a', profile.audio_codec,
'-sn'
])
        # Output options (+faststart only applies to MP4-family containers)
        if output_path.suffix.lower() in ('.mp4', '.m4v', '.mov'):
            cmd.extend(['-movflags', '+faststart'])
        cmd.extend([
            '-n',
            str(output_path)
        ])
return cmd
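    # For reference, the NVENC H.265 branch with the default 'balanced_gpu'
    # profile yields a command shaped like (illustrative file names):
    #   ffmpeg -hide_banner -loglevel warning -stats -i in.mkv \
    #     -map 0:v -map 0:a -c:v hevc_nvenc -preset p4 -cq 23 \
    #     -c:a copy -sn -n out.mkv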
def _reencode(self, input_path: Path, output_path: Path,
profile: EncodingProfile, encoder: EncoderType) -> bool:
"""Re-encode video file"""
self.logger.info(f"Encoding with {encoder.name}, profile: {profile.name}")
cmd = self._build_ffmpeg_command(input_path, output_path, profile, encoder)
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
self.logger.error(f"FFmpeg error: {result.stderr}")
return False
return True
except Exception as e:
self.logger.error(f"Encoding exception: {e}")
return False
def _verify_output(self, output_path: Path) -> bool:
"""Verify encoded output"""
if not output_path.exists():
self.logger.error(f"Output file missing: {output_path}")
return False
size = output_path.stat().st_size
if size < 1024:
self.logger.error(f"Output file too small: {size} bytes")
return False
if not MediaInspector.validate_file(output_path):
self.logger.error("Output failed validation")
return False
# Verify subtitles were removed
has_subs, count = MediaInspector.has_subtitles(output_path)
if has_subs:
self.logger.error(f"Output still has {count} subtitle stream(s)")
return False
return True
# =============================================================================
# LIBRARY SCANNER
# =============================================================================
class LibraryScanner:
"""Library scanner (same as Phase 2)"""
def __init__(self, config: Config, db: StateDatabase):
self.config = config
self.db = db
self.logger = logging.getLogger('LibraryScanner')
def scan(self) -> int:
"""Scan movie directory"""
self.logger.info(f"Scanning: {self.config.movies_dir}")
# Build set of valid extensions (lowercase for case-insensitive comparison)
valid_extensions = {ext.lower() for ext in self.config.file_extensions}
files_found = 0
files_added = 0
# Iterate all files and filter by extension case-insensitively
for filepath in self.config.movies_dir.rglob('*'):
if not filepath.is_file():
continue
# Check extension case-insensitively
ext = filepath.suffix.lstrip('.').lower()
if ext not in valid_extensions:
continue
files_found += 1
# Check if already completed/skipped
existing = self.db.get_file(str(filepath))
if existing and existing['state'] in (ProcessingState.COMPLETED.value,
ProcessingState.SKIPPED.value):
continue
# Inspect file
has_subs, sub_count = MediaInspector.has_subtitles(filepath)
original_size = filepath.stat().st_size
relative_path = str(filepath.relative_to(self.config.movies_dir))
# Calculate file hash for duplicate detection
file_hash = MediaInspector.get_file_hash(filepath)
# Check for duplicates by content hash
if file_hash:
duplicates = self.db.find_duplicates_by_hash(file_hash)
# Check if any duplicate has been completed
completed_duplicate = next(
(d for d in duplicates if d['state'] == ProcessingState.COMPLETED.value),
None
)
if completed_duplicate:
self.logger.info(f"Skipping duplicate of already encoded file: {filepath.name}")
self.logger.info(f" Original: {completed_duplicate['relative_path']}")
# Mark this file as skipped
self.db.add_file(
str(filepath),
relative_path,
False,
0,
original_size,
None, None, None, None, None, None, None, None, file_hash
)
self.db.update_state(
str(filepath),
ProcessingState.SKIPPED,
f"Duplicate of: {completed_duplicate['relative_path']}"
)
continue
# Get detailed media info
media_info = MediaInspector.get_media_info(filepath)
video_codec = None
audio_codec = None
audio_channels = None
width = None
height = None
duration = None
bitrate = None
container_format = None
if media_info and 'streams' in media_info:
# Extract video stream info
for stream in media_info['streams']:
if stream.get('codec_type') == 'video' and not video_codec:
video_codec = stream.get('codec_name')
width = stream.get('width')
height = stream.get('height')
elif stream.get('codec_type') == 'audio' and not audio_codec:
audio_codec = stream.get('codec_name')
audio_channels = stream.get('channels')
# Extract format info
if 'format' in media_info:
duration = float(media_info['format'].get('duration', 0))
bitrate = int(media_info['format'].get('bit_rate', 0))
container_format = media_info['format'].get('format_name', '').split(',')[0] # Get first format
self.db.add_file(
str(filepath),
relative_path,
has_subs,
sub_count,
original_size,
video_codec,
audio_codec,
audio_channels,
width,
height,
duration,
bitrate,
container_format,
file_hash
)
files_added += 1
if files_found % 10 == 0:
self.logger.info(f"Scanned {files_found} files...")
self.logger.info(f"Scan complete: {files_found} files found, {files_added} added/updated")
return files_found
# =============================================================================
# PARALLEL PROCESSING COORDINATOR
# =============================================================================
class ParallelCoordinator:
"""Coordinates parallel processing with worker pools"""
def __init__(self, config: Config, db: StateDatabase, caps: EncoderCapabilities):
self.config = config
self.db = db
self.caps = caps
self.logger = logging.getLogger('ParallelCoordinator')
self._stop_event = threading.Event()
self._active_workers = 0
self._workers_lock = threading.Lock()
def process_parallel(self, files: List[Dict], profile_name: Optional[str] = None):
"""Process files in parallel"""
total_files = len(files)
self.logger.info(f"Starting parallel processing of {total_files} files")
self.logger.info(f"Max workers: {self.config.max_workers}")
completed = 0
failed = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
# Submit all tasks
future_to_file = {
executor.submit(self._process_file_wrapper, Path(f['filepath']), profile_name): f
for f in files
}
# Process results as they complete
for future in concurrent.futures.as_completed(future_to_file):
                if self._stop_event.is_set():
                    self.logger.info("Stop signal received, cancelling pending tasks")
                    # Actually cancel queued (not yet started) futures so the
                    # executor can shut down promptly
                    for pending in future_to_file:
                        pending.cancel()
                    break
file_record = future_to_file[future]
try:
success = future.result()
if success:
completed += 1
else:
failed += 1
# Progress update
progress = completed + failed
pct = (progress / total_files) * 100
self.logger.info(f"Progress: {progress}/{total_files} ({pct:.1f}%) - "
f"Success: {completed}, Failed: {failed}")
except Exception as e:
failed += 1
self.logger.error(f"Task exception: {e}")
self.logger.info(f"Parallel processing complete: {completed} success, {failed} failed")
def _process_file_wrapper(self, filepath: Path, profile_name: Optional[str]) -> bool:
"""Wrapper for processing file (for thread pool)"""
processor = FileProcessor(self.config, self.db, self.caps)
with self._workers_lock:
self._active_workers += 1
try:
return processor.process_file(filepath, profile_name)
finally:
with self._workers_lock:
self._active_workers -= 1
def stop(self):
"""Signal to stop processing"""
self._stop_event.set()
# =============================================================================
# MAIN APPLICATION
# =============================================================================
class ReencodeApplication:
"""Main application for Phase 3"""
def __init__(self, config: Config):
self.config = config
self.db = StateDatabase(config.state_db)
self.caps = EncoderDetector.detect_capabilities() if config.auto_detect_encoders else EncoderCapabilities()
self.scanner = LibraryScanner(config, self.db)
self.coordinator = ParallelCoordinator(config, self.db, self.caps)
self.logger = logging.getLogger('ReencodeApp')
# Signal handling
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handle interrupt signals"""
self.logger.info("Interrupt received, stopping gracefully...")
self.coordinator.stop()
def setup_logging(self):
"""Configure logging"""
log_file = self.config.log_dir / 'reencode.log'
from logging.handlers import RotatingFileHandler
file_handler = RotatingFileHandler(
log_file,
maxBytes=10*1024*1024,
backupCount=5
)
file_handler.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
def print_encoder_capabilities(self):
"""Print detected encoder capabilities"""
# Use ASCII characters for Windows compatibility
yes = '[YES]'
no = '[ NO]'
print("\n" + "="*60)
print("ENCODER CAPABILITIES")
print("="*60)
print(f"CPU Encoders:")
print(f" H.265 (libx265): {yes if self.caps.has_x265 else no}")
print(f" H.264 (libx264): {yes if self.caps.has_x264 else no}")
print(f" AV1 (SVT-AV1): {yes if self.caps.has_av1 else no}")
print(f"\nGPU Encoders:")
print(f" NVIDIA NVENC: {yes if self.caps.has_nvenc else no}")
if self.caps.has_nvenc and self.caps.nvenc_devices:
print(f" Devices: {', '.join(map(str, self.caps.nvenc_devices))}")
print(f" AV1: {yes if self.caps.has_nvenc_av1 else no}")
print(f" Intel QSV: {yes if self.caps.has_qsv else no}")
print(f" AV1: {yes if self.caps.has_qsv_av1 else no}")
print(f" AMD VAAPI: {yes if self.caps.has_vaapi else no}")
print(f" AV1: {yes if self.caps.has_vaapi_av1 else no}")
print("="*60 + "\n")
def print_statistics(self):
"""Print processing statistics"""
stats = self.db.get_statistics()
print("\n" + "="*60)
print("PROCESSING STATISTICS")
print("="*60)
print(f"Pending: {stats['pending']:5d}")
print(f"Processing: {stats['processing']:5d}")
print(f"Completed: {stats['completed']:5d}")
print(f"Failed: {stats['failed']:5d}")
print(f"Skipped: {stats['skipped']:5d} (no subtitles)")
print("-"*60)
print(f"Total: {sum(v for k, v in stats.items() if k in ['pending', 'processing', 'completed', 'failed', 'skipped']):5d}")
if stats['completed'] > 0:
orig_size = stats['original_size_completed']
enc_size = stats['encoded_size_completed']
print(f"\nOriginal size: {self._human_size(orig_size)}")
print(f"Encoded size: {self._human_size(enc_size)}")
if orig_size > 0:
savings = orig_size - enc_size
percent = (savings / orig_size) * 100
print(f"Space saved: {self._human_size(savings)} ({percent:.1f}%)")
if stats['avg_fps'] > 0:
print(f"\nAverage FPS: {stats['avg_fps']:.2f}")
if stats['avg_encode_time'] > 0:
print(f"Avg encode time: {stats['avg_encode_time']:.1f}s")
if stats.get('encoder_usage'):
print(f"\nEncoder Usage:")
for encoder, count in stats['encoder_usage'].items():
print(f" {encoder}: {count}")
print("="*60 + "\n")
def _human_size(self, size: int) -> str:
"""Convert bytes to human readable format"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024.0:
return f"{size:.2f} {unit}"
size /= 1024.0
return f"{size:.2f} PB"
def cleanup_stale_work_files(self):
"""Remove stale work files"""
if not self.config.cleanup_stale_work:
return
self.logger.info("Cleaning up stale work files...")
count = 0
for filepath in self.config.work_dir.rglob('*'):
if filepath.is_file():
filepath.unlink()
count += 1
if count > 0:
self.logger.info(f"Removed {count} stale work file(s)")
def run(self, scan_only: bool = False, no_scan: bool = False, profile_name: Optional[str] = None):
"""Main execution"""
self.logger.info("="*60)
self.logger.info("ENCODERPRO - PHASE 3")
self.logger.info(f"Version: {__version__}")
self.logger.info("="*60)
# Show capabilities
if self.config.auto_detect_encoders:
self.print_encoder_capabilities()
# Reset stuck files
reset_count = self.db.reset_processing_files()
if reset_count > 0:
self.logger.info(f"Reset {reset_count} stuck file(s)")
# Cleanup
self.cleanup_stale_work_files()
# Scan (unless --no-scan flag is set)
if not no_scan:
self.scanner.scan()
else:
self.logger.info("Skipping library scan (--no-scan mode)")
# Statistics
self.print_statistics()
if scan_only:
self.logger.info("Scan-only mode: exiting")
return
# Get pending files
pending_files = self.db.get_files_by_state(ProcessingState.PENDING)
if not pending_files:
self.logger.info("No files to process!")
return
self.logger.info(f"Processing {len(pending_files)} file(s)...")
# Process with parallel coordinator
if self.config.max_workers > 1:
self.coordinator.process_parallel(pending_files, profile_name)
else:
# Single-threaded fallback
processor = FileProcessor(self.config, self.db, self.caps)
for file_record in pending_files:
processor.process_file(Path(file_record['filepath']), profile_name)
# Final statistics
self.logger.info("="*60)
self.logger.info("PROCESSING COMPLETE")
self.logger.info("="*60)
self.print_statistics()
def cleanup(self):
"""Cleanup resources"""
self.db.close()
# =============================================================================
# CONFIGURATION LOADING
# =============================================================================
def load_config(config_file: Optional[Path]) -> Config:
    """Load configuration from file"""
    config_dict = {}
    if config_file and not config_file.exists():
        logging.warning(f"Config file not found, using defaults: {config_file}")
    elif config_file:
        try:
            import yaml
            with open(config_file, 'r') as f:
                config_dict = yaml.safe_load(f) or {}
            logging.info(f"Loaded configuration from: {config_file}")
        except ImportError:
            logging.warning("PyYAML not installed, trying JSON")
            try:
                with open(config_file, 'r') as f:
                    config_dict = json.load(f)
                logging.info(f"Loaded configuration from: {config_file}")
            except Exception as e:
                logging.error(f"Failed to load config: {e}")
        except Exception as e:
            logging.error(f"Failed to load config (malformed YAML?): {e}")
    return Config(config_dict)
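# Example configuration (YAML; illustrative values, keys mirror Config above):
#
#   movies_dir: /mnt/user/movies
#   archive_dir: /mnt/user/archive/movies
#   work_dir: /mnt/user/temp/reencode-work
#   parallel:
#     max_workers: 2
#     gpu_slots: 1
#   profiles:
#     default: balanced_gpu
#     definitions:
#       balanced_gpu:
#         encoder: nvenc_h265
#         preset: p4
#         quality: 23
#   processing:
#     file_extensions: [mkv, mp4, avi, m4v]
#     skip_without_subtitles: true
#   advanced:
#     prefer_gpu: true
#     fallback_to_cpu: true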
# =============================================================================
# ENTRY POINT
# =============================================================================
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description='encoderPro - Phase 3',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('-c', '--config', type=Path, help='Configuration file (YAML or JSON)')
parser.add_argument('--scan-only', action='store_true', help='Only scan, do not process')
parser.add_argument('--no-scan', action='store_true', help='Skip library scan (for dashboard use)')
parser.add_argument('--stats', action='store_true', help='Show statistics and exit')
parser.add_argument('-p', '--profile', help='Encoding profile to use')
parser.add_argument('-v', '--version', action='version', version=f'%(prog)s {__version__}')
args = parser.parse_args()
# Load configuration
config = load_config(args.config)
# Create application
app = ReencodeApplication(config)
app.setup_logging()
try:
if args.stats:
app.print_statistics()
else:
app.run(scan_only=args.scan_only, no_scan=args.no_scan, profile_name=args.profile)
except KeyboardInterrupt:
app.logger.info("Interrupted by user")
except Exception as e:
app.logger.error(f"Fatal error: {e}", exc_info=True)
return 1
finally:
app.cleanup()
return 0
if __name__ == '__main__':
sys.exit(main())