#!/usr/bin/env python3
"""
Quality Checker Module for encoderPro
======================================
Detects source video quality and warns if encoding will degrade quality.
"""

import json
import logging
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Dict, Tuple

# Frame rate assumed when the real one is missing or unparsable, so that
# bits-per-pixel figures stay on a per-frame scale.
_DEFAULT_FPS = 24.0

# (name fragments, efficiency multiplier relative to H.264).
# Fragments are matched as substrings so that ffprobe codec names
# ("hevc", "mpeg2video") and encoder names ("cpu_x265", "nvenc_h264")
# resolve to the same codec family.
_CODEC_EFFICIENCY = (
    (('hevc', 'h265', 'x265'), 1.5),   # H.265 is more efficient
    (('av1',), 1.8),                   # AV1 is very efficient
    (('h264', 'x264', 'avc'), 1.0),    # Baseline
    (('mpeg2',), 0.5),                 # Less efficient
    (('mpeg4',), 0.7),
)

# Approximate bits per pixel per frame produced by CRF encoding.
# These are on the same scale as the thresholds used by the quality score
# (e.g. ~0.10 bpp for x264 at CRF 23).
_CRF_BITS_PER_PIXEL = {
    'h264': {
        18: 0.20,   # Near lossless
        21: 0.15,
        23: 0.10,   # Good quality
        26: 0.06,
        28: 0.04,
    },
    'h265': {
        18: 0.10,   # H.265 is ~50% more efficient
        21: 0.075,
        23: 0.05,
        26: 0.03,
        28: 0.02,
    },
    'av1': {
        18: 0.07,   # AV1 is ~70% more efficient
        21: 0.05,
        23: 0.035,
        26: 0.02,
        28: 0.015,
    },
}


@dataclass
class VideoQuality:
    """Video quality metrics"""
    bitrate: int                  # bits per second
    resolution: Tuple[int, int]   # (width, height)
    codec: str
    fps: float
    is_hdr: bool
    quality_score: float          # 0-100, estimated quality

    def to_dict(self) -> Dict:
        """Return the metrics as a plain dict (resolution as "WxH")."""
        return {
            'bitrate': self.bitrate,
            'resolution': f"{self.resolution[0]}x{self.resolution[1]}",
            'codec': self.codec,
            'fps': self.fps,
            'is_hdr': self.is_hdr,
            'quality_score': self.quality_score
        }


