#!/usr/bin/env python3
"""
ENCODERPRO - PHASE 3
====================
High-performance media re-encoding with GPU acceleration, parallel processing,
and advanced encoding profiles.

Features:
- GPU acceleration (NVENC, QSV, VAAPI)
- Automatic encoder detection and fallback
- Parallel processing with worker pools
- Multiple encoding profiles
- Progress tracking with ETA
- Resource management (CPU/GPU limits)
- Advanced encoding rules (resolution, HDR, audio)
- Docker-ready architecture
"""

import argparse
import concurrent.futures
import json
import logging
import multiprocessing
import os
import re
import shutil
import signal
import sqlite3
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any

__version__ = "3.0.0"


# =============================================================================
# CONSTANTS & ENUMS
# =============================================================================

class ProcessingState(Enum):
    """File processing states"""
    DISCOVERED = "discovered"    # Found during scan, not selected yet
    PENDING = "pending"          # User selected for encoding
    PROCESSING = "processing"    # Currently being encoded
    COMPLETED = "completed"      # Successfully encoded
    FAILED = "failed"            # Encoding failed, can retry
    SKIPPED = "skipped"          # No subtitles or excluded


class EncoderType(Enum):
    """Hardware encoder types"""
    CPU_X265 = "cpu_x265"
    CPU_X264 = "cpu_x264"
    CPU_AV1 = "cpu_av1"
    NVIDIA_NVENC_H265 = "nvenc_h265"
    NVIDIA_NVENC_H264 = "nvenc_h264"
    NVIDIA_NVENC_AV1 = "nvenc_av1"
    INTEL_QSV_H265 = "qsv_h265"
    INTEL_QSV_H264 = "qsv_h264"
    INTEL_QSV_AV1 = "qsv_av1"
    AMD_VAAPI_H265 = "vaapi_h265"
    AMD_VAAPI_H264 = "vaapi_h264"
    AMD_VAAPI_AV1 = "vaapi_av1"


@dataclass
class EncodingProfile:
    """Encoding profile configuration"""
    name: str
    encoder: EncoderType
    preset: str
    quality: int  # CRF or QP depending on encoder
    audio_codec: str = "copy"
    max_resolution: Optional[str] = None  # e.g., "1920x1080"
    hdr_handling: str = "preserve"  # preserve, tonemap, strip

    def to_dict(self) -> Dict:
        return asdict(self)
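
# A minimal sketch of building a profile by hand (illustrative values only;
# real profiles are normally constructed from the config file by
# Config._load_profiles below). Note asdict() leaves the encoder field as an
# EncoderType member rather than its string value:
#
#     profile = EncodingProfile(
#         name="example_gpu",  # hypothetical profile name
#         encoder=EncoderType.NVIDIA_NVENC_H265,
#         preset="p4",
#         quality=23,
#     )
#     profile.to_dict()  # {'name': 'example_gpu', 'encoder': <EncoderType.NVIDIA_NVENC_H265...>, ...}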


@dataclass
class EncoderCapabilities:
    """System encoder capabilities"""
    has_nvenc: bool = False
    has_qsv: bool = False
    has_vaapi: bool = False
    has_x265: bool = False
    has_x264: bool = False
    has_av1: bool = False
    has_nvenc_av1: bool = False
    has_qsv_av1: bool = False
    has_vaapi_av1: bool = False
    nvenc_devices: Optional[List[int]] = None  # filled in by __post_init__

    def __post_init__(self):
        if self.nvenc_devices is None:
            self.nvenc_devices = []


# =============================================================================
# CONFIGURATION
# =============================================================================

class Config:
    """Configuration container for Phase 3"""

    def __init__(self, config_dict: dict):
        # Phase 2 settings
        self.movies_dir = Path(config_dict.get('movies_dir', '/mnt/user/movies'))
        self.archive_dir = Path(config_dict.get('archive_dir', '/mnt/user/archive/movies'))
        self.work_dir = Path(config_dict.get('work_dir', '/mnt/user/temp/reencode-work'))
        self.state_db = Path(config_dict.get('state_db', '/var/lib/reencode/state.db'))
        self.log_dir = Path(config_dict.get('log_dir', '/var/log/reencode'))

        # Phase 3: Parallel processing
        parallel = config_dict.get('parallel', {})
        self.max_workers = parallel.get('max_workers', 1)
        self.gpu_slots = parallel.get('gpu_slots', 1)  # Concurrent GPU encodes
        self.cpu_slots = parallel.get('cpu_slots', multiprocessing.cpu_count())

        # Phase 3: Profiles
        profiles_config = config_dict.get('profiles', {})
        self.default_profile = profiles_config.get('default', 'balanced_gpu')
        self.profiles = self._load_profiles(profiles_config.get('definitions', {}))

        # Processing settings
        processing = config_dict.get('processing', {})
        self.file_extensions = processing.get('file_extensions', ['mkv', 'mp4', 'avi', 'm4v'])
        self.skip_without_subtitles = processing.get('skip_without_subtitles', True)
        self.cleanup_stale_work = processing.get('cleanup_stale_work', True)

        # Phase 3: Advanced options
        advanced = config_dict.get('advanced', {})
        self.auto_detect_encoders = advanced.get('auto_detect_encoders', True)
        self.prefer_gpu = advanced.get('prefer_gpu', True)
        self.fallback_to_cpu = advanced.get('fallback_to_cpu', True)
        self.progress_interval = advanced.get('progress_interval', 10)  # seconds

        # Ensure directories exist
        self.work_dir.mkdir(parents=True, exist_ok=True)
        self.archive_dir.mkdir(parents=True, exist_ok=True)
        self.state_db.parent.mkdir(parents=True, exist_ok=True)
        self.log_dir.mkdir(parents=True, exist_ok=True)

    def _load_profiles(self, profiles_dict: Dict) -> Dict[str, EncodingProfile]:
        """Load encoding profiles from configuration"""
        profiles = {}

        # Default profiles if none specified
        if not profiles_dict:
            profiles_dict = {
                'balanced_gpu': {'encoder': 'nvenc_h265', 'preset': 'p4', 'quality': 23},
                'fast_gpu': {'encoder': 'nvenc_h264', 'preset': 'p1', 'quality': 26},
                'quality_cpu': {'encoder': 'cpu_x265', 'preset': 'slow', 'quality': 20},
                'balanced_cpu': {'encoder': 'cpu_x265', 'preset': 'medium', 'quality': 23},
            }

        for name, profile_config in profiles_dict.items():
            encoder_str = profile_config.get('encoder', 'cpu_x265')

            # Map string to EncoderType by enum *value* (config strings like
            # 'nvenc_h265' are the enum values, not the member names); fall
            # back to CPU x265 on unknown names
            try:
                encoder = EncoderType(encoder_str.lower())
            except ValueError:
                encoder = EncoderType.CPU_X265

            profiles[name] = EncodingProfile(
                name=name,
                encoder=encoder,
                preset=profile_config.get('preset', 'medium'),
                quality=profile_config.get('quality', 23),
                audio_codec=profile_config.get('audio_codec', 'copy'),
                max_resolution=profile_config.get('max_resolution'),
                hdr_handling=profile_config.get('hdr_handling', 'preserve')
            )

        return profiles

    def get_profile(self, name: Optional[str] = None) -> EncodingProfile:
        """Get encoding profile by name"""
        profile_name = name or self.default_profile
        return self.profiles.get(profile_name, list(self.profiles.values())[0])
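
# Example (illustrative, not shipped with the project): a minimal YAML config
# matching the keys Config reads above. Paths and values are assumptions.
#
#     movies_dir: /mnt/user/movies
#     archive_dir: /mnt/user/archive/movies
#     parallel:
#       max_workers: 2
#     profiles:
#       default: balanced_gpu
#       definitions:
#         balanced_gpu:
#           encoder: nvenc_h265   # must match an EncoderType value
#           preset: p4
#           quality: 23
#     advanced:
#       prefer_gpu: true
#       fallback_to_cpu: true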


# =============================================================================
# ENCODER DETECTION
# =============================================================================

class EncoderDetector:
    """Detects available hardware encoders"""

    @staticmethod
    def detect_capabilities() -> EncoderCapabilities:
        """Detect available encoders on the system"""
        caps = EncoderCapabilities()

        # Check FFmpeg encoders
        try:
            result = subprocess.run(
                ['ffmpeg', '-hide_banner', '-encoders'],
                capture_output=True,
                text=True,
                timeout=10
            )

            encoders_output = result.stdout.lower()

            # CPU encoders
            caps.has_x265 = 'libx265' in encoders_output
            caps.has_x264 = 'libx264' in encoders_output
            caps.has_av1 = 'libsvtav1' in encoders_output or 'libaom-av1' in encoders_output

            # NVIDIA NVENC
            caps.has_nvenc = 'hevc_nvenc' in encoders_output or 'h264_nvenc' in encoders_output
            caps.has_nvenc_av1 = 'av1_nvenc' in encoders_output

            # Intel QSV
            caps.has_qsv = 'hevc_qsv' in encoders_output or 'h264_qsv' in encoders_output
            caps.has_qsv_av1 = 'av1_qsv' in encoders_output

            # AMD VAAPI
            caps.has_vaapi = 'hevc_vaapi' in encoders_output or 'h264_vaapi' in encoders_output
            caps.has_vaapi_av1 = 'av1_vaapi' in encoders_output

        except Exception as e:
            logging.warning(f"Failed to detect encoders: {e}")

        # Detect NVIDIA GPU devices
        if caps.has_nvenc:
            caps.nvenc_devices = EncoderDetector._detect_nvidia_gpus()

        return caps

    @staticmethod
    def _detect_nvidia_gpus() -> List[int]:
        """Detect NVIDIA GPU device IDs via nvidia-smi"""
        try:
            result = subprocess.run(
                ['nvidia-smi', '--query-gpu=index', '--format=csv,noheader'],
                capture_output=True,
                text=True,
                timeout=5
            )

            if result.returncode == 0:
                return [int(line.strip()) for line in result.stdout.strip().split('\n') if line.strip()]
        except Exception:
            # nvidia-smi missing, timed out, or produced unparseable output
            pass

        return []

    @staticmethod
    def select_best_encoder(caps: EncoderCapabilities, prefer_gpu: bool = True) -> EncoderType:
        """Select the best available encoder"""
        if prefer_gpu:
            # Priority: NVENC > QSV > VAAPI > CPU
            if caps.has_nvenc:
                return EncoderType.NVIDIA_NVENC_H265
            elif caps.has_qsv:
                return EncoderType.INTEL_QSV_H265
            elif caps.has_vaapi:
                return EncoderType.AMD_VAAPI_H265

        # Fallback to CPU
        if caps.has_x265:
            return EncoderType.CPU_X265
        elif caps.has_x264:
            return EncoderType.CPU_X264

        raise RuntimeError("No suitable encoder found")
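
# A minimal sketch of the detection flow (it only shells out to
# `ffmpeg -encoders` and, if NVENC is found, `nvidia-smi`):
#
#     caps = EncoderDetector.detect_capabilities()
#     encoder = EncoderDetector.select_best_encoder(caps, prefer_gpu=True)
#     print(encoder.name)  # e.g. NVIDIA_NVENC_H265 on an NVENC-capable box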


# =============================================================================
# DATABASE (Extended from Phase 2)
# =============================================================================

class StateDatabase:
    """Extended database with Phase 3 features"""

    def __init__(self, db_path: Path):
        self.db_path = db_path
        self.conn = None
        self._lock = threading.Lock()
        self._init_database()

    def _init_database(self):
        """Initialize database schema"""
        self.conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        self.conn.row_factory = sqlite3.Row

        cursor = self.conn.cursor()

        # Main files table (Phase 2 schema)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filepath TEXT UNIQUE NOT NULL,
                relative_path TEXT NOT NULL,
                state TEXT NOT NULL,
                has_subtitles BOOLEAN,
                original_size INTEGER,
                encoded_size INTEGER,
                subtitle_count INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                started_at TIMESTAMP,
                completed_at TIMESTAMP,
                error_message TEXT,
                profile_name TEXT,
                encoder_used TEXT,
                encode_time_seconds REAL,
                fps REAL
            )
        """)

        # Phase 3: Processing history
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS processing_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_id INTEGER NOT NULL,
                profile_name TEXT,
                encoder_used TEXT,
                started_at TIMESTAMP,
                completed_at TIMESTAMP,
                success BOOLEAN,
                error_message TEXT,
                original_size INTEGER,
                encoded_size INTEGER,
                encode_time_seconds REAL,
                fps REAL,
                FOREIGN KEY (file_id) REFERENCES files (id)
            )
        """)

        # Indices (core columns only)
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_state ON files(state)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_filepath ON files(filepath)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_profile ON files(profile_name)")

        # Migration: add new columns if they don't exist
        cursor.execute("PRAGMA table_info(files)")
        columns = {row[1] for row in cursor.fetchall()}

        migrations = [
            ("video_codec", "ALTER TABLE files ADD COLUMN video_codec TEXT"),
            ("audio_codec", "ALTER TABLE files ADD COLUMN audio_codec TEXT"),
            ("audio_channels", "ALTER TABLE files ADD COLUMN audio_channels INTEGER"),
            ("width", "ALTER TABLE files ADD COLUMN width INTEGER"),
            ("height", "ALTER TABLE files ADD COLUMN height INTEGER"),
            ("duration", "ALTER TABLE files ADD COLUMN duration REAL"),
            ("bitrate", "ALTER TABLE files ADD COLUMN bitrate INTEGER"),
            ("container_format", "ALTER TABLE files ADD COLUMN container_format TEXT"),
            ("file_hash", "ALTER TABLE files ADD COLUMN file_hash TEXT"),
        ]

        for column_name, alter_sql in migrations:
            if column_name not in columns:
                logging.info(f"Adding column '{column_name}' to files table")
                cursor.execute(alter_sql)

        # Index for migrated columns
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_file_hash ON files(file_hash)")

        self.conn.commit()

    def add_file(self, filepath: str, relative_path: str, has_subtitles: bool,
                 subtitle_count: int, original_size: int, video_codec: str = None,
                 audio_codec: str = None, audio_channels: int = None,
                 width: int = None, height: int = None, duration: float = None,
                 bitrate: int = None, container_format: str = None, file_hash: str = None):
        """Add or update a file in the database"""
        with self._lock:
            cursor = self.conn.cursor()
            # All scanned files start as DISCOVERED (found but not selected).
            # Users can then filter and select which files to encode; only
            # selected files become PENDING.
            state = ProcessingState.DISCOVERED.value

            cursor.execute("""
                INSERT INTO files (filepath, relative_path, state, has_subtitles,
                                   subtitle_count, original_size, video_codec, audio_codec,
                                   audio_channels, width, height, duration, bitrate,
                                   container_format, file_hash)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(filepath) DO UPDATE SET
                    state = excluded.state,
                    has_subtitles = excluded.has_subtitles,
                    subtitle_count = excluded.subtitle_count,
                    original_size = excluded.original_size,
                    video_codec = excluded.video_codec,
                    audio_codec = excluded.audio_codec,
                    audio_channels = excluded.audio_channels,
                    width = excluded.width,
                    height = excluded.height,
                    duration = excluded.duration,
                    bitrate = excluded.bitrate,
                    container_format = excluded.container_format,
                    file_hash = excluded.file_hash,
                    updated_at = CURRENT_TIMESTAMP
            """, (filepath, relative_path, state, has_subtitles, subtitle_count, original_size,
                  video_codec, audio_codec, audio_channels, width, height, duration, bitrate,
                  container_format, file_hash))

            self.conn.commit()

    def get_file(self, filepath: str) -> Optional[Dict]:
        """Get a file record"""
        with self._lock:
            cursor = self.conn.cursor()
            cursor.execute("SELECT * FROM files WHERE filepath = ?", (filepath,))
            row = cursor.fetchone()
            return dict(row) if row else None

    def find_duplicates_by_hash(self, file_hash: str) -> List[Dict]:
        """Find all files with the same content hash"""
        with self._lock:
            cursor = self.conn.cursor()
            cursor.execute("SELECT * FROM files WHERE file_hash = ?", (file_hash,))
            return [dict(row) for row in cursor.fetchall()]

    def update_state(self, filepath: str, state: ProcessingState,
                     error_message: Optional[str] = None):
        """Update file processing state"""
        with self._lock:
            cursor = self.conn.cursor()

            timestamp_field = None
            if state == ProcessingState.PROCESSING:
                timestamp_field = "started_at"
            elif state in (ProcessingState.COMPLETED, ProcessingState.FAILED, ProcessingState.SKIPPED):
                timestamp_field = "completed_at"

            if timestamp_field:
                # timestamp_field is one of two hard-coded column names above,
                # so interpolating it into the SQL is safe
                cursor.execute(f"""
                    UPDATE files
                    SET state = ?, error_message = ?, {timestamp_field} = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE filepath = ?
                """, (state.value, error_message, filepath))
            else:
                cursor.execute("""
                    UPDATE files
                    SET state = ?, error_message = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE filepath = ?
                """, (state.value, error_message, filepath))

            self.conn.commit()

    def update_processing_info(self, filepath: str, profile_name: str, encoder: str,
                               encoded_size: int, encode_time: float, fps: float):
        """Update processing information after a successful encode"""
        with self._lock:
            cursor = self.conn.cursor()
            cursor.execute("""
                UPDATE files
                SET encoded_size = ?, profile_name = ?, encoder_used = ?,
                    encode_time_seconds = ?, fps = ?
                WHERE filepath = ?
            """, (encoded_size, profile_name, encoder, encode_time, fps, filepath))
            self.conn.commit()

    def get_files_by_state(self, state: ProcessingState, limit: Optional[int] = None) -> List[Dict]:
        """Get files in a given state"""
        with self._lock:
            cursor = self.conn.cursor()
            query = "SELECT * FROM files WHERE state = ? ORDER BY created_at"
            params: List[Any] = [state.value]
            if limit:
                # Bind the limit instead of formatting it into the SQL string
                query += " LIMIT ?"
                params.append(limit)
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]

    def get_statistics(self) -> Dict:
        """Get processing statistics"""
        with self._lock:
            cursor = self.conn.cursor()

            stats = {}
            for state in ProcessingState:
                cursor.execute("SELECT COUNT(*) FROM files WHERE state = ?", (state.value,))
                stats[state.value] = cursor.fetchone()[0]

            # Size statistics
            cursor.execute("""
                SELECT SUM(original_size), SUM(encoded_size), AVG(fps), AVG(encode_time_seconds)
                FROM files WHERE state = ?
            """, (ProcessingState.COMPLETED.value,))
            row = cursor.fetchone()
            stats['original_size_completed'] = row[0] or 0
            stats['encoded_size_completed'] = row[1] or 0
            stats['avg_fps'] = row[2] or 0
            stats['avg_encode_time'] = row[3] or 0

            # Encoder usage
            cursor.execute("""
                SELECT encoder_used, COUNT(*) as count
                FROM files
                WHERE state = ? AND encoder_used IS NOT NULL
                GROUP BY encoder_used
            """, (ProcessingState.COMPLETED.value,))
            stats['encoder_usage'] = {row[0]: row[1] for row in cursor.fetchall()}

            return stats

    def reset_processing_files(self):
        """Reset files stuck in 'processing' state back to 'pending'"""
        with self._lock:
            cursor = self.conn.cursor()
            cursor.execute("""
                UPDATE files
                SET state = ?, updated_at = CURRENT_TIMESTAMP
                WHERE state = ?
            """, (ProcessingState.PENDING.value, ProcessingState.PROCESSING.value))
            reset_count = cursor.rowcount
            self.conn.commit()
            return reset_count

    def close(self):
        """Close database connection"""
        if self.conn:
            self.conn.close()
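
# A minimal lifecycle sketch (paths are hypothetical; add_file always records
# files as DISCOVERED, so a separate state update is needed to queue them):
#
#     db = StateDatabase(Path('/tmp/reencode-state.db'))
#     db.add_file('/media/example.mkv', 'example.mkv',
#                 has_subtitles=True, subtitle_count=2, original_size=1_000_000)
#     db.update_state('/media/example.mkv', ProcessingState.PENDING)
#     pending = db.get_files_by_state(ProcessingState.PENDING)
#     db.close()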


# =============================================================================
# MEDIA INSPECTION
# =============================================================================

class MediaInspector:
    """Enhanced media inspection for Phase 3"""

    @staticmethod
    def has_subtitles(filepath: Path) -> Tuple[bool, int]:
        """Check if the file has subtitle streams"""
        try:
            result = subprocess.run(
                ['ffprobe', '-v', 'error', '-select_streams', 's',
                 '-show_entries', 'stream=codec_name',
                 '-of', 'default=noprint_wrappers=1:nokey=1', str(filepath)],
                capture_output=True,
                text=True,
                timeout=30
            )

            if result.returncode == 0:
                subtitle_streams = [s for s in result.stdout.strip().split('\n') if s]
                count = len(subtitle_streams)
                return (count > 0, count)

            return (False, 0)
        except Exception:
            return (False, 0)

    @staticmethod
    def get_media_info(filepath: Path) -> Dict[str, Any]:
        """Get detailed media information as parsed ffprobe JSON"""
        try:
            result = subprocess.run(
                ['ffprobe', '-v', 'quiet', '-print_format', 'json',
                 '-show_format', '-show_streams', str(filepath)],
                capture_output=True,
                text=True,
                timeout=30
            )

            if result.returncode == 0:
                return json.loads(result.stdout)

        except Exception as e:
            logging.error(f"Failed to get media info for {filepath}: {e}")

        return {}

    @staticmethod
    def validate_file(filepath: Path) -> bool:
        """Validate that ffprobe can read the media file without errors"""
        try:
            result = subprocess.run(
                ['ffprobe', '-v', 'error', str(filepath)],
                capture_output=True,
                timeout=30
            )
            return result.returncode == 0
        except Exception:
            return False

    @staticmethod
    def get_file_hash(filepath: Path, chunk_size: int = 8192) -> Optional[str]:
        """
        Calculate a fast hash of the file using first/middle/last chunks + size.
        This is much faster than hashing the entire file for large videos.
        Returns None if the file cannot be read.
        """
        import hashlib

        try:
            file_size = filepath.stat().st_size

            # For small files (<100 MB), hash the entire file
            if file_size < 100 * 1024 * 1024:
                hasher = hashlib.sha256()
                with open(filepath, 'rb') as f:
                    while chunk := f.read(chunk_size):
                        hasher.update(chunk)
                return hasher.hexdigest()

            # For large files, hash: size + first 64 KB + middle 64 KB + last 64 KB
            hasher = hashlib.sha256()
            hasher.update(str(file_size).encode())

            with open(filepath, 'rb') as f:
                # First chunk
                hasher.update(f.read(65536))

                # Middle chunk
                f.seek(file_size // 2)
                hasher.update(f.read(65536))

                # Last chunk
                f.seek(-65536, 2)
                hasher.update(f.read(65536))

            return hasher.hexdigest()
        except Exception as e:
            logging.error(f"Failed to hash file {filepath}: {e}")
            return None
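
# Sketch of how the scanner below uses this hash for duplicate detection
# (db is a StateDatabase instance; the path is hypothetical):
#
#     file_hash = MediaInspector.get_file_hash(Path('/media/example.mkv'))
#     if file_hash:
#         duplicates = db.find_duplicates_by_hash(file_hash)
#         # Any COMPLETED duplicate means this copy can be skipped.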


# =============================================================================
# FILE PROCESSOR (Enhanced for Phase 3)
# =============================================================================

class FileProcessor:
    """Enhanced file processor with GPU support and progress tracking"""

    def __init__(self, config: Config, db: StateDatabase, caps: EncoderCapabilities):
        self.config = config
        self.db = db
        self.caps = caps
        self.logger = logging.getLogger(f'FileProcessor-{id(self)}')
        self._stop_flag = threading.Event()

    def stop(self):
        """Signal processor to stop"""
        self._stop_flag.set()

    def process_file(self, filepath: Path, profile_name: Optional[str] = None) -> bool:
        """Process a single file with the given profile"""
        if self._stop_flag.is_set():
            return False

        relative_path = filepath.relative_to(self.config.movies_dir)
        self.logger.info(f"Processing: {relative_path}")

        # Get profile
        profile = self.config.get_profile(profile_name)

        # Select appropriate encoder (may fall back if GPU not available)
        encoder = self._select_encoder(profile.encoder)

        self.db.update_state(str(filepath), ProcessingState.PROCESSING)

        start_time = time.time()
        work_path = None  # defined before try so the except block can clean up safely

        try:
            # Create work path
            work_path = self.config.work_dir / relative_path

            # Change extension to .mp4 if encoding HEVC/AV1 and the original is
            # .m4v (.m4v doesn't support HEVC or AV1 codecs)
            is_modern_codec = encoder in (
                EncoderType.INTEL_QSV_H265, EncoderType.NVIDIA_NVENC_H265,
                EncoderType.AMD_VAAPI_H265, EncoderType.CPU_X265,
                EncoderType.INTEL_QSV_AV1, EncoderType.NVIDIA_NVENC_AV1,
                EncoderType.AMD_VAAPI_AV1, EncoderType.CPU_AV1
            )
            if is_modern_codec and work_path.suffix.lower() == '.m4v':
                work_path = work_path.with_suffix('.mp4')
                relative_path = relative_path.with_suffix('.mp4')
                codec_name = "HEVC" if "H265" in encoder.name else "AV1" if "AV1" in encoder.name else "modern codec"
                self.logger.info(f"Changed output extension to .mp4 for {codec_name} compatibility")

            work_path.parent.mkdir(parents=True, exist_ok=True)

            # Re-encode with progress tracking
            if not self._reencode(filepath, work_path, profile, encoder):
                raise Exception("Encoding failed")

            # Calculate stats
            encode_time = time.time() - start_time

            # Verify output
            if not self._verify_output(work_path):
                raise Exception("Output verification failed")

            # Get encoding stats
            encoded_size = work_path.stat().st_size
            original_size = filepath.stat().st_size

            # Approximate speed: source seconds encoded per wall-clock second
            media_info = MediaInspector.get_media_info(filepath)
            duration = float(media_info.get('format', {}).get('duration', 0))
            fps = duration / encode_time if duration and encode_time > 0 else 0

            # Archive original
            archive_path = self.config.archive_dir / relative_path.with_suffix(filepath.suffix)
            archive_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(filepath), str(archive_path))

            # Move encoded file into place (with potentially new extension)
            final_path = self.config.movies_dir / relative_path
            shutil.move(str(work_path), str(final_path))

            # Update database
            self.db.update_processing_info(
                str(filepath),
                profile.name,
                encoder.name,
                encoded_size,
                encode_time,
                fps
            )
            self.db.update_state(str(filepath), ProcessingState.COMPLETED)

            # Log success
            savings = original_size - encoded_size
            savings_pct = (savings / original_size * 100) if original_size > 0 else 0

            self.logger.info(
                f"[OK] Completed: {relative_path} | "
                f"Encoder: {encoder.name} | "
                f"Time: {encode_time:.1f}s | "
                f"FPS: {fps:.2f} | "
                f"Saved: {savings_pct:.1f}%"
            )

            return True

        except Exception as e:
            self.logger.error(f"[FAIL] Failed: {relative_path} - {e}")
            self.db.update_state(str(filepath), ProcessingState.FAILED, str(e))

            # Cleanup work file
            if work_path is not None and work_path.exists():
                work_path.unlink()

            return False

    def _select_encoder(self, preferred: EncoderType) -> EncoderType:
        """Select encoder with fallback logic"""
        # Check if the preferred encoder is available
        if self._is_encoder_available(preferred):
            return preferred

        # Try to find a GPU alternative
        if self.config.prefer_gpu:
            for encoder in [EncoderType.NVIDIA_NVENC_H265, EncoderType.INTEL_QSV_H265,
                            EncoderType.AMD_VAAPI_H265]:
                if self._is_encoder_available(encoder):
                    self.logger.info(f"Falling back to {encoder.name}")
                    return encoder

        # Fallback to CPU
        if self.config.fallback_to_cpu:
            if self.caps.has_x265:
                self.logger.info("Falling back to CPU H.265")
                return EncoderType.CPU_X265
            elif self.caps.has_x264:
                self.logger.info("Falling back to CPU H.264")
                return EncoderType.CPU_X264

        raise RuntimeError(f"Encoder {preferred.name} not available and no fallback found")

    def _is_encoder_available(self, encoder: EncoderType) -> bool:
        """Check if an encoder is available (including the AV1 variants)"""
        if encoder == EncoderType.CPU_X265:
            return self.caps.has_x265
        elif encoder == EncoderType.CPU_X264:
            return self.caps.has_x264
        elif encoder == EncoderType.CPU_AV1:
            return self.caps.has_av1
        elif encoder in (EncoderType.NVIDIA_NVENC_H265, EncoderType.NVIDIA_NVENC_H264):
            return self.caps.has_nvenc
        elif encoder == EncoderType.NVIDIA_NVENC_AV1:
            return self.caps.has_nvenc_av1
        elif encoder in (EncoderType.INTEL_QSV_H265, EncoderType.INTEL_QSV_H264):
            return self.caps.has_qsv
        elif encoder == EncoderType.INTEL_QSV_AV1:
            return self.caps.has_qsv_av1
        elif encoder in (EncoderType.AMD_VAAPI_H265, EncoderType.AMD_VAAPI_H264):
            return self.caps.has_vaapi
        elif encoder == EncoderType.AMD_VAAPI_AV1:
            return self.caps.has_vaapi_av1
        return False

    def _build_ffmpeg_command(self, input_path: Path, output_path: Path,
                              profile: EncodingProfile, encoder: EncoderType) -> List[str]:
        """Build FFmpeg command for the given encoder and profile"""
        cmd = ['ffmpeg', '-hide_banner', '-loglevel', 'warning', '-stats']

        # Input
        cmd.extend(['-i', str(input_path)])

        # Hardware acceleration setup
        if encoder in (EncoderType.NVIDIA_NVENC_H265, EncoderType.NVIDIA_NVENC_H264, EncoderType.NVIDIA_NVENC_AV1):
            # NVENC
            if encoder == EncoderType.NVIDIA_NVENC_H265:
                video_codec = 'hevc_nvenc'
            elif encoder == EncoderType.NVIDIA_NVENC_AV1:
                video_codec = 'av1_nvenc'
            else:
                video_codec = 'h264_nvenc'

            cmd.extend([
                '-map', '0:v', '-map', '0:a',
                '-c:v', video_codec,
                '-preset', profile.preset,
                '-cq', str(profile.quality),
                '-c:a', profile.audio_codec,
                '-sn'
            ])

        elif encoder in (EncoderType.INTEL_QSV_H265, EncoderType.INTEL_QSV_H264, EncoderType.INTEL_QSV_AV1):
            # QSV
            if encoder == EncoderType.INTEL_QSV_H265:
                video_codec = 'hevc_qsv'
            elif encoder == EncoderType.INTEL_QSV_AV1:
                video_codec = 'av1_qsv'
            else:
                video_codec = 'h264_qsv'

            cmd.extend([
                '-map', '0:v', '-map', '0:a',
                '-c:v', video_codec,
                '-preset', profile.preset,
                '-global_quality', str(profile.quality),
                '-c:a', profile.audio_codec,
                '-sn'
            ])

        elif encoder in (EncoderType.AMD_VAAPI_H265, EncoderType.AMD_VAAPI_H264, EncoderType.AMD_VAAPI_AV1):
            # VAAPI
            if encoder == EncoderType.AMD_VAAPI_H265:
                video_codec = 'hevc_vaapi'
            elif encoder == EncoderType.AMD_VAAPI_AV1:
                video_codec = 'av1_vaapi'
            else:
                video_codec = 'h264_vaapi'

            cmd.extend([
                '-vaapi_device', '/dev/dri/renderD128',
                '-map', '0:v', '-map', '0:a',
                '-c:v', video_codec,
                '-qp', str(profile.quality),
                '-c:a', profile.audio_codec,
                '-sn'
            ])

        else:
            # CPU encoding
            if encoder == EncoderType.CPU_X265:
                video_codec = 'libx265'
            elif encoder == EncoderType.CPU_AV1:
                video_codec = 'libsvtav1'
            else:
                video_codec = 'libx264'

            cmd.extend([
                '-map', '0:v', '-map', '0:a',
                '-c:v', video_codec,
                '-preset', profile.preset,
                '-crf', str(profile.quality),
                '-c:a', profile.audio_codec,
                '-sn'
            ])

        # Output options ('-n' never overwrites an existing output file)
        cmd.extend([
            '-movflags', '+faststart',
            '-n',
            str(output_path)
        ])

        return cmd
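
    # For illustration, with the default 'balanced_gpu' profile and NVENC HEVC,
    # the command built above comes out roughly as (paths hypothetical):
    #
    #     ffmpeg -hide_banner -loglevel warning -stats -i input.m4v \
    #         -map 0:v -map 0:a -c:v hevc_nvenc -preset p4 -cq 23 -c:a copy -sn \
    #         -movflags +faststart -n output.mp4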

    def _reencode(self, input_path: Path, output_path: Path,
                  profile: EncodingProfile, encoder: EncoderType) -> bool:
        """Re-encode a video file"""
        self.logger.info(f"Encoding with {encoder.name}, profile: {profile.name}")

        cmd = self._build_ffmpeg_command(input_path, output_path, profile, encoder)

        try:
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode != 0:
                self.logger.error(f"FFmpeg error: {result.stderr}")
                return False

            return True

        except Exception as e:
            self.logger.error(f"Encoding exception: {e}")
            return False

    def _verify_output(self, output_path: Path) -> bool:
        """Verify the encoded output"""
        if not output_path.exists():
            self.logger.error(f"Output file missing: {output_path}")
            return False

        size = output_path.stat().st_size
        if size < 1024:
            self.logger.error(f"Output file too small: {size} bytes")
            return False

        if not MediaInspector.validate_file(output_path):
            self.logger.error("Output failed validation")
            return False

        # Verify subtitles were removed
        has_subs, count = MediaInspector.has_subtitles(output_path)
        if has_subs:
            self.logger.error(f"Output still has {count} subtitle stream(s)")
            return False

        return True


# =============================================================================
# LIBRARY SCANNER
# =============================================================================

class LibraryScanner:
    """Library scanner (same as Phase 2)"""

    def __init__(self, config: Config, db: StateDatabase):
        self.config = config
        self.db = db
        self.logger = logging.getLogger('LibraryScanner')

    def scan(self) -> int:
        """Scan the movie directory"""
        self.logger.info(f"Scanning: {self.config.movies_dir}")

        # Build set of valid extensions (lowercase for case-insensitive comparison)
        valid_extensions = {ext.lower() for ext in self.config.file_extensions}
        files_found = 0
        files_added = 0

        # Iterate all files and filter by extension case-insensitively
        for filepath in self.config.movies_dir.rglob('*'):
            if not filepath.is_file():
                continue

            # Check extension case-insensitively
            ext = filepath.suffix.lstrip('.').lower()
            if ext not in valid_extensions:
                continue

            files_found += 1

            # Skip files already completed/skipped
            existing = self.db.get_file(str(filepath))
            if existing and existing['state'] in (ProcessingState.COMPLETED.value,
                                                  ProcessingState.SKIPPED.value):
                continue

            # Inspect file
            has_subs, sub_count = MediaInspector.has_subtitles(filepath)
            original_size = filepath.stat().st_size
            relative_path = str(filepath.relative_to(self.config.movies_dir))

            # Calculate file hash for duplicate detection
            file_hash = MediaInspector.get_file_hash(filepath)

            # Check for duplicates by content hash
            if file_hash:
                duplicates = self.db.find_duplicates_by_hash(file_hash)
                # Check if any duplicate has already been completed
                completed_duplicate = next(
                    (d for d in duplicates if d['state'] == ProcessingState.COMPLETED.value),
                    None
                )

                if completed_duplicate:
                    self.logger.info(f"Skipping duplicate of already encoded file: {filepath.name}")
                    self.logger.info(f"  Original: {completed_duplicate['relative_path']}")
                    # Mark this file as skipped
                    self.db.add_file(
                        str(filepath),
                        relative_path,
                        False,
                        0,
                        original_size,
                        None, None, None, None, None, None, None, None, file_hash
                    )
                    self.db.update_state(
                        str(filepath),
                        ProcessingState.SKIPPED,
                        f"Duplicate of: {completed_duplicate['relative_path']}"
                    )
                    continue

            # Get detailed media info
            media_info = MediaInspector.get_media_info(filepath)
            video_codec = None
            audio_codec = None
            audio_channels = None
            width = None
            height = None
            duration = None
            bitrate = None
            container_format = None

            if media_info and 'streams' in media_info:
                # Extract info from the first video and first audio stream
                for stream in media_info['streams']:
                    if stream.get('codec_type') == 'video' and not video_codec:
                        video_codec = stream.get('codec_name')
                        width = stream.get('width')
                        height = stream.get('height')
                    elif stream.get('codec_type') == 'audio' and not audio_codec:
                        audio_codec = stream.get('codec_name')
                        audio_channels = stream.get('channels')

                # Extract format info
                if 'format' in media_info:
                    duration = float(media_info['format'].get('duration', 0))
                    bitrate = int(media_info['format'].get('bit_rate', 0))
                    container_format = media_info['format'].get('format_name', '').split(',')[0]  # first listed format

            self.db.add_file(
                str(filepath),
                relative_path,
                has_subs,
                sub_count,
                original_size,
                video_codec,
                audio_codec,
                audio_channels,
                width,
                height,
                duration,
                bitrate,
                container_format,
                file_hash
            )

            files_added += 1

            if files_found % 10 == 0:
                self.logger.info(f"Scanned {files_found} files...")

        self.logger.info(f"Scan complete: {files_found} files found, {files_added} added/updated")
        return files_found


# =============================================================================
# PARALLEL PROCESSING COORDINATOR
# =============================================================================

class ParallelCoordinator:
    """Coordinates parallel processing with worker pools"""

    def __init__(self, config: Config, db: StateDatabase, caps: EncoderCapabilities):
        self.config = config
        self.db = db
        self.caps = caps
        self.logger = logging.getLogger('ParallelCoordinator')
        self._stop_event = threading.Event()
        self._active_workers = 0
        self._workers_lock = threading.Lock()

    def process_parallel(self, files: List[Dict], profile_name: Optional[str] = None):
        """Process files in parallel"""
        total_files = len(files)
        self.logger.info(f"Starting parallel processing of {total_files} files")
        self.logger.info(f"Max workers: {self.config.max_workers}")

        completed = 0
        failed = 0

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
            # Submit all tasks
            future_to_file = {
                executor.submit(self._process_file_wrapper, Path(f['filepath']), profile_name): f
                for f in files
            }

            # Process results as they complete
            for future in concurrent.futures.as_completed(future_to_file):
                if self._stop_event.is_set():
                    self.logger.info("Stop signal received, cancelling pending tasks")
                    # Cancel tasks that have not started yet; running encodes finish
                    for pending in future_to_file:
                        pending.cancel()
                    break

                file_record = future_to_file[future]

                try:
                    success = future.result()
                    if success:
                        completed += 1
                    else:
                        failed += 1

                    # Progress update
                    progress = completed + failed
                    pct = (progress / total_files) * 100
                    self.logger.info(f"Progress: {progress}/{total_files} ({pct:.1f}%) - "
                                     f"Success: {completed}, Failed: {failed}")

                except Exception as e:
                    failed += 1
                    self.logger.error(f"Task exception: {e}")

        self.logger.info(f"Parallel processing complete: {completed} success, {failed} failed")

    def _process_file_wrapper(self, filepath: Path, profile_name: Optional[str]) -> bool:
        """Wrapper around FileProcessor.process_file for the thread pool"""
        processor = FileProcessor(self.config, self.db, self.caps)

        with self._workers_lock:
            self._active_workers += 1

        try:
            return processor.process_file(filepath, profile_name)
        finally:
            with self._workers_lock:
                self._active_workers -= 1

    def stop(self):
        """Signal to stop processing"""
        self._stop_event.set()


# =============================================================================
# MAIN APPLICATION
# =============================================================================

class ReencodeApplication:
    """Main application for Phase 3"""

    def __init__(self, config: Config):
        self.config = config
        self.db = StateDatabase(config.state_db)
        self.caps = EncoderDetector.detect_capabilities() if config.auto_detect_encoders else EncoderCapabilities()
        self.scanner = LibraryScanner(config, self.db)
        self.coordinator = ParallelCoordinator(config, self.db, self.caps)
        self.logger = logging.getLogger('ReencodeApp')

        # Signal handling
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle interrupt signals"""
        self.logger.info("Interrupt received, stopping gracefully...")
        self.coordinator.stop()

    def setup_logging(self):
        """Configure logging"""
        log_file = self.config.log_dir / 'reencode.log'

        from logging.handlers import RotatingFileHandler
        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=10 * 1024 * 1024,
            backupCount=5
        )
        file_handler.setLevel(logging.DEBUG)

        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)

        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)
        root_logger.addHandler(file_handler)
        root_logger.addHandler(console_handler)

    def print_encoder_capabilities(self):
        """Print detected encoder capabilities"""
        # Use ASCII markers for Windows console compatibility
        yes = '[YES]'
        no = '[ NO]'

        print("\n" + "=" * 60)
        print("ENCODER CAPABILITIES")
        print("=" * 60)
        print("CPU Encoders:")
        print(f"  H.265 (libx265):  {yes if self.caps.has_x265 else no}")
        print(f"  H.264 (libx264):  {yes if self.caps.has_x264 else no}")
        print(f"  AV1 (SVT-AV1):    {yes if self.caps.has_av1 else no}")
        print("\nGPU Encoders:")
        print(f"  NVIDIA NVENC:     {yes if self.caps.has_nvenc else no}")
        if self.caps.has_nvenc and self.caps.nvenc_devices:
            print(f"    Devices:        {', '.join(map(str, self.caps.nvenc_devices))}")
        print(f"    AV1:            {yes if self.caps.has_nvenc_av1 else no}")
        print(f"  Intel QSV:        {yes if self.caps.has_qsv else no}")
        print(f"    AV1:            {yes if self.caps.has_qsv_av1 else no}")
        print(f"  AMD VAAPI:        {yes if self.caps.has_vaapi else no}")
        print(f"    AV1:            {yes if self.caps.has_vaapi_av1 else no}")
        print("=" * 60 + "\n")

    def print_statistics(self):
        """Print processing statistics"""
        stats = self.db.get_statistics()

        print("\n" + "=" * 60)
        print("PROCESSING STATISTICS")
        print("=" * 60)
        print(f"Discovered:  {stats['discovered']:5d}")
        print(f"Pending:     {stats['pending']:5d}")
        print(f"Processing:  {stats['processing']:5d}")
        print(f"Completed:   {stats['completed']:5d}")
        print(f"Failed:      {stats['failed']:5d}")
        print(f"Skipped:     {stats['skipped']:5d} (no subtitles)")
        print("-" * 60)
        total = sum(v for k, v in stats.items()
                    if k in ['discovered', 'pending', 'processing', 'completed', 'failed', 'skipped'])
        print(f"Total:       {total:5d}")

        if stats['completed'] > 0:
            orig_size = stats['original_size_completed']
            enc_size = stats['encoded_size_completed']

            print(f"\nOriginal size: {self._human_size(orig_size)}")
            print(f"Encoded size:  {self._human_size(enc_size)}")

            if orig_size > 0:
                savings = orig_size - enc_size
                percent = (savings / orig_size) * 100
                print(f"Space saved:   {self._human_size(savings)} ({percent:.1f}%)")

        if stats['avg_fps'] > 0:
            print(f"\nAverage FPS: {stats['avg_fps']:.2f}")
        if stats['avg_encode_time'] > 0:
            print(f"Avg encode time: {stats['avg_encode_time']:.1f}s")

        if stats.get('encoder_usage'):
            print("\nEncoder Usage:")
            for encoder, count in stats['encoder_usage'].items():
                print(f"  {encoder}: {count}")

        print("=" * 60 + "\n")

    def _human_size(self, size: int) -> str:
        """Convert bytes to a human-readable string"""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size < 1024.0:
                return f"{size:.2f} {unit}"
            size /= 1024.0
        return f"{size:.2f} PB"

    def cleanup_stale_work_files(self):
        """Remove stale work files left over from interrupted runs"""
        if not self.config.cleanup_stale_work:
            return

        self.logger.info("Cleaning up stale work files...")
        count = 0
        for filepath in self.config.work_dir.rglob('*'):
            if filepath.is_file():
                filepath.unlink()
                count += 1

        if count > 0:
            self.logger.info(f"Removed {count} stale work file(s)")

    def run(self, scan_only: bool = False, no_scan: bool = False, profile_name: Optional[str] = None):
        """Main execution"""
        self.logger.info("=" * 60)
        self.logger.info("ENCODERPRO - PHASE 3")
        self.logger.info(f"Version: {__version__}")
        self.logger.info("=" * 60)

        # Show capabilities
        if self.config.auto_detect_encoders:
            self.print_encoder_capabilities()

        # Reset stuck files
        reset_count = self.db.reset_processing_files()
        if reset_count > 0:
            self.logger.info(f"Reset {reset_count} stuck file(s)")

        # Cleanup
        self.cleanup_stale_work_files()

        # Scan (unless --no-scan flag is set)
        if not no_scan:
            self.scanner.scan()
        else:
            self.logger.info("Skipping library scan (--no-scan mode)")

        # Statistics
        self.print_statistics()

        if scan_only:
            self.logger.info("Scan-only mode: exiting")
            return

        # Get pending files
        pending_files = self.db.get_files_by_state(ProcessingState.PENDING)

        if not pending_files:
            self.logger.info("No files to process!")
            return

        self.logger.info(f"Processing {len(pending_files)} file(s)...")

        # Process with the parallel coordinator
        if self.config.max_workers > 1:
            self.coordinator.process_parallel(pending_files, profile_name)
        else:
            # Single-threaded fallback
            processor = FileProcessor(self.config, self.db, self.caps)
            for file_record in pending_files:
                processor.process_file(Path(file_record['filepath']), profile_name)

        # Final statistics
        self.logger.info("=" * 60)
        self.logger.info("PROCESSING COMPLETE")
        self.logger.info("=" * 60)
        self.print_statistics()

    def cleanup(self):
        """Cleanup resources"""
        self.db.close()


# =============================================================================
# CONFIGURATION LOADING
# =============================================================================

def load_config(config_file: Optional[Path]) -> Config:
    """Load configuration from a YAML or JSON file"""
    config_dict = {}

    if config_file and config_file.exists():
        try:
            import yaml
            with open(config_file, 'r') as f:
                config_dict = yaml.safe_load(f) or {}
            logging.info(f"Loaded configuration from: {config_file}")
        except ImportError:
            logging.warning("PyYAML not installed, trying JSON")
            try:
                with open(config_file, 'r') as f:
                    config_dict = json.load(f)
                logging.info(f"Loaded configuration from: {config_file}")
            except Exception as e:
                logging.error(f"Failed to load config: {e}")
        except Exception as e:
            # Malformed YAML should not crash startup; fall back to defaults
            logging.error(f"Failed to load config: {e}")

    return Config(config_dict)
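
# Typical invocations (script name and config path are hypothetical):
#
#     python3 encoderpro.py -c /etc/reencode/config.yaml              # scan + encode
#     python3 encoderpro.py -c /etc/reencode/config.yaml --scan-only  # scan only
#     python3 encoderpro.py --stats                                   # statistics only
#     python3 encoderpro.py -p quality_cpu --no-scan                  # named profile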


# =============================================================================
# ENTRY POINT
# =============================================================================

def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(
        description='encoderPro - Phase 3',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    parser.add_argument('-c', '--config', type=Path, help='Configuration file (YAML or JSON)')
    parser.add_argument('--scan-only', action='store_true', help='Only scan, do not process')
    parser.add_argument('--no-scan', action='store_true', help='Skip library scan (for dashboard use)')
    parser.add_argument('--stats', action='store_true', help='Show statistics and exit')
    parser.add_argument('-p', '--profile', help='Encoding profile to use')
    parser.add_argument('-v', '--version', action='version', version=f'%(prog)s {__version__}')

    args = parser.parse_args()

    # Load configuration
    config = load_config(args.config)

    # Create application, then attach log handlers
    app = ReencodeApplication(config)
    app.setup_logging()

    try:
        if args.stats:
            app.print_statistics()
        else:
            app.run(scan_only=args.scan_only, no_scan=args.no_scan, profile_name=args.profile)

    except KeyboardInterrupt:
        app.logger.info("Interrupted by user")

    except Exception as e:
        app.logger.error(f"Fatal error: {e}", exc_info=True)
        return 1

    finally:
        app.cleanup()

    return 0


if __name__ == '__main__':
    sys.exit(main())