#!/usr/bin/env python3
"""
Media Integrity Verification Script - Azure Edition
Verifies audio files haven't been tampered with by comparing local vs Azure downloads
"""
import os
import sys
import json
import time
import hashlib
import subprocess
import tempfile
import shutil
import urllib.parse
from datetime import datetime
from azure.identity import AzureCliCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.resource import ResourceManagementClient
class MediaIntegrityChecker:
    """Verifies media integrity by downloading the same URL locally and on a
    fresh Azure VM, then comparing checksums of the two copies."""

    def check_dependency_versions(self):
        """Print versions of key dependencies to verify alignment with Azure VM"""
        print("\n๐ง Dependency Versions Check (Local):\n")
        # Tool name -> command line used to query its version.
        commands = {
            "yt-dlp": ["yt-dlp", "--version"],
            "ffmpeg": ["ffmpeg", "-version"],
            "ffprobe": ["ffprobe", "-version"],
            "python3": ["python3", "--version"],
            "system": ["uname", "-a"]
        }
        for name, cmd in commands.items():
            try:
                result = subprocess.run(cmd, capture_output=True, text=True, check=True)
                print(f"{name}:\n{result.stdout.strip()}\n")
            except Exception as e:
                # Missing binary or non-zero exit: report and keep checking the rest.
                print(f"{name}: โ Failed to retrieve version ({e})\n")
def init(self):
self.results = {
'timestamp': datetime.now().isoformat(),
'checks': []
}
self.temp_dir = tempfile.mkdtemp()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 | # Persistent archive of local downloads
self.archive_dir = os.path.expanduser('~/IntegrityCheckerArchive')
os.makedirs(self.archive_dir, exist_ok=True)
# Azure Configuration
self.location = 'eastus'
self.resource_group_base = 'media-checker-rg'
# Get Azure subscription
self.setup_azure_credentials()
    def setup_azure_credentials(self):
        """Ensure Azure CLI is logged in and get subscription"""
        try:
            # Test Azure CLI login
            result = subprocess.run(
                ['az', 'account', 'show'],
                capture_output=True,
                text=True
            )
            if result.returncode != 0:
                # Not logged in: tell the user what to do and abort.
                print("โ Azure CLI not logged in")
                print("Please run: az login")
                sys.exit(1)
            # `az account show` prints JSON describing the active subscription.
            account_info = json.loads(result.stdout)
            self.subscription_id = account_info['id']
            print(f"โ Using Azure subscription: {account_info['name']}")
            # Initialize Azure clients
            # All three management clients share the CLI-derived credential.
            self.credential = AzureCliCredential()
            self.resource_client = ResourceManagementClient(
                self.credential, self.subscription_id
            )
            self.compute_client = ComputeManagementClient(
                self.credential, self.subscription_id
            )
            self.network_client = NetworkManagementClient(
                self.credential, self.subscription_id
            )
        except Exception as e:
            # Any setup failure (CLI missing, bad JSON, SDK error) is fatal.
            print(f"โ Azure setup failed: {str(e)}")
            print("\nPlease ensure:")
            print("1. Azure CLI is installed: curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash")
            print("2. You're logged in: az login")
            sys.exit(1)