class QualityChecker:
    """Analyzes video quality before encoding"""

    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger(__name__)

    def get_video_info(self, filepath: Path) -> Optional[Dict]:
        """
        Extract detailed video information using ffprobe

        Runs ffprobe on the first video stream (v:0) with a 30 second
        timeout and returns the parsed JSON output.

        Returns:
            The decoded ffprobe JSON, or None if ffprobe fails, times out,
            or produces output that cannot be parsed. Errors are logged,
            never raised.
        """
        cmd = [
            'ffprobe',
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            '-show_streams',
            '-select_streams', 'v:0',
            str(filepath)
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                self.logger.error("ffprobe failed for %s: %s", filepath, result.stderr)
                return None
            return json.loads(result.stdout)
        except subprocess.TimeoutExpired:
            self.logger.error("ffprobe timeout for %s", filepath)
            return None
        except json.JSONDecodeError as e:
            self.logger.error("Failed to parse ffprobe output: %s", e)
            return None
        except Exception as e:
            # Boundary handler: e.g. ffprobe is not installed (OSError).
            self.logger.error("Error getting video info: %s", e)
            return None

    @staticmethod
    def _parse_bitrate(value) -> Optional[int]:
        """Convert an ffprobe bit_rate value to int; None if missing/invalid."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _parse_frame_rate(fps_str) -> float:
        """Convert an ffprobe "num/den" frame rate to float; 0.0 if invalid."""
        try:
            num, den = fps_str.split('/')
            return float(num) / float(den)
        except (ValueError, ZeroDivisionError, AttributeError):
            return 0.0

    @staticmethod
    def _codec_multiplier(codec: str) -> float:
        """Return the efficiency multiplier for a codec or encoder name."""
        name = (codec or '').lower()
        for fragments, multiplier in _CODEC_EFFICIENCY:
            if any(fragment in name for fragment in fragments):
                return multiplier
        return 1.0

    def analyze_quality(self, filepath: Path) -> Optional[VideoQuality]:
        """
        Analyze video quality metrics

        Returns:
            A VideoQuality built from the ffprobe output, or None if the
            file could not be probed or has no video stream. Errors are
            logged, never raised.
        """
        info = self.get_video_info(filepath)
        if not info:
            return None

        try:
            # Get video stream
            streams = info.get('streams') or []
            video_stream = streams[0] if streams else None
            if not video_stream:
                self.logger.error("No video stream found in %s", filepath)
                return None

            # Extract metrics
            width = int(video_stream.get('width', 0))
            height = int(video_stream.get('height', 0))
            codec = video_stream.get('codec_name', 'unknown')

            # Prefer the stream bitrate; fall back to the container bitrate
            # when the stream value is absent or not numeric.
            bitrate = self._parse_bitrate(video_stream.get('bit_rate'))
            if bitrate is None:
                bitrate = self._parse_bitrate(
                    (info.get('format') or {}).get('bit_rate')
                )
            if bitrate is None:
                bitrate = 0

            fps = self._parse_frame_rate(video_stream.get('r_frame_rate', '0/1'))
            is_hdr = self._detect_hdr(video_stream)
            quality_score = self._calculate_quality_score(
                bitrate, width, height, codec, fps
            )

            return VideoQuality(
                bitrate=bitrate,
                resolution=(width, height),
                codec=codec,
                fps=fps,
                is_hdr=is_hdr,
                quality_score=quality_score
            )
        except Exception as e:
            self.logger.error("Error analyzing quality: %s", e)
            return None

    def _detect_hdr(self, stream: Dict) -> bool:
        """Detect if video has HDR"""
        # Check for HDR transfer characteristics
        transfer = stream.get('color_transfer', '').lower()
        if 'smpte2084' in transfer or 'arib-std-b67' in transfer:
            return True

        # Check for HDR color primaries
        primaries = stream.get('color_primaries', '').lower()
        if 'bt2020' in primaries:
            return True

        # Check tags
        tags = stream.get('tags', {})
        return any('hdr' in str(v).lower() for v in tags.values())

    def _calculate_quality_score(self, bitrate: int, width: int, height: int,
                                 codec: str, fps: float) -> float:
        """
        Calculate a quality score (0-100) based on video metrics

        This is a heuristic score based on:
        - Bitrate per pixel per frame
        - Codec efficiency (codec or encoder name, matched by substring)
        - Frame rate (24 fps is assumed when fps is not positive)

        Returns 0 when width or height is not positive.
        """
        if width <= 0 or height <= 0:
            return 0.0

        # An unknown frame rate must not switch the metric to
        # bits-per-pixel-per-second, which would saturate the score.
        effective_fps = fps if fps and fps > 0 else _DEFAULT_FPS
        bits_per_pixel = bitrate / (width * height * effective_fps)

        # Effective bits per pixel (adjusted for codec)
        effective_bpp = bits_per_pixel * self._codec_multiplier(codec)

        # Quality score based on bits per pixel
        # Typical ranges:
        #   < 0.1 bpp: Poor quality (score < 50)
        #   0.1-0.2 bpp: Acceptable (score 50-70)
        #   0.2-0.3 bpp: Good (score 70-85)
        #   0.3-0.5 bpp: Excellent (score 85-95)
        #   > 0.5 bpp: Near lossless (score 95-100)
        if effective_bpp >= 0.5:
            # Continue from 95 at 0.5 bpp and saturate at 100 (1.0 bpp).
            score = 95 + min((effective_bpp - 0.5) * 10, 5)
        elif effective_bpp >= 0.3:
            score = 85 + (effective_bpp - 0.3) * 50
        elif effective_bpp >= 0.2:
            score = 70 + (effective_bpp - 0.2) * 150
        elif effective_bpp >= 0.1:
            score = 50 + (effective_bpp - 0.1) * 200
        else:
            score = effective_bpp * 500

        return max(0.0, min(round(score, 1), 100))

    def will_degrade_quality(self, source_quality: VideoQuality,
                             target_bitrate: int, target_codec: str,
                             threshold: float = 10.0) -> Tuple[bool, str]:
        """
        Check if encoding will significantly degrade quality

        Args:
            source_quality: Source video quality metrics
            target_bitrate: Target encoding bitrate
            target_codec: Target codec (codec or encoder name)
            threshold: Quality score drop threshold (default 10 points)

        Returns:
            (will_degrade: bool, reason: str); reason is empty when the
            drop does not exceed the threshold.
        """
        width, height = source_quality.resolution
        target_quality_score = self._calculate_quality_score(
            target_bitrate, width, height, target_codec, source_quality.fps
        )
        quality_drop = source_quality.quality_score - target_quality_score

        if quality_drop <= threshold:
            return False, ""

        reason = (
            f"Encoding will degrade quality by {quality_drop:.1f} points "
            f"(from {source_quality.quality_score:.1f} to {target_quality_score:.1f}). "
            f"Source bitrate: {source_quality.bitrate/1000000:.1f} Mbps, "
            f"Target bitrate: {target_bitrate/1000000:.1f} Mbps"
        )
        return True, reason

    def estimate_target_bitrate(self, profile: Dict, resolution: Tuple[int, int],
                                fps: float) -> int:
        """
        Estimate target bitrate based on profile settings

        Uses approximate bits-per-pixel-per-frame figures for CRF encoding
        with the nearest tabulated CRF value, then scales by pixel count
        and frame rate.

        Args:
            profile: Encoding profile; reads 'quality' (CRF, default 23)
                and 'encoder' (default 'cpu_x265').
            resolution: (width, height) in pixels
            fps: Frames per second; 24 fps is assumed when not positive

        Returns:
            Estimated bitrate in bits per second.
        """
        width, height = resolution
        pixels = width * height

        try:
            crf = float(profile.get('quality', 23))
        except (TypeError, ValueError):
            crf = 23.0
        encoder = str(profile.get('encoder') or 'cpu_x265').lower()

        # Determine codec type
        if 'x265' in encoder or 'h265' in encoder or 'hevc' in encoder:
            codec_type = 'h265'
        elif 'av1' in encoder:
            codec_type = 'av1'
        else:
            codec_type = 'h264'

        # Get closest CRF value (ties resolve to the lower CRF)
        table = _CRF_BITS_PER_PIXEL[codec_type]
        closest_crf = min(sorted(table), key=lambda x: abs(x - crf))
        bits_per_pixel = table[closest_crf]

        # bits/pixel/frame * pixels/frame * frames/second = bits/second
        effective_fps = fps if fps and fps > 0 else _DEFAULT_FPS
        return int(round(bits_per_pixel * pixels * effective_fps))

    def check_before_encode(self, filepath: Path, profile: Dict,
                            warn_threshold: float = 10.0,
                            error_threshold: float = 20.0) -> Dict:
        """
        Comprehensive quality check before encoding

        Args:
            filepath: Source video file
            profile: Encoding profile; reads 'quality', 'encoder' and
                'hdr_handling'
            warn_threshold: Score drop above which a warning is raised
            error_threshold: Score drop at or above which the check fails

        Returns:
            {
                'ok': bool,
                'warning': bool,
                'error': bool,
                'message': str,
                'source_quality': VideoQuality,
                'estimated_target_bitrate': int,
                'quality_drop': float
            }
        """
        result = {
            'ok': True,
            'warning': False,
            'error': False,
            'message': '',
            'source_quality': None,
            'estimated_target_bitrate': 0,
            'quality_drop': 0
        }

        # Analyze source quality
        source_quality = self.analyze_quality(filepath)
        if not source_quality:
            result['ok'] = False
            result['error'] = True
            result['message'] = "Failed to analyze source video quality"
            return result
        result['source_quality'] = source_quality

        # Estimate target bitrate
        target_bitrate = self.estimate_target_bitrate(
            profile, source_quality.resolution, source_quality.fps
        )
        result['estimated_target_bitrate'] = target_bitrate

        # Check for quality degradation
        target_codec = profile.get('encoder') or 'cpu_x265'
        will_degrade, reason = self.will_degrade_quality(
            source_quality, target_bitrate, target_codec, warn_threshold
        )

        if will_degrade:
            quality_drop = source_quality.quality_score - self._calculate_quality_score(
                target_bitrate,
                source_quality.resolution[0],
                source_quality.resolution[1],
                target_codec,
                source_quality.fps
            )
            result['quality_drop'] = quality_drop

            if quality_drop >= error_threshold:
                result['ok'] = False
                result['error'] = True
                result['message'] = f"⚠️ CRITICAL: {reason}"
            else:
                result['warning'] = True
                result['message'] = f"⚠️ WARNING: {reason}"

        # Check for HDR content
        if source_quality.is_hdr and profile.get('hdr_handling') != 'preserve':
            result['warning'] = True
            hdr_note = "⚠️ HDR content detected but HDR handling is not set to 'preserve'"
            # Only separate with a newline when there is an earlier message.
            if result['message']:
                result['message'] += "\n" + hdr_note
            else:
                result['message'] = hdr_note

        return result


def main() -> None:
    """Command-line smoke test: print the quality analysis of one file."""
    import sys

    logging.basicConfig(level=logging.INFO)
    checker = QualityChecker()

    if len(sys.argv) > 1:
        filepath = Path(sys.argv[1])
        quality = checker.analyze_quality(filepath)
        if quality:
            print("\nVideo Quality Analysis:")
            print(f"  Resolution: {quality.resolution[0]}x{quality.resolution[1]}")
            print(f"  Bitrate: {quality.bitrate/1000000:.2f} Mbps")
            print(f"  Codec: {quality.codec}")
            print(f"  FPS: {quality.fps:.2f}")
            print(f"  HDR: {quality.is_hdr}")
            print(f"  Quality Score: {quality.quality_score}/100")
    else:
        print("Usage: python quality_checker.py <video_file>")


if __name__ == '__main__':
    main()