---
name: performing-malware-hash-enrichment-with-virustotal
description: Enrich malware file hashes using the VirusTotal API to retrieve detection rates, behavioral analysis, YARA matches, and contextual threat intelligence for incident triage and IOC validation.
domain: cybersecurity
subdomain: threat-intelligence
tags:
  - virustotal
  - malware-analysis
  - hash-enrichment
  - ioc
  - threat-intelligence
  - triage
  - api
  - detection
version: '1.0'
author: mahipal
license: Apache-2.0
nist_csf:
  - ID.RA-01
  - ID.RA-05
  - DE.CM-01
  - DE.AE-02
---

# Performing Malware Hash Enrichment with VirusTotal

## Overview

VirusTotal is the world's largest crowdsourced malware corpus, scanning files with 70+ antivirus engines and providing behavioral analysis, YARA rule matches, network indicators, and community intelligence. This skill covers using the VirusTotal API v3 to enrich file hashes (MD5, SHA-1, SHA-256) with detection verdicts, sandbox reports, related indicators, and contextual intelligence for SOC triage, incident response, and threat intelligence enrichment workflows.

## When to Use

- When triaging alerts or EDR detections that include file hashes needing reputation lookups
- When following incident response procedures that require validating suspected malware samples
- When performing scheduled threat hunting, IOC validation, or security auditing activities
- When enriching threat intelligence feeds with detection and behavioral context

## Prerequisites

- Python 3.9+ with `vt-py` (official VirusTotal Python client) or `requests`
- VirusTotal API key (free tier: 4 requests/minute, 500/day; premium for higher limits)
- Understanding of file hash types: MD5, SHA-1, SHA-256
- Familiarity with AV detection naming conventions
- STIX 2.1 knowledge for IOC representation

## Key Concepts

### VirusTotal API v3

The API provides RESTful endpoints for file reports (`/files/{hash}`), URL scanning, domain reports, IP address intelligence, and advanced hunting with VirusTotal Intelligence (VTI).
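The file-report endpoint can be exercised without the official client. Below is a minimal sketch using `requests` against `/files/{hash}` (the API key is a placeholder, and `detection_ratio` is a small helper introduced here for illustration, not part of the VT API):

```python
import requests


def fetch_file_report(api_key: str, file_hash: str) -> dict:
    """Fetch the raw v3 file report; VT API v3 authenticates via the x-apikey header."""
    resp = requests.get(
        f"https://www.virustotal.com/api/v3/files/{file_hash}",
        headers={"x-apikey": api_key},
        timeout=30,
    )
    resp.raise_for_status()
    # Attributes live under data.attributes in every v3 object response
    return resp.json()["data"]["attributes"]


def detection_ratio(stats: dict) -> str:
    """Render a last_analysis_stats dict as a 'malicious/total' string."""
    return f"{stats.get('malicious', 0)}/{sum(stats.values())}"


if __name__ == "__main__":
    attrs = fetch_file_report(
        "YOUR_VT_API_KEY",
        "275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f")
    print(detection_ratio(attrs["last_analysis_stats"]))
```

The same response shape underpins everything below; the `vt-py` client simply wraps these attributes as Python objects.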
Each file report includes detection results from 70+ AV engines, behavioral analysis from sandboxes, YARA rule matches, Sigma rule matches, file metadata (PE headers, imports, sections), network indicators (contacted IPs, domains, URLs), and community votes and comments.

### Hash Enrichment Workflow

The typical enrichment flow is: receive hash from alert/EDR -> query VT API -> parse detection ratio -> extract behavioral indicators -> correlate with existing intelligence -> make triage decision. The API returns a `last_analysis_stats` object with `malicious`, `suspicious`, `undetected`, and `harmless` counts.

### Pivoting from Hashes

VirusTotal enables pivoting from a single hash to related intelligence: similar files (ITW/in-the-wild samples), contacted domains and IPs (C2 infrastructure), dropped files, embedded URLs, YARA rule matches, and threat actor attribution through crowdsourced intelligence.

## Workflow

### Step 1: Query VirusTotal for Hash Report

```python
import vt
import json
from datetime import datetime


class VTEnricher:
    def __init__(self, api_key):
        self.client = vt.Client(api_key)

    def enrich_hash(self, file_hash):
        """Enrich a file hash with VirusTotal intelligence."""
        try:
            file_obj = self.client.get_object(f"/files/{file_hash}")
            stats = file_obj.last_analysis_stats

            report = {
                "hash": file_hash,
                "sha256": file_obj.sha256,
                "sha1": file_obj.sha1,
                "md5": file_obj.md5,
                "file_type": getattr(file_obj, "type_description", "Unknown"),
                "file_size": getattr(file_obj, "size", 0),
                "first_submission": str(getattr(file_obj, "first_submission_date", "")),
                "last_analysis_date": str(getattr(file_obj, "last_analysis_date", "")),
                "detection_stats": {
                    "malicious": stats.get("malicious", 0),
                    "suspicious": stats.get("suspicious", 0),
                    "undetected": stats.get("undetected", 0),
                    "harmless": stats.get("harmless", 0),
                },
                "detection_ratio": f"{stats.get('malicious', 0)}/{sum(stats.values())}",
                "popular_threat_names": getattr(
                    file_obj, "popular_threat_classification", {}),
                "tags": getattr(file_obj, "tags", []),
                "names": getattr(file_obj, "names", []),
            }

            # Grade severity by the fraction of engines flagging the file
            total_engines = sum(stats.values())
            mal_count = stats.get("malicious", 0)
            report["threat_level"] = (
                "critical" if mal_count > total_engines * 0.7
                else "high" if mal_count > total_engines * 0.4
                else "medium" if mal_count > total_engines * 0.1
                else "low" if mal_count > 0
                else "clean"
            )

            print(f"[+] {file_hash[:16]}... -> {report['detection_ratio']} "
                  f"({report['threat_level'].upper()})")
            return report
        except vt.error.APIError as e:
            print(f"[-] VT API error for {file_hash}: {e}")
            return None

    def get_behavior_report(self, file_hash):
        """Get sandbox behavioral analysis for a file."""
        behavior_data = {
            "processes_created": [],
            "files_written": [],
            "registry_keys_set": [],
            "dns_lookups": [],
            "http_conversations": [],
            "mutexes_created": [],
            "commands_executed": [],
        }
        try:
            # /behaviours is a collection endpoint: iterate one object per sandbox
            for sandbox in self.client.iterator(f"/files/{file_hash}/behaviours"):
                behavior_data["processes_created"].extend(
                    getattr(sandbox, "processes_created", []))
                behavior_data["files_written"].extend(
                    getattr(sandbox, "files_written", []))
                behavior_data["registry_keys_set"].extend(
                    r.get("key", "") for r in getattr(sandbox, "registry_keys_set", []))
                behavior_data["dns_lookups"].extend(
                    d.get("hostname", "") for d in getattr(sandbox, "dns_lookups", []))
                behavior_data["mutexes_created"].extend(
                    getattr(sandbox, "mutexes_created", []))
                behavior_data["commands_executed"].extend(
                    getattr(sandbox, "command_executions", []))
        except vt.error.APIError as e:
            print(f"[-] Behavior report error: {e}")
        return behavior_data

    def close(self):
        self.client.close()


# Usage
enricher = VTEnricher("YOUR_VT_API_KEY")
report = enricher.enrich_hash(
    "275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f")
print(json.dumps(report, indent=2, default=str))
enricher.close()
```

### Step 2: Batch Hash Enrichment with Rate Limiting

```python
import time
import csv


def batch_enrich(api_key, hash_file,
                 output_file, rate_limit=4):
    """Enrich a list of hashes from a file with rate limiting."""
    enricher = VTEnricher(api_key)
    results = []

    with open(hash_file, "r") as f:
        hashes = [line.strip() for line in f if line.strip()]

    print(f"[*] Enriching {len(hashes)} hashes (rate: {rate_limit}/min)")

    for i, file_hash in enumerate(hashes):
        report = enricher.enrich_hash(file_hash)
        if report:
            results.append(report)
        # Free-tier quota: pause a full minute after each batch of rate_limit requests
        if (i + 1) % rate_limit == 0:
            print(f"    [{i+1}/{len(hashes)}] Rate limit pause (60s)...")
            time.sleep(60)

    # Export to CSV
    with open(output_file, "w", newline="") as f:
        if results:
            writer = csv.DictWriter(f, fieldnames=results[0].keys())
            writer.writeheader()
            for r in results:
                flat = {k: str(v) for k, v in r.items()}
                writer.writerow(flat)

    print(f"[+] Enrichment complete: {len(results)}/{len(hashes)} hashes")
    print(f"[+] Results saved to {output_file}")
    enricher.close()
    return results


batch_enrich("YOUR_API_KEY", "hashes.txt", "enrichment_results.csv")
```

### Step 3: Extract Network Indicators for Pivoting

```python
def extract_network_iocs(api_key, file_hash):
    """Extract network-based IOCs from VT for C2 identification."""
    client = vt.Client(api_key)
    network_iocs = {
        "contacted_ips": [],
        "contacted_domains": [],
        "contacted_urls": [],
        "embedded_urls": [],
    }
    try:
        # Get contacted IPs
        for ip_obj in client.iterator(f"/files/{file_hash}/contacted_ips"):
            network_iocs["contacted_ips"].append({
                "ip": ip_obj.id,
                "country": getattr(ip_obj, "country", ""),
                "asn": getattr(ip_obj, "asn", 0),
                "as_owner": getattr(ip_obj, "as_owner", ""),
            })

        # Get contacted domains
        for domain_obj in client.iterator(f"/files/{file_hash}/contacted_domains"):
            network_iocs["contacted_domains"].append({
                "domain": domain_obj.id,
                "registrar": getattr(domain_obj, "registrar", ""),
                "creation_date": str(getattr(domain_obj, "creation_date", "")),
            })

        # Get contacted URLs
        for url_obj in client.iterator(f"/files/{file_hash}/contacted_urls"):
            network_iocs["contacted_urls"].append({
                "url": url_obj.url,
                "last_http_response_code": getattr(
                    url_obj, "last_http_response_code", 0),
            })
    except Exception as e:
        print(f"[-] Error extracting network IOCs: {e}")
    finally:
        client.close()

    print(f"[+] Network IOCs: {len(network_iocs['contacted_ips'])} IPs, "
          f"{len(network_iocs['contacted_domains'])} domains, "
          f"{len(network_iocs['contacted_urls'])} URLs")
    return network_iocs
```

### Step 4: YARA Rule Matching and Threat Classification

```python
def get_yara_matches(api_key, file_hash):
    """Retrieve YARA rule matches for threat classification."""
    client = vt.Client(api_key)
    try:
        file_obj = client.get_object(f"/files/{file_hash}")
        crowdsourced_yara = getattr(file_obj, "crowdsourced_yara_results", [])

        matches = []
        for rule in crowdsourced_yara:
            matches.append({
                "rule_name": rule.get("rule_name", ""),
                "ruleset_name": rule.get("ruleset_name", ""),
                "author": rule.get("author", ""),
                "description": rule.get("description", ""),
                "source": rule.get("source", ""),
            })

        # Classify based on keywords in matched rule names
        classifications = set()
        for m in matches:
            rule_lower = m["rule_name"].lower()
            if any(k in rule_lower for k in ["apt", "nation", "state"]):
                classifications.add("apt")
            if any(k in rule_lower for k in ["ransom", "crypto"]):
                classifications.add("ransomware")
            if any(k in rule_lower for k in ["trojan", "rat", "backdoor"]):
                classifications.add("trojan")
            if any(k in rule_lower for k in ["loader", "dropper"]):
                classifications.add("loader")

        print(f"[+] YARA: {len(matches)} rules matched")
        print(f"[+] Classifications: {classifications or {'unclassified'}}")
        return {"matches": matches, "classifications": list(classifications)}
    finally:
        client.close()
```

### Step 5: Generate Enrichment Report

```python
def generate_enrichment_report(hash_report, behavior, network, yara_data):
    """Generate comprehensive enrichment report."""
    report = {
        "metadata": {
            "generated": datetime.now().isoformat(),
            "hash": hash_report.get("sha256", ""),
        },
        "verdict": {
            "threat_level":
                hash_report.get("threat_level", "unknown"),
            "detection_ratio": hash_report.get("detection_ratio", "0/0"),
            "classifications": yara_data.get("classifications", []),
            "threat_names": hash_report.get("popular_threat_names", {}),
        },
        "behavioral_indicators": {
            "processes": behavior.get("processes_created", [])[:10],
            "dns_queries": behavior.get("dns_lookups", [])[:10],
            "commands": behavior.get("commands_executed", [])[:10],
        },
        "network_indicators": {
            "c2_candidates": network.get("contacted_ips", [])[:10],
            "domains": network.get("contacted_domains", [])[:10],
        },
        "yara_matches": yara_data.get("matches", [])[:10],
        "recommendation": (
            "BLOCK and investigate"
            if hash_report.get("threat_level") in ("critical", "high")
            else "Monitor and analyze"
            if hash_report.get("threat_level") == "medium"
            else "Low risk - continue monitoring"
        ),
    }

    with open(f"enrichment_{hash_report.get('sha256', 'unknown')[:16]}.json", "w") as f:
        json.dump(report, f, indent=2, default=str)

    return report
```

## Validation Criteria

- VT API v3 queried successfully with proper authentication
- File hash enriched with detection stats, behavioral data, and network indicators
- Batch enrichment handles rate limiting correctly
- Network IOCs extracted for C2 identification
- YARA matches retrieved and used for classification
- Enrichment report generated with actionable verdict

## References

- [VirusTotal API v3 Documentation](https://docs.virustotal.com/reference/overview)
- [vt-py Official Python Client](https://github.com/VirusTotal/vt-py)
- [VirusTotal Intelligence](https://www.virustotal.com/gui/intelligence-overview)
- [Torq: VT Hash Enrichment Workflow](https://kb.torq.io/en/articles/9350251-virustotal-file-hash-enrichment-with-cache-workflow-template)
- [Dynatrace: Enrich Observables with VT](https://www.dynatrace.com/news/blog/enrich-observables-with-virustotal-threat-intelligence/)
- [Penligent: VT in Incident Response](https://www.penligent.ai/hackinglabs/virustotal-in-incident-response-how-to-identify-malware-fast-and-pivot-without-leaking-data/)