Files
encoderPro/quality_checker.py
2026-01-24 17:43:28 -05:00

408 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Quality Checker Module for encoderPro
======================================
Detects source video quality and warns if encoding will degrade quality.
"""
import json
import logging
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Dict, Tuple
@dataclass
class VideoQuality:
    """Snapshot of the quality-related properties of one video stream."""

    bitrate: int  # bits per second
    resolution: Tuple[int, int]  # (width, height)
    codec: str
    fps: float
    is_hdr: bool
    quality_score: float  # estimated quality on a 0-100 scale

    def to_dict(self) -> Dict:
        """Return the metrics as a plain dict.

        The resolution is rendered as a ``"<width>x<height>"`` string; every
        other field is copied through unchanged.
        """
        field_names = (
            'bitrate',
            'resolution',
            'codec',
            'fps',
            'is_hdr',
            'quality_score',
        )
        summary = {name: getattr(self, name) for name in field_names}
        width = self.resolution[0]
        height = self.resolution[1]
        summary['resolution'] = f"{width}x{height}"
        return summary
class QualityChecker:
    """Analyze a video's quality before encoding and flag likely degradation.

    Stream metadata is read with ``ffprobe``. Quality is scored with a
    bits-per-pixel heuristic (see ``_calculate_quality_score``); the same
    heuristic is applied to an estimated target bitrate to predict how far an
    encode with a given profile would lower that score.
    """

    # Frame rate assumed for bitrate estimation when the source frame rate is
    # unknown (reported as 0); without it the estimate would collapse to 0 bps.
    _DEFAULT_FPS = 24.0

    # Ordered (substring, multiplier) pairs used to weight bits-per-pixel by
    # codec efficiency. Substring matching lets both ffprobe codec names
    # ('hevc', 'mpeg2video') and encoder/profile names ('cpu_x265',
    # 'hevc_nvenc') resolve to the same multiplier. First match wins.
    _CODEC_EFFICIENCY = (
        ('av1', 1.8),    # AV1 is very efficient
        ('hevc', 1.5),   # H.265 is more efficient
        ('h265', 1.5),
        ('x265', 1.5),
        ('h264', 1.0),   # Baseline
        ('x264', 1.0),
        ('avc', 1.0),
        ('mpeg2', 0.5),  # Less efficient
        ('mpeg4', 0.7),
    )

    # Approximate bits per pixel per frame produced at a given CRF. These use
    # the same scale as the bpp ranges in ``_calculate_quality_score``.
    _CRF_BITS_PER_PIXEL = {
        'h264': {
            18: 0.20,   # Near lossless
            21: 0.15,
            23: 0.10,   # Good quality
            26: 0.06,
            28: 0.04,
        },
        'h265': {
            18: 0.10,   # H.265 is ~50% more efficient
            21: 0.075,
            23: 0.05,
            26: 0.03,
            28: 0.02,
        },
        'av1': {
            18: 0.07,   # AV1 is ~70% more efficient
            21: 0.05,
            23: 0.035,
            26: 0.02,
            28: 0.015,
        },
    }

    def __init__(self, logger: Optional[logging.Logger] = None):
        """Store *logger*, falling back to this module's logger."""
        self.logger = logger or logging.getLogger(__name__)

    # ------------------------------------------------------------------
    # Small parsing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_int(value, default: int = 0) -> int:
        """Convert *value* to int, returning *default* if it is not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _parse_frame_rate(rate) -> float:
        """Parse an ffprobe rate such as ``"24000/1001"`` (or ``"25"``).

        Returns 0.0 when the value is malformed or the denominator is zero.
        """
        numerator, _, denominator = str(rate).partition('/')
        try:
            num = float(numerator)
            den = float(denominator) if denominator else 1.0
        except ValueError:
            return 0.0
        return num / den if den != 0 else 0.0

    @classmethod
    def _codec_multiplier(cls, codec: str) -> float:
        """Return the efficiency multiplier for a codec or encoder name.

        Unrecognised names get the H.264 baseline of 1.0.
        """
        name = str(codec or '').lower()
        for marker, multiplier in cls._CODEC_EFFICIENCY:
            if marker in name:
                return multiplier
        return 1.0

    # ------------------------------------------------------------------
    # Probing and analysis
    # ------------------------------------------------------------------
    def get_video_info(self, filepath: Path) -> Optional[Dict]:
        """Run ffprobe on *filepath* and return its parsed JSON output.

        Only the first video stream (``v:0``) and the container format are
        requested. Returns ``None`` (after logging) if ffprobe fails, times
        out after 30 seconds, cannot be started, or prints invalid JSON.
        """
        cmd = [
            'ffprobe',
            # 'error' rather than 'quiet' so that stderr, which is logged on
            # failure below, actually contains the failure reason.
            '-v', 'error',
            '-print_format', 'json',
            '-show_format',
            '-show_streams',
            '-select_streams', 'v:0',
            str(filepath),
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                self.logger.error(f"ffprobe failed for {filepath}: {result.stderr}")
                return None
            return json.loads(result.stdout)
        except subprocess.TimeoutExpired:
            self.logger.error(f"ffprobe timeout for {filepath}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse ffprobe output: {e}")
            return None
        except Exception as e:
            # Boundary handler (e.g. ffprobe not installed): this method
            # reports failure by returning None, never by raising.
            self.logger.error(f"Error getting video info: {e}")
            return None

    def analyze_quality(self, filepath: Path) -> Optional["VideoQuality"]:
        """Probe *filepath* and return its quality metrics.

        Returns ``None`` (after logging) if the file cannot be probed or has
        no video stream.
        """
        info = self.get_video_info(filepath)
        if not info:
            return None

        try:
            streams = info.get('streams') or []
            video_stream = streams[0] if streams else None
            if not video_stream:
                self.logger.error(f"No video stream found in {filepath}")
                return None

            width = self._to_int(video_stream.get('width'))
            height = self._to_int(video_stream.get('height'))
            codec = video_stream.get('codec_name', 'unknown')

            # Prefer the stream's own bitrate; fall back to the container
            # bitrate when the stream does not report a usable one.
            bitrate = self._to_int(video_stream.get('bit_rate'))
            if bitrate <= 0:
                container = info.get('format') or {}
                bitrate = self._to_int(container.get('bit_rate'))

            fps = self._parse_frame_rate(video_stream.get('r_frame_rate', '0/1'))
            is_hdr = self._detect_hdr(video_stream)
            quality_score = self._calculate_quality_score(
                bitrate, width, height, codec, fps
            )

            return VideoQuality(
                bitrate=bitrate,
                resolution=(width, height),
                codec=codec,
                fps=fps,
                is_hdr=is_hdr,
                quality_score=quality_score,
            )
        except Exception as e:
            # Boundary handler: unexpected metadata shapes must not crash the
            # caller; failure is reported as None.
            self.logger.error(f"Error analyzing quality: {e}")
            return None

    def _detect_hdr(self, stream: Dict) -> bool:
        """Heuristically decide whether an ffprobe stream dict describes HDR.

        A stream is treated as HDR if its transfer characteristic is PQ
        (``smpte2084``) or HLG (``arib-std-b67``), if its colour primaries
        mention ``bt2020``, or if any tag value contains ``hdr``.
        """
        transfer = str(stream.get('color_transfer') or '').lower()
        if 'smpte2084' in transfer or 'arib-std-b67' in transfer:
            return True

        # NOTE(review): BT.2020 primaries can also be used by SDR content, so
        # this check may over-report HDR; kept as in the original heuristic.
        primaries = str(stream.get('color_primaries') or '').lower()
        if 'bt2020' in primaries:
            return True

        tags = stream.get('tags') or {}
        return any('hdr' in str(value).lower() for value in tags.values())

    def _calculate_quality_score(self, bitrate: int, width: int, height: int,
                                 codec: str, fps: float) -> float:
        """Return a heuristic 0-100 quality score.

        The score is derived from bits per pixel per frame (bits per pixel
        per second when *fps* is unknown, i.e. ``<= 0``), scaled by a codec
        efficiency multiplier. *codec* may be an ffprobe codec name or an
        encoder name. Effective bpp maps to the score piecewise-linearly:

            < 0.1      poor           (score < 50)
            0.1 - 0.2  acceptable     (50 - 70)
            0.2 - 0.3  good           (70 - 85)
            0.3 - 0.5  excellent      (85 - 95)
            >= 0.5     near lossless  (95 - 100, reaching 100 at 1.0 bpp)

        Returns 0.0 if either dimension is not positive.
        """
        if width <= 0 or height <= 0:
            return 0.0

        pixels = width * height
        if fps > 0:
            bits_per_pixel = bitrate / (pixels * fps)
        else:
            bits_per_pixel = bitrate / pixels

        effective_bpp = bits_per_pixel * self._codec_multiplier(codec)

        if effective_bpp >= 0.5:
            # Measured from the 0.5 boundary so the curve stays continuous at
            # 95; scaling the raw bpp here would pin this whole band at 100.
            score = 95 + min((effective_bpp - 0.5) * 10, 5)
        elif effective_bpp >= 0.3:
            score = 85 + (effective_bpp - 0.3) * 50
        elif effective_bpp >= 0.2:
            score = 70 + (effective_bpp - 0.2) * 150
        elif effective_bpp >= 0.1:
            score = 50 + (effective_bpp - 0.1) * 200
        else:
            score = min(effective_bpp * 500, 50)

        return float(max(0.0, min(round(score, 1), 100)))

    # ------------------------------------------------------------------
    # Degradation prediction
    # ------------------------------------------------------------------
    def _target_score(self, source_quality: "VideoQuality", target_bitrate: int,
                      target_codec: str) -> float:
        """Score the source's resolution and fps at the target bitrate/codec."""
        return self._calculate_quality_score(
            target_bitrate,
            source_quality.resolution[0],
            source_quality.resolution[1],
            target_codec,
            source_quality.fps,
        )

    @staticmethod
    def _degradation_reason(source_quality: "VideoQuality", target_bitrate: int,
                            target_score: float) -> str:
        """Build the human-readable explanation of a predicted quality drop."""
        quality_drop = source_quality.quality_score - target_score
        return (
            f"Encoding will degrade quality by {quality_drop:.1f} points "
            f"(from {source_quality.quality_score:.1f} to {target_score:.1f}). "
            f"Source bitrate: {source_quality.bitrate/1000000:.1f} Mbps, "
            f"Target bitrate: {target_bitrate/1000000:.1f} Mbps"
        )

    def will_degrade_quality(self, source_quality: "VideoQuality",
                             target_bitrate: int,
                             target_codec: str,
                             threshold: float = 10.0) -> Tuple[bool, str]:
        """Check if encoding will significantly degrade quality.

        Args:
            source_quality: Source video quality metrics.
            target_bitrate: Target encoding bitrate in bits per second.
            target_codec: Target codec or encoder name (e.g. ``'hevc'`` or
                ``'cpu_x265'``).
            threshold: Quality score drop that counts as degradation
                (default 10 points).

        Returns:
            ``(will_degrade, reason)``; *reason* is ``""`` when the predicted
            drop does not exceed *threshold*.
        """
        target_score = self._target_score(source_quality, target_bitrate, target_codec)
        quality_drop = source_quality.quality_score - target_score
        if quality_drop <= threshold:
            return False, ""
        return True, self._degradation_reason(source_quality, target_bitrate, target_score)

    def estimate_target_bitrate(self, profile: Dict, resolution: Tuple[int, int],
                                fps: float) -> int:
        """Estimate the bitrate (bits per second) a CRF encode will produce.

        Args:
            profile: Encoding profile; ``'quality'`` is the CRF (default 23)
                and ``'encoder'`` the encoder name (default ``'cpu_x265'``).
            resolution: ``(width, height)`` in pixels.
            fps: Frames per second; a non-positive value falls back to
                24 fps so that an unknown frame rate does not yield 0 bps.

        The CRF is snapped to the nearest table entry and the result is
        ``bits_per_pixel * width * height * fps``.
        """
        width, height = resolution
        pixels = width * height

        try:
            crf = float(profile.get('quality', 23))
        except (TypeError, ValueError):
            crf = 23.0

        encoder = str(profile.get('encoder', 'cpu_x265')).lower()
        if any(marker in encoder for marker in ('x265', 'h265', 'hevc')):
            codec_type = 'h265'
        elif 'av1' in encoder:
            codec_type = 'av1'
        else:
            codec_type = 'h264'

        table = self._CRF_BITS_PER_PIXEL[codec_type]
        closest_crf = min(sorted(table), key=lambda known: abs(known - crf))
        bits_per_pixel = table[closest_crf]

        effective_fps = fps if fps and fps > 0 else self._DEFAULT_FPS
        return int(round(bits_per_pixel * pixels * effective_fps))

    def check_before_encode(self, filepath: Path, profile: Dict,
                            warn_threshold: float = 10.0,
                            error_threshold: float = 20.0) -> Dict:
        """Run the full pre-encode quality check for *filepath*.

        Args:
            filepath: Video file to analyze.
            profile: Encoding profile (``'quality'``, ``'encoder'``,
                ``'hdr_handling'``).
            warn_threshold: Predicted score drop above which a warning is set.
            error_threshold: Predicted score drop at or above which the check
                fails.

        Returns:
            {
                'ok': bool,
                'warning': bool,
                'error': bool,
                'message': str,
                'source_quality': VideoQuality,
                'estimated_target_bitrate': int,
                'quality_drop': float
            }
            ``quality_drop`` stays 0 unless the drop exceeds *warn_threshold*.
            Multiple messages are separated by a newline.
        """
        result = {
            'ok': True,
            'warning': False,
            'error': False,
            'message': '',
            'source_quality': None,
            'estimated_target_bitrate': 0,
            'quality_drop': 0,
        }

        source_quality = self.analyze_quality(filepath)
        if not source_quality:
            result['ok'] = False
            result['error'] = True
            result['message'] = "Failed to analyze source video quality"
            return result
        result['source_quality'] = source_quality

        target_bitrate = self.estimate_target_bitrate(
            profile,
            source_quality.resolution,
            source_quality.fps,
        )
        result['estimated_target_bitrate'] = target_bitrate

        messages = []

        # Score the target once and reuse it for both the decision and the text.
        target_codec = profile.get('encoder', 'cpu_x265')
        target_score = self._target_score(source_quality, target_bitrate, target_codec)
        quality_drop = source_quality.quality_score - target_score
        if quality_drop > warn_threshold:
            reason = self._degradation_reason(source_quality, target_bitrate, target_score)
            result['quality_drop'] = quality_drop
            if quality_drop >= error_threshold:
                result['ok'] = False
                result['error'] = True
                messages.append(f"⚠️ CRITICAL: {reason}")
            else:
                result['warning'] = True
                messages.append(f"⚠️ WARNING: {reason}")

        if source_quality.is_hdr and profile.get('hdr_handling') != 'preserve':
            result['warning'] = True
            messages.append(
                "⚠️ HDR content detected but HDR handling is not set to 'preserve'"
            )

        # Joining avoids a stray leading newline when only the HDR note exists.
        result['message'] = "\n".join(messages)
        return result
if __name__ == '__main__':
    # Manual smoke test: analyze the file named on the command line and print
    # its metrics, or show usage when no file is given.
    import sys

    logging.basicConfig(level=logging.INFO)
    checker = QualityChecker()

    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python quality_checker.py <video_file>")
    else:
        quality = checker.analyze_quality(Path(cli_args[0]))
        # Nothing is printed when analysis fails; the checker logs the cause.
        if quality:
            print(
                f"\nVideo Quality Analysis:",
                f" Resolution: {quality.resolution[0]}x{quality.resolution[1]}",
                f" Bitrate: {quality.bitrate/1000000:.2f} Mbps",
                f" Codec: {quality.codec}",
                f" FPS: {quality.fps:.2f}",
                f" HDR: {quality.is_hdr}",
                f" Quality Score: {quality.quality_score}/100",
                sep="\n",
            )