408 lines
13 KiB
Python
408 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Quality Checker Module for encoderPro
|
|
======================================
|
|
Detects source video quality and warns if encoding will degrade quality.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Tuple
|
|
|
|
|
|
@dataclass
|
|
class VideoQuality:
|
|
"""Video quality metrics"""
|
|
bitrate: int # bits per second
|
|
resolution: Tuple[int, int] # (width, height)
|
|
codec: str
|
|
fps: float
|
|
is_hdr: bool
|
|
quality_score: float # 0-100, estimated quality
|
|
|
|
def to_dict(self) -> Dict:
|
|
return {
|
|
'bitrate': self.bitrate,
|
|
'resolution': f"{self.resolution[0]}x{self.resolution[1]}",
|
|
'codec': self.codec,
|
|
'fps': self.fps,
|
|
'is_hdr': self.is_hdr,
|
|
'quality_score': self.quality_score
|
|
}
|
|
|
|
|
|
class QualityChecker:
|
|
"""Analyzes video quality before encoding"""
|
|
|
|
def __init__(self, logger: Optional[logging.Logger] = None):
|
|
self.logger = logger or logging.getLogger(__name__)
|
|
|
|
def get_video_info(self, filepath: Path) -> Optional[Dict]:
|
|
"""Extract detailed video information using ffprobe"""
|
|
try:
|
|
cmd = [
|
|
'ffprobe',
|
|
'-v', 'quiet',
|
|
'-print_format', 'json',
|
|
'-show_format',
|
|
'-show_streams',
|
|
'-select_streams', 'v:0',
|
|
str(filepath)
|
|
]
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
|
|
if result.returncode != 0:
|
|
self.logger.error(f"ffprobe failed for {filepath}: {result.stderr}")
|
|
return None
|
|
|
|
data = json.loads(result.stdout)
|
|
return data
|
|
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.error(f"ffprobe timeout for {filepath}")
|
|
return None
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Failed to parse ffprobe output: {e}")
|
|
return None
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting video info: {e}")
|
|
return None
|
|
|
|
def analyze_quality(self, filepath: Path) -> Optional[VideoQuality]:
|
|
"""Analyze video quality metrics"""
|
|
info = self.get_video_info(filepath)
|
|
if not info:
|
|
return None
|
|
|
|
try:
|
|
# Get video stream
|
|
video_stream = None
|
|
if 'streams' in info and len(info['streams']) > 0:
|
|
video_stream = info['streams'][0]
|
|
|
|
if not video_stream:
|
|
self.logger.error(f"No video stream found in {filepath}")
|
|
return None
|
|
|
|
# Extract metrics
|
|
width = int(video_stream.get('width', 0))
|
|
height = int(video_stream.get('height', 0))
|
|
codec = video_stream.get('codec_name', 'unknown')
|
|
|
|
# Get bitrate
|
|
bitrate = 0
|
|
if 'bit_rate' in video_stream:
|
|
bitrate = int(video_stream['bit_rate'])
|
|
elif 'format' in info and 'bit_rate' in info['format']:
|
|
bitrate = int(info['format']['bit_rate'])
|
|
|
|
# Get FPS
|
|
fps_str = video_stream.get('r_frame_rate', '0/1')
|
|
try:
|
|
num, den = fps_str.split('/')
|
|
fps = float(num) / float(den) if float(den) != 0 else 0
|
|
except:
|
|
fps = 0
|
|
|
|
# Detect HDR
|
|
is_hdr = self._detect_hdr(video_stream)
|
|
|
|
# Calculate quality score
|
|
quality_score = self._calculate_quality_score(
|
|
bitrate, width, height, codec, fps
|
|
)
|
|
|
|
return VideoQuality(
|
|
bitrate=bitrate,
|
|
resolution=(width, height),
|
|
codec=codec,
|
|
fps=fps,
|
|
is_hdr=is_hdr,
|
|
quality_score=quality_score
|
|
)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error analyzing quality: {e}")
|
|
return None
|
|
|
|
def _detect_hdr(self, stream: Dict) -> bool:
|
|
"""Detect if video has HDR"""
|
|
# Check for HDR transfer characteristics
|
|
transfer = stream.get('color_transfer', '').lower()
|
|
if 'smpte2084' in transfer or 'arib-std-b67' in transfer:
|
|
return True
|
|
|
|
# Check for HDR color primaries
|
|
primaries = stream.get('color_primaries', '').lower()
|
|
if 'bt2020' in primaries:
|
|
return True
|
|
|
|
# Check tags
|
|
tags = stream.get('tags', {})
|
|
if any('hdr' in str(v).lower() for v in tags.values()):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _calculate_quality_score(self, bitrate: int, width: int, height: int,
|
|
codec: str, fps: float) -> float:
|
|
"""
|
|
Calculate a quality score (0-100) based on video metrics
|
|
|
|
This is a heuristic score based on:
|
|
- Bitrate per pixel
|
|
- Resolution
|
|
- Codec efficiency
|
|
- Frame rate
|
|
"""
|
|
if width == 0 or height == 0:
|
|
return 0
|
|
|
|
pixels = width * height
|
|
|
|
# Bitrate per pixel per frame
|
|
if fps > 0:
|
|
bits_per_pixel = bitrate / (pixels * fps)
|
|
else:
|
|
bits_per_pixel = bitrate / pixels
|
|
|
|
# Codec efficiency multiplier
|
|
codec_multiplier = {
|
|
'hevc': 1.5, # H.265 is more efficient
|
|
'h265': 1.5,
|
|
'av1': 1.8, # AV1 is very efficient
|
|
'h264': 1.0, # Baseline
|
|
'avc': 1.0,
|
|
'mpeg2': 0.5, # Less efficient
|
|
'mpeg4': 0.7,
|
|
}.get(codec.lower(), 1.0)
|
|
|
|
# Effective bits per pixel (adjusted for codec)
|
|
effective_bpp = bits_per_pixel * codec_multiplier
|
|
|
|
# Quality score based on bits per pixel
|
|
# Typical ranges:
|
|
# < 0.1 bpp: Poor quality (score < 50)
|
|
# 0.1-0.2 bpp: Acceptable (score 50-70)
|
|
# 0.2-0.3 bpp: Good (score 70-85)
|
|
# 0.3-0.5 bpp: Excellent (score 85-95)
|
|
# > 0.5 bpp: Near lossless (score 95-100)
|
|
|
|
if effective_bpp >= 0.5:
|
|
score = 95 + min(effective_bpp * 10, 5)
|
|
elif effective_bpp >= 0.3:
|
|
score = 85 + (effective_bpp - 0.3) * 50
|
|
elif effective_bpp >= 0.2:
|
|
score = 70 + (effective_bpp - 0.2) * 150
|
|
elif effective_bpp >= 0.1:
|
|
score = 50 + (effective_bpp - 0.1) * 200
|
|
else:
|
|
score = min(effective_bpp * 500, 50)
|
|
|
|
return min(round(score, 1), 100)
|
|
|
|
def will_degrade_quality(self, source_quality: VideoQuality,
|
|
target_bitrate: int,
|
|
target_codec: str,
|
|
threshold: float = 10.0) -> Tuple[bool, str]:
|
|
"""
|
|
Check if encoding will significantly degrade quality
|
|
|
|
Args:
|
|
source_quality: Source video quality metrics
|
|
target_bitrate: Target encoding bitrate
|
|
target_codec: Target codec
|
|
threshold: Quality score drop threshold (default 10 points)
|
|
|
|
Returns:
|
|
(will_degrade: bool, reason: str)
|
|
"""
|
|
# Calculate target quality score
|
|
target_quality_score = self._calculate_quality_score(
|
|
target_bitrate,
|
|
source_quality.resolution[0],
|
|
source_quality.resolution[1],
|
|
target_codec,
|
|
source_quality.fps
|
|
)
|
|
|
|
quality_drop = source_quality.quality_score - target_quality_score
|
|
|
|
if quality_drop > threshold:
|
|
reason = (
|
|
f"Encoding will degrade quality by {quality_drop:.1f} points "
|
|
f"(from {source_quality.quality_score:.1f} to {target_quality_score:.1f}). "
|
|
f"Source bitrate: {source_quality.bitrate/1000000:.1f} Mbps, "
|
|
f"Target bitrate: {target_bitrate/1000000:.1f} Mbps"
|
|
)
|
|
return True, reason
|
|
|
|
return False, ""
|
|
|
|
def estimate_target_bitrate(self, profile: Dict, resolution: Tuple[int, int],
|
|
fps: float) -> int:
|
|
"""
|
|
Estimate target bitrate based on profile settings
|
|
|
|
This uses common CRF-to-bitrate approximations for different codecs
|
|
"""
|
|
width, height = resolution
|
|
pixels = width * height
|
|
|
|
crf = profile.get('quality', 23)
|
|
codec = profile.get('encoder', 'cpu_x265')
|
|
|
|
# Determine codec type
|
|
if 'x265' in codec or 'h265' in codec or 'hevc' in codec:
|
|
codec_type = 'h265'
|
|
elif 'av1' in codec:
|
|
codec_type = 'av1'
|
|
else:
|
|
codec_type = 'h264'
|
|
|
|
# Base bitrate estimation (Mbps per megapixel)
|
|
# These are approximations for CRF encoding
|
|
base_bitrates = {
|
|
'h264': {
|
|
18: 0.20, # Near lossless
|
|
21: 0.15,
|
|
23: 0.10, # Good quality
|
|
26: 0.06,
|
|
28: 0.04
|
|
},
|
|
'h265': {
|
|
18: 0.10, # H.265 is ~50% more efficient
|
|
21: 0.075,
|
|
23: 0.05,
|
|
26: 0.03,
|
|
28: 0.02
|
|
},
|
|
'av1': {
|
|
18: 0.07, # AV1 is ~70% more efficient
|
|
21: 0.05,
|
|
23: 0.035,
|
|
26: 0.02,
|
|
28: 0.015
|
|
}
|
|
}
|
|
|
|
# Get closest CRF value
|
|
crf_values = sorted(base_bitrates[codec_type].keys())
|
|
closest_crf = min(crf_values, key=lambda x: abs(x - crf))
|
|
|
|
mbps_per_megapixel = base_bitrates[codec_type][closest_crf]
|
|
|
|
# Calculate target bitrate
|
|
megapixels = pixels / 1000000
|
|
target_mbps = mbps_per_megapixel * megapixels * (fps / 24) # Normalize to 24fps
|
|
target_bitrate = int(target_mbps * 1000000) # Convert to bps
|
|
|
|
return target_bitrate
|
|
|
|
def check_before_encode(self, filepath: Path, profile: Dict,
|
|
warn_threshold: float = 10.0,
|
|
error_threshold: float = 20.0) -> Dict:
|
|
"""
|
|
Comprehensive quality check before encoding
|
|
|
|
Returns:
|
|
{
|
|
'ok': bool,
|
|
'warning': bool,
|
|
'error': bool,
|
|
'message': str,
|
|
'source_quality': VideoQuality,
|
|
'estimated_target_bitrate': int,
|
|
'quality_drop': float
|
|
}
|
|
"""
|
|
result = {
|
|
'ok': True,
|
|
'warning': False,
|
|
'error': False,
|
|
'message': '',
|
|
'source_quality': None,
|
|
'estimated_target_bitrate': 0,
|
|
'quality_drop': 0
|
|
}
|
|
|
|
# Analyze source quality
|
|
source_quality = self.analyze_quality(filepath)
|
|
if not source_quality:
|
|
result['ok'] = False
|
|
result['error'] = True
|
|
result['message'] = "Failed to analyze source video quality"
|
|
return result
|
|
|
|
result['source_quality'] = source_quality
|
|
|
|
# Estimate target bitrate
|
|
target_bitrate = self.estimate_target_bitrate(
|
|
profile,
|
|
source_quality.resolution,
|
|
source_quality.fps
|
|
)
|
|
result['estimated_target_bitrate'] = target_bitrate
|
|
|
|
# Check for quality degradation
|
|
target_codec = profile.get('encoder', 'cpu_x265')
|
|
will_degrade, reason = self.will_degrade_quality(
|
|
source_quality,
|
|
target_bitrate,
|
|
target_codec,
|
|
warn_threshold
|
|
)
|
|
|
|
if will_degrade:
|
|
quality_drop = source_quality.quality_score - self._calculate_quality_score(
|
|
target_bitrate,
|
|
source_quality.resolution[0],
|
|
source_quality.resolution[1],
|
|
target_codec,
|
|
source_quality.fps
|
|
)
|
|
result['quality_drop'] = quality_drop
|
|
|
|
if quality_drop >= error_threshold:
|
|
result['ok'] = False
|
|
result['error'] = True
|
|
result['message'] = f"⚠️ CRITICAL: {reason}"
|
|
else:
|
|
result['warning'] = True
|
|
result['message'] = f"⚠️ WARNING: {reason}"
|
|
|
|
# Check for HDR content
|
|
if source_quality.is_hdr and profile.get('hdr_handling') != 'preserve':
|
|
result['warning'] = True
|
|
result['message'] += "\n⚠️ HDR content detected but HDR handling is not set to 'preserve'"
|
|
|
|
return result
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Test module
|
|
import sys
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
checker = QualityChecker()
|
|
|
|
if len(sys.argv) > 1:
|
|
filepath = Path(sys.argv[1])
|
|
quality = checker.analyze_quality(filepath)
|
|
|
|
if quality:
|
|
print(f"\nVideo Quality Analysis:")
|
|
print(f" Resolution: {quality.resolution[0]}x{quality.resolution[1]}")
|
|
print(f" Bitrate: {quality.bitrate/1000000:.2f} Mbps")
|
|
print(f" Codec: {quality.codec}")
|
|
print(f" FPS: {quality.fps:.2f}")
|
|
print(f" HDR: {quality.is_hdr}")
|
|
print(f" Quality Score: {quality.quality_score}/100")
|
|
else:
|
|
print("Usage: python quality_checker.py <video_file>")
|