#!/usr/bin/env python3 """ analyze_triple_cve.py - Analyze Triple CVE Covert Channel Traffic This script analyzes pcap files captured from the triple CVE covert channel and generates detailed visualizations of: - CVE-2023-1206: IPv6 hash collision trigger bursts - CVE-2025-40040: KSM timing-based key agreement sync - Phase 3 signals for CVE-2024-49882 hugepage transfer Usage: python3 analyze_triple_cve.py capture.pcap python3 analyze_triple_cve.py capture.pcap --output analysis.png """ import sys import os import subprocess import argparse from pathlib import Path try: import pandas as pd import matplotlib.pyplot as plt import matplotlib.patches as mpatches import numpy as np except ImportError as e: print(f"Missing dependency: {e}") print("Install with: pip3 install pandas matplotlib numpy") sys.exit(1) # Magic numbers (little-endian in packet data) TRIGGER_MAGIC = ['0612feca', 'cafe1206'] # 0xCAFE1206 SYNC_MAGIC = ['4000adde', 'dead0040'] # 0xDEAD0040 MSG_MAGIC = ['3713dec0', 'c0de1337'] # 0xC0DE1337 # Ports TRIGGER_PORT = 31336 SYNC_PORT = 31337 def extract_packets_from_pcap(pcap_file): """Extract packet data from pcap using tshark.""" cmd = [ 'tshark', '-r', pcap_file, '-T', 'fields', '-e', 'frame.number', '-e', 'frame.time_relative', '-e', 'frame.time_delta', '-e', 'ip.src', '-e', 'ip.dst', '-e', 'udp.srcport', '-e', 'udp.dstport', '-e', 'udp.length', '-e', 'data.data', '-E', 'header=y', '-E', 'separator=,' ] try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) # Parse CSV output lines = result.stdout.strip().split('\n') if len(lines) < 2: return pd.DataFrame() # Create DataFrame import io df = pd.read_csv(io.StringIO(result.stdout)) df.columns = ['frame_num', 'time_rel', 'time_delta', 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'length', 'data'] return df except subprocess.CalledProcessError as e: print(f"tshark error: {e.stderr}") return pd.DataFrame() except FileNotFoundError: print("tshark not found. Install with: sudo apt install wireshark-cli") sys.exit(1) def classify_packet(row): """Classify packet based on magic numbers and ports.""" data = str(row.get('data', '')).lower().replace(':', '') dst_port = row.get('dst_port', 0) # Check magic numbers for magic in TRIGGER_MAGIC: if magic in data: return 'TRIGGER', 'CVE-2023-1206' for magic in SYNC_MAGIC: if magic in data: # Check if it's key agreement (rounds 0-255) or phase3 signal (0xFF) return 'SYNC', 'Key Agreement' for magic in MSG_MAGIC: if magic in data: return 'MESSAGE', 'CVE-2024-49882' # Fallback to port-based classification if dst_port == TRIGGER_PORT: return 'TRIGGER', 'CVE-2023-1206' elif dst_port == SYNC_PORT: return 'SYNC', 'Key Agreement' return 'OTHER', 'Unknown' def extract_sync_round(row): """Extract round number from sync packet data.""" data = str(row.get('data', '')).lower().replace(':', '') # Sync packet format: magic(4) + round(1) + value(1) # In hex: 8 chars magic + 2 chars round + 2 chars value for magic in SYNC_MAGIC: idx = data.find(magic) if idx >= 0: try: round_hex = data[idx + 8:idx + 10] return int(round_hex, 16) except: pass return -1 def analyze_phases(df): """Analyze the three phases of communication.""" phases = {} # Phase 1: Trigger detection trigger_df = df[df['type'] == 'TRIGGER'] if len(trigger_df) > 0: phases['trigger'] = { 'start': trigger_df['time_rel'].min(), 'end': trigger_df['time_rel'].max(), 'count': len(trigger_df), 'duration_ms': (trigger_df['time_rel'].max() - trigger_df['time_rel'].min()) * 1000, 'rate_pps': len(trigger_df) / max(trigger_df['time_rel'].max() - trigger_df['time_rel'].min(), 0.001) } # Phase 2: Key agreement sync_df = df[df['type'] == 'SYNC'] if len(sync_df) > 0: phases['key_agreement'] = { 'start': sync_df['time_rel'].min(), 'end': sync_df['time_rel'].max(), 'count': len(sync_df), 'duration_s': sync_df['time_rel'].max() - sync_df['time_rel'].min(), 'rounds_detected': sync_df['round'].nunique() if 'round' in sync_df else 0 } return phases def plot_analysis(df, output_file, phases): """Generate comprehensive visualization.""" fig = plt.figure(figsize=(18, 14)) fig.suptitle('Triple CVE Covert Channel Analysis\n' + 'CVE-2023-1206 (Trigger) → CVE-2025-40040 (Key Agreement) → CVE-2024-49882 (Data)', fontsize=14, fontweight='bold') colors = { 'TRIGGER': '#FF4444', 'SYNC': '#4444FF', 'MESSAGE': '#44AA44', 'OTHER': '#888888' } # 1. Full timeline ax1 = fig.add_subplot(4, 2, 1) for ptype in df['type'].unique(): mask = df['type'] == ptype y_val = {'TRIGGER': 3, 'SYNC': 2, 'MESSAGE': 1, 'OTHER': 0}.get(ptype, 0) ax1.scatter(df[mask]['time_rel'], [y_val] * sum(mask), c=colors.get(ptype, '#888888'), label=ptype, alpha=0.6, s=30) ax1.set_xlabel('Time (seconds)') ax1.set_ylabel('Packet Type') ax1.set_yticks([0, 1, 2, 3]) ax1.set_yticklabels(['Other', 'Message', 'Sync', 'Trigger']) ax1.set_title('Complete Packet Timeline') ax1.legend(loc='upper right') ax1.grid(True, alpha=0.3) # Add phase markers if 'trigger' in phases: ax1.axvspan(phases['trigger']['start'], phases['trigger']['end'], alpha=0.2, color='red', label='Phase 1') if 'key_agreement' in phases: ax1.axvspan(phases['key_agreement']['start'], phases['key_agreement']['end'], alpha=0.2, color='blue', label='Phase 2') # 2. Packet rate histogram ax2 = fig.add_subplot(4, 2, 2) max_time = df['time_rel'].max() bins = np.arange(0, max_time + 0.05, 0.05) # 50ms bins for ptype in ['TRIGGER', 'SYNC']: mask = df['type'] == ptype if sum(mask) > 0: ax2.hist(df[mask]['time_rel'], bins=bins, alpha=0.7, label=ptype, color=colors.get(ptype)) ax2.set_xlabel('Time (seconds)') ax2.set_ylabel('Packets per 50ms') ax2.set_title('Packet Rate - CVE-2023-1206 Burst Detection') ax2.legend() ax2.grid(True, alpha=0.3) # 3. Trigger burst detail ax3 = fig.add_subplot(4, 2, 3) trigger_df = df[df['type'] == 'TRIGGER'] if len(trigger_df) > 1: times = trigger_df['time_rel'].values # Zoom to trigger period ax3.scatter(range(len(times)), (times - times.min()) * 1000, c='red', s=10, alpha=0.6) ax3.set_xlabel('Packet Number') ax3.set_ylabel('Time from first trigger (ms)') ax3.set_title(f'CVE-2023-1206 Trigger Burst Detail ({len(trigger_df)} packets)') # Add burst rate annotation if len(times) > 1: duration = times.max() - times.min() rate = len(times) / duration if duration > 0 else 0 ax3.annotate(f'Rate: {rate:.0f} pkt/s\nDuration: {duration*1000:.1f}ms', xy=(0.95, 0.95), xycoords='axes fraction', ha='right', va='top', fontsize=10, bbox=dict(boxstyle='round', facecolor='wheat')) else: ax3.text(0.5, 0.5, 'No trigger packets', ha='center', va='center') ax3.grid(True, alpha=0.3) # 4. Trigger inter-arrival times ax4 = fig.add_subplot(4, 2, 4) if len(trigger_df) > 1: times = trigger_df['time_rel'].values inter_arrival = np.diff(times) * 1000000 # microseconds ax4.hist(inter_arrival, bins=50, color='red', alpha=0.7, edgecolor='darkred') ax4.set_xlabel('Inter-arrival Time (μs)') ax4.set_ylabel('Count') ax4.set_title('Trigger Packet Inter-arrival Distribution') ax4.axvline(np.median(inter_arrival), color='black', linestyle='--', label=f'Median: {np.median(inter_arrival):.1f}μs') ax4.legend() ax4.grid(True, alpha=0.3) # 5. Key agreement sync timeline ax5 = fig.add_subplot(4, 2, 5) sync_df = df[df['type'] == 'SYNC'] if len(sync_df) > 0: times = sync_df['time_rel'].values rounds = sync_df['round'].values if 'round' in sync_df else range(len(times)) ax5.scatter(times, rounds, c='blue', s=15, alpha=0.6) ax5.set_xlabel('Time (seconds)') ax5.set_ylabel('Round Number') ax5.set_title(f'Key Agreement Sync Packets ({len(sync_df)} packets)') ax5.set_ylim(-5, 260) ax5.axhline(255, color='gray', linestyle='--', alpha=0.5, label='Round 255') else: ax5.text(0.5, 0.5, 'No sync packets', ha='center', va='center') ax5.grid(True, alpha=0.3) # 6. Sync inter-arrival times ax6 = fig.add_subplot(4, 2, 6) if len(sync_df) > 1: times = sync_df['time_rel'].values inter_arrival = np.diff(times) * 1000 # milliseconds ax6.plot(range(len(inter_arrival)), inter_arrival, 'b-', alpha=0.5, linewidth=0.5) ax6.scatter(range(len(inter_arrival)), inter_arrival, c='blue', s=5, alpha=0.5) ax6.set_xlabel('Packet Number') ax6.set_ylabel('Inter-arrival Time (ms)') ax6.set_title('Key Agreement Timing (CVE-2025-40040 rounds)') ax6.axhline(30, color='red', linestyle='--', alpha=0.5, label='KSM wait (30ms)') ax6.legend() ax6.grid(True, alpha=0.3) # 7. Bidirectional flow ax7 = fig.add_subplot(4, 2, 7) # Determine direction src_ips = df['src_ip'].unique() if len(src_ips) >= 2: ip1, ip2 = sorted(src_ips)[:2] mask1 = df['src_ip'] == ip1 mask2 = df['src_ip'] == ip2 ax7.scatter(df[mask1]['time_rel'], [1] * sum(mask1), c=df[mask1]['type'].map(colors), s=20, alpha=0.6, label=f'From {ip1}') ax7.scatter(df[mask2]['time_rel'], [2] * sum(mask2), c=df[mask2]['type'].map(colors), s=20, alpha=0.6, label=f'From {ip2}') ax7.set_xlabel('Time (seconds)') ax7.set_ylabel('Direction') ax7.set_yticks([1, 2]) ax7.set_yticklabels([ip1, ip2]) ax7.set_title('Bidirectional Communication Flow') ax7.legend(loc='upper right') else: ax7.text(0.5, 0.5, 'Single direction only', ha='center', va='center') ax7.grid(True, alpha=0.3) # 8. Statistics ax8 = fig.add_subplot(4, 2, 8) ax8.axis('off') stats = f""" ╔══════════════════════════════════════════════════════════╗ ║ ANALYSIS SUMMARY ║ ╠══════════════════════════════════════════════════════════╣ ║ Total Packets: {len(df):<43}║ ║ Duration: {df['time_rel'].max():.2f} seconds{' '*35}║ ╠══════════════════════════════════════════════════════════╣ ║ PHASE 1: CVE-2023-1206 (IPv6 Hash Collision Trigger) ║ ║ ────────────────────────────────────────────────────── ║""" if 'trigger' in phases: p = phases['trigger'] stats += f""" ║ Packets: {p['count']:<46}║ ║ Duration: {p['duration_ms']:.1f}ms{' '*40}║ ║ Rate: {p['rate_pps']:.0f} packets/second{' '*33}║""" else: stats += f""" ║ No trigger packets detected{' '*27}║""" stats += f""" ╠══════════════════════════════════════════════════════════╣ ║ PHASE 2: CVE-2025-40040 (KSM Timing Key Agreement) ║ ║ ────────────────────────────────────────────────────── ║""" if 'key_agreement' in phases: p = phases['key_agreement'] stats += f""" ║ Sync Packets: {p['count']:<41}║ ║ Duration: {p['duration_s']:.1f}s{' '*41}║ ║ Expected: ~512 (256 rounds × 2 parties){' '*16}║""" else: stats += f""" ║ No sync packets detected{' '*30}║""" stats += f""" ╠══════════════════════════════════════════════════════════╣ ║ PHASE 3: CVE-2024-49882 (Hugepage Data Transfer) ║ ║ ────────────────────────────────────────────────────── ║ ║ (Data transfer via memory, minimal network traffic) ║ ╚══════════════════════════════════════════════════════════╝ """ ax8.text(0.02, 0.98, stats, transform=ax8.transAxes, fontfamily='monospace', fontsize=9, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.8)) plt.tight_layout() plt.savefig(output_file, dpi=150, bbox_inches='tight') print(f"\nPlot saved to: {output_file}") return fig def main(): parser = argparse.ArgumentParser( description='Analyze Triple CVE Covert Channel Traffic', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python3 analyze_triple_cve.py capture.pcap python3 analyze_triple_cve.py capture.pcap --output report.png python3 analyze_triple_cve.py capture.pcap --show """ ) parser.add_argument('pcap', help='Input pcap file') parser.add_argument('--output', '-o', help='Output plot file (default: analysis_.png)') parser.add_argument('--show', '-s', action='store_true', help='Show plot interactively') parser.add_argument('--csv', '-c', help='Export packet data to CSV') args = parser.parse_args() if not os.path.exists(args.pcap): print(f"Error: File not found: {args.pcap}") sys.exit(1) print(f"Analyzing: {args.pcap}") print("=" * 60) # Extract packets print("Extracting packets with tshark...") df = extract_packets_from_pcap(args.pcap) if len(df) == 0: print("No packets found in capture!") sys.exit(1) print(f"Found {len(df)} packets") # Classify packets print("Classifying packets...") df[['type', 'cve']] = df.apply(lambda row: pd.Series(classify_packet(row)), axis=1) df['round'] = df.apply(extract_sync_round, axis=1) # Print summary print("\nPacket breakdown:") for ptype in df['type'].unique(): count = sum(df['type'] == ptype) print(f" {ptype}: {count}") # Analyze phases print("\nAnalyzing phases...") phases = analyze_phases(df) # Export CSV if requested if args.csv: df.to_csv(args.csv, index=False) print(f"\nCSV exported to: {args.csv}") # Generate plot output_file = args.output or f"analysis_{Path(args.pcap).stem}.png" plot_analysis(df, output_file, phases) if args.show: plt.show() if __name__ == '__main__': main()