# ==== Added: Torsocks detection helper ====
@staticmethod
def _needs_torsocks(url: str) -> bool:
"""Return True if URL likely points to an Invidious instance or .onion."""
try:
clean = MediaIntegrityChecker._clean_url(url)
host = urllib.parse.urlparse(clean).netloc.lower()
if not host:
return False
if host.endswith('.onion'):
return True
# Heuristics for invidious instances
if 'invidious' in host:
return True
if host in {'yewtu.be', 'vid.puffyan.us'}:
return True
return False
except Exception:
return False
# ==========================================
def calculate_checksums(self, filepath):
"""Calculate multiple checksums for a file, after stripping audio metadata and logging environment versions"""
# Log environment versions for reproducibility
version_log = os.path.join(self.temp_dir, 'env_versions.log')
with open(version_log, 'w') as f:
subprocess.run(['yt-dlp', '--version'], stdout=f, stderr=subprocess.DEVNULL)
subprocess.run(['ffmpeg', '-version'], stdout=f, stderr=subprocess.DEVNULL)
subprocess.run(['python3', '--version'], stdout=f, stderr=subprocess.DEVNULL)
subprocess.run(['uname', '-a'], stdout=f, stderr=subprocess.DEVNULL)
# Strip audio metadata before hashing
stripped_path = filepath.replace('.mp3', '_stripped.mp3')
subprocess.run([
'ffmpeg', '-y', '-i', filepath,
'-map_metadata', '-1', '-c', 'copy',
stripped_path
], check=True)
filepath = stripped_path
checksums = {}
algorithms = ['md5', 'sha256', 'sha512']
for algo in algorithms:
hash_func = hashlib.new(algo)
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_func.update(chunk)
checksums[algo] = hash_func.hexdigest()
# Get file size
checksums['size'] = os.path.getsize(filepath)
# Get audio metadata
try:
result = subprocess.run([
'ffprobe', '-v', 'quiet', '-print_format', 'json',
'-show_format', filepath
], capture_output=True, text=True)
if result.returncode == 0:
metadata = json.loads(result.stdout)
format_info = metadata.get('format', {})
checksums['duration'] = format_info.get('duration', 'unknown')
checksums['bitrate'] = format_info.get('bit_rate', 'unknown')
except:
pass
return checksums
@staticmethod
def _clean_url(url: str) -> str:
"""Normalize URLs that were passed with escaped ? and = (e.g. \\?v\\=id)."""
# Drop backslashes and decode accidental % encodings
url = url.replace('\\?', '?').replace('\\=', '=').replace('\\&', '&').replace('\\', '')
return urllib.parse.unquote(url).strip()
def download_local(self, url):
"""Download media locally as MP3 using yt-dlp with cookies.txt (optionally via torsocks for Invidious)."""
output_path = os.path.join(self.temp_dir, 'local_%(title)s.%(ext)s')
try:
clean_url = self._clean_url(url)
cmd = []
if self._needs_torsocks(clean_url):
cmd.append('torsocks')
cmd += [
'yt-dlp',
'--cookies', 'cookies.txt',
'-x',
'--audio-format', 'mp3',
'-o', output_path,
clean_url
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Find the downloaded file
for file in os.listdir(self.temp_dir):
if file.startswith('local_') and file.endswith('.mp3'):
return os.path.join(self.temp_dir, file)
return None
except subprocess.CalledProcessError as e:
print(f"Local download failed: {e.stderr}")
return None
    def archive_local_copy(self, local_file_path: str, source_url: str, checksums: dict):
        """Persistently archive the local file and write a small metadata sidecar."""
        try:
            # UTC timestamp prefix keeps archive names unique and sortable.
            ts = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
            base = os.path.basename(local_file_path)
            archive_name = f"{ts}_{base}"
            archive_path = os.path.join(self.archive_dir, archive_name)
            shutil.copy2(local_file_path, archive_path)
            # Sidecar .meta file records provenance and checksums next to the copy.
            meta_path = os.path.join(self.archive_dir, f"{ts}_{os.path.splitext(base)[0]}.meta")
            with open(meta_path, 'w') as mf:
                mf.write(f"URL: {source_url}\n")
                mf.write(f"Timestamp (UTC): {ts}\n")
                for k in ['md5', 'sha256', 'sha512', 'size', 'duration', 'bitrate']:
                    if k in checksums:
                        mf.write(f"{k.upper()}: {checksums[k]}\n")
            print(f"๐ Archived local copy: {archive_path}")
            print(f"๐ Metadata: {meta_path}")
        except Exception as e:
            # Archiving is best-effort; never abort verification over it.
            print(f"โ ๏ธ Failed to archive local copy: {e}")
def setup_azure_vm(self):
"""Create a simple Azure VM for media checking"""
# Generate unique names
timestamp = int(time.time())
rg_name = f"{self.resource_group_base}-{timestamp}"
vm_name = f"vm{timestamp}"
print(f" Creating resource group: {rg_name}")
try:
# Create resource group
self.resource_client.resource_groups.create_or_update(
rg_name,
{"location": self.location}
)
# Create virtual network
vnet_name = f"vnet-{vm_name}"
subnet_name = "default"
vnet_params = {
"location": self.location,
"address_space": {"address_prefixes": ["10.0.0.0/16"]}
}
vnet_creation = self.network_client.virtual_networks.begin_create_or_update(
rg_name, vnet_name, vnet_params
)
vnet = vnet_creation.result()
# Create subnet
subnet_params = {"address_prefix": "10.0.0.0/24"}
subnet_creation = self.network_client.subnets.begin_create_or_update(
rg_name, vnet_name, subnet_name, subnet_params
)
subnet = subnet_creation.result()
# Create public IP
public_ip_name = f"ip-{vm_name}"
public_ip_params = {
"location": self.location,
"public_ip_allocation_method": "Static",
"sku": {"name": "Standard"}
}
ip_creation = self.network_client.public_ip_addresses.begin_create_or_update(
rg_name, public_ip_name, public_ip_params
)
public_ip = ip_creation.result()
# Create network security group with SSH access
nsg_name = f"nsg-{vm_name}"
nsg_params = {
"location": self.location,
"security_rules": [{
"name": "SSH",
"priority": 300,
"protocol": "Tcp",
"access": "Allow",
"direction": "Inbound",
"source_address_prefix": "*",
"source_port_range": "*",
"destination_address_prefix": "*",
"destination_port_range": "22"
}]
}
nsg_creation = self.network_client.network_security_groups.begin_create_or_update(
rg_name, nsg_name, nsg_params
)
nsg = nsg_creation.result()
# Create network interface
nic_name = f"nic-{vm_name}"
nic_params = {
"location": self.location,
"ip_configurations": [{
"name": "ipconfig1",
"subnet": {"id": subnet.id},
"public_ip_address": {"id": public_ip.id}
}],
"network_security_group": {"id": nsg.id}
}
nic_creation = self.network_client.network_interfaces.begin_create_or_update(
rg_name, nic_name, nic_params
)
nic = nic_creation.result()
# Create VM
print(f" Creating VM: {vm_name}")
# VM creation script
# ==== Modified: install tor + torsocks and wait for Tor ====
custom_data = """#!/bin/bash
|
set -euxo pipefail
Small retry helper for apt
apt_retry() {
for i in $(seq 1 10); do
DEBIAN_FRONTEND=noninteractive apt-get -y "$@" && return 0
sleep 5
done
return 1
}
Refresh and base deps
apt-get update || true
apt_retry update
apt_retry install ca-certificates curl wget ffmpeg tor torsocks python3-pip software-properties-common iproute2
Ensure Python >= 3.9 on Ubuntu 20.04 (Focal) via deadsnakes; Jammy+ already has 3.10
. /etc/os-release
if [ "${VERSION_ID:-}" = "20.04" ]; then
add-apt-repository -y ppa:deadsnakes/ppa
apt_retry update
apt_retry install python3.10 python3.10-venv python3.10-distutils
update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.10 2
update-alternatives --set python3 /usr/bin/python3.10
fi
Enable Tor and wait for SOCKS (9050) to come up
systemctl enable --now tor || true
for i in $(seq 1 60); do
if ss -lnt | grep -q ':9050 '; then
break
fi
sleep 2
done
Install yt-dlp (standalone binary; no external Python needed)
curl -L https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp -o /usr/local/bin/yt-dlp
chmod a+rx /usr/local/bin/yt-dlp
Sanity checks for logs
command -v yt-dlp
/usr/local/bin/yt-dlp --version || true
command -v torsocks
torsocks true || true
python3 --version || true
Signal completion
touch /tmp/setup_complete
"""
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 | # ===========================================================
import base64
custom_data_encoded = base64.b64encode(custom_data.encode()).decode()
vm_params = {
"location": self.location,
"storage_profile": {
"image_reference": {
"publisher": "Canonical",
"offer": "0001-com-ubuntu-server-focal",
"sku": "20_04-lts-gen2",
"version": "latest"
}
},
"hardware_profile": {"vm_size": "Standard_B1s"},
"os_profile": {
"computer_name": vm_name,
"admin_username": "azure_user",
"linux_configuration": {
"disable_password_authentication": True,
"ssh": {
"public_keys": [{
"path": "/home/azureuser/.ssh/authorized_keys",
"key_data": "azure_password"
}]
}
},
"custom_data": custom_data_encoded
},
"network_profile": {
"network_interfaces": [{"id": nic.id}]
}
}
vm_creation = self.compute_client.virtual_machines.begin_create_or_update(
rg_name, vm_name, vm_params
)
vm = vm_creation.result()
# Get public IP address
public_ip = self.network_client.public_ip_addresses.get(rg_name, public_ip_name)
print(f" VM created with IP: {public_ip.ip_address}")
return {
"rg_name": rg_name,
"vm_name": vm_name,
"public_ip": public_ip.ip_address,
"username": "azureuser"
}
except Exception as e:
print(f" VM creation failed: {str(e)}")
# Try to cleanup
try:
self.resource_client.resource_groups.begin_delete(rg_name)
except:
pass
return None
def get_or_create_ssh_key(self):
"""Get or create SSH key for Azure Media Checker VMs"""
ssh_key_path = os.path.expanduser("~/.ssh/azure_media_checker.pub")
ssh_key_private = os.path.expanduser("~/.ssh/azure_media_checker")
if not os.path.exists(ssh_key_path):
print(" Creating SSH key pair for azure_media_checker...")
subprocess.run([
'ssh-keygen', '-t', 'rsa', '-b', '2048',
'-f', ssh_key_private,
'-N', '', # No passphrase
'-C', 'azure-media-checker'
], check=True)
with open(ssh_key_path, 'r') as f:
return f.read().strip()
    def download_on_vm(self, vm_info, url):
        """Download media on Azure Media Checker VM via SSH and return checksums"""
        # Imported lazily so the module loads even when paramiko is absent
        # and only local checks are needed.
        import paramiko
        max_retries = 5
        retry_delay = 20
        for attempt in range(max_retries):
            try:
                ssh = paramiko.SSHClient()
                # The VM is freshly created, so its host key is unknown; auto-accept.
                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                ssh_key_path = os.path.expanduser("~/.ssh/azure_media_checker")
                ssh.connect(
                    vm_info['public_ip'],
                    username=vm_info['username'],
                    key_filename=ssh_key_path,
                    timeout=30
                )
                # Upload cookies.txt from project root
                sftp = ssh.open_sftp()
                try:
                    sftp.put("cookies.txt", "cookies.txt")
                except Exception as e:
                    # Non-fatal: the download may still work without cookies.
                    print(f" Warning: Failed to upload cookies.txt: {e}")
                sftp.close()
                # Wait for setup to complete
                # (cloud-init touches /tmp/setup_complete when done; poll ~5 min.)
                print(" Waiting for VM setup to complete...")
                for _ in range(30):
                    stdin, stdout, stderr = ssh.exec_command('test -f /tmp/setup_complete && echo "ready"')
                    if stdout.read().decode().strip() == "ready":
                        break
                    time.sleep(10)
                # Download file as MP3 (normalize URL)
                print(f" Downloading on Azure VM...")
                clean_url = self._clean_url(url)
                # Escape single quotes so the URL is safe inside a
                # single-quoted shell argument.
                safe_url = clean_url.replace("'", "'\"'\"'")
                prefix = "torsocks " if self._needs_torsocks(clean_url) else ""
                cmd = (
                    f"{prefix}yt-dlp --cookies cookies.txt -x --audio-format mp3 -o \"audio.%(ext)s\" '{safe_url}'"
                )
                stdin, stdout, stderr = ssh.exec_command(cmd, timeout=300)
                # Block until the remote command finishes.
                exit_status = stdout.channel.recv_exit_status()
                if exit_status != 0:
                    error = stderr.read().decode()
                    print(f" Download failed: {error}")
                    ssh.close()
                    return None
                # Find the downloaded file
                # We'll expect audio.mp3
                remote_file = "audio.mp3"
                # Calculate checksums on VM using the standard coreutils tools.
                checksums = {}
                checksum_cmds = {
                    'md5': f'md5sum {remote_file} | cut -d" " -f1',
                    'sha256': f'sha256sum {remote_file} | cut -d" " -f1',
                    'sha512': f'sha512sum {remote_file} | cut -d" " -f1',
                    'size': f'stat -c%s {remote_file}'
                }
                for name, cmd2 in checksum_cmds.items():
                    stdin, stdout, stderr = ssh.exec_command(cmd2)
                    result = stdout.read().decode().strip()
                    if name == 'size':
                        checksums[name] = int(result)
                    else:
                        checksums[name] = result
                # Get audio metadata (duration/bitrate); best effort.
                stdin, stdout, stderr = ssh.exec_command(
                    f'ffprobe -v quiet -print_format json -show_format {remote_file}'
                )
                try:
                    metadata = json.loads(stdout.read().decode())
                    format_info = metadata.get('format', {})
                    checksums['duration'] = format_info.get('duration', 'unknown')
                    checksums['bitrate'] = format_info.get('bit_rate', 'unknown')
                except:
                    pass
                # Cleanup remote file
                ssh.exec_command(f'rm -f {remote_file}')
                ssh.close()
                return checksums
            except Exception as e:
                # Retry transient SSH/connectivity failures with a fixed delay.
                if attempt < max_retries - 1:
                    print(f" Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...")
                    time.sleep(retry_delay)
                else:
                    print(f" Azure Media Checker VM operation failed after {max_retries} attempts: {str(e)}")
                    return None
def cleanup_azure_vm(self, vm_info):
"""Delete Azure Media Checker resource group and all resources"""
if vm_info and vm_info.get('rg_name'):
try:
print(f" Deleting resource group {vm_info['rg_name']}...")
delete_operation = self.resource_client.resource_groups.begin_delete(
vm_info['rg_name']
)
# Don't wait for completion, it happens in background
print(f" Cleanup initiated for {vm_info['rg_name']}")
except:
pass
    def verify_media(self, url):
        """Main verification process"""
        print(f"\n๐ Verifying: {url}")
        # Result skeleton; 'verdict' is filled in by analyze_results().
        check_result = {
            'url': url,
            'timestamp': datetime.now().isoformat(),
            'local': None,
            'azure': None,
            'verdict': 'UNKNOWN'
        }
        # Download locally
        print("๐ฅ Downloading locally...")
        local_file = self.download_local(url)
        if local_file:
            check_result['local'] = self.calculate_checksums(local_file)
            print(f"โ Local download complete: {os.path.basename(local_file)}")
            # Persist a local archive copy + metadata
            self.archive_local_copy(local_file, self._clean_url(url), check_result['local'])
        else:
            # Without a local copy there is nothing to compare against.
            print("โ Local download failed")
            return check_result
        # Azure Download
        azure_vm = None
        try:
            print("โ๏ธ Setting up Azure VM...")
            azure_vm = self.setup_azure_vm()
            if azure_vm:
                result = self.download_on_vm(azure_vm, url)
                if result:
                    check_result['azure'] = result
                    print("โ Azure download complete")
                else:
                    print("โ Azure download failed")
        finally:
            # Cleanup VM
            # Always tear down cloud resources, even if the download raised.
            print("๐งน Cleaning up Azure VM...")
            self.cleanup_azure_vm(azure_vm)
        # Compare checksums
        check_result['verdict'] = self.analyze_results(check_result)
        self.results['checks'].append(check_result)
        # Save to history
        self.save_check_history(check_result)
        return check_result
def analyze_results(self, check):
"""Compare checksums between local and Azure"""
if not check['local'] or not check['azure']:
return 'โ INSUFFICIENT_DATA - Need both local and Azure downloads'
# Compare checksums
mismatches = []
for algo in ['md5', 'sha256', 'sha512']:
if check['local'].get(algo) != check['azure'].get(algo):
mismatches.append(algo)
# Compare sizes
if check['local'].get('size') != check['azure'].get('size'):
mismatches.append('size')
if mismatches:
return f'โ ๏ธ TAMPERED - Mismatches: {", ".join(mismatches)}'
else:
return 'โ
VERIFIED - All checksums match'
def save_check_history(self, check_result):
"""Save check result to history file"""
history_file = "media_check_history.json"
# Load existing history
history = {"checks": []}
if os.path.exists(history_file):
try:
with open(history_file, 'r') as f:
history = json.load(f)
except:
pass
# Add new check
history["checks"].append(check_result)
# Keep last 1000 checks
history["checks"] = history["checks"][-1000:]
# Save
with open(history_file, 'w') as f:
json.dump(history, f, indent=2)
def generate_report(self):
"""Generate detailed report"""
report = f"""
|
Media Integrity Verification Report
Generated: {self.results['timestamp']}
Method: Local vs Azure VM Comparison
=====================================
Summary:
--------"""
Total URLs checked: {len(self.results['checks'])}
โ
Verified: {verified_count}
โ ๏ธ Tampered: {tampered_count}
โ Failed: {failed_count}
Detailed Results:
-----------------"""
def main():
    """Entry point: print the tool banner."""
    # NOTE(review): the banner characters below appear mojibake-encoded
    # (likely originally Unicode box-drawing glyphs); preserved byte-for-byte
    # since they are runtime output. Confirm intended encoding with the author.
    print("""
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Media Integrity Checker - Azure Edition โ
โ Simple & Effective โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
""")
# Fixed: was `if name == "main"`, which raises NameError on import and never
# runs main(); the standard dunder guard is required.
if __name__ == "__main__":
    main()