--- name: building-threat-intelligence-feed-integration description: 'Builds automated threat intelligence feed integration pipelines connecting STIX/TAXII feeds, open-source threat intel, and commercial TI platforms into SIEM and security tools for real-time IOC matching and alerting. Use when SOC teams need to operationalize threat intelligence by automating feed ingestion, normalization, scoring, and distribution to detection systems. ' domain: cybersecurity subdomain: soc-operations tags: - soc - threat-intelligence - stix - taxii - misp - feeds - ioc - siem-integration version: '1.0' author: mahipal license: Apache-2.0 nist_csf: - DE.CM-01 - DE.AE-02 - RS.MA-01 - DE.AE-06 --- # Building Threat Intelligence Feed Integration ## When to Use Use this skill when: - SOC teams need automated ingestion of threat intelligence feeds into SIEM platforms - Multiple TI sources require normalization into a common format (STIX 2.1) - Detection systems need real-time IOC matching against network and endpoint telemetry - TI feed quality assessment and deduplication processes need to be established **Do not use** for manual IOC lookup — use dedicated enrichment tools (VirusTotal, AbuseIPDB) for ad-hoc queries. ## Prerequisites - MISP instance or Threat Intelligence Platform (TIP) for feed aggregation - STIX/TAXII client library (`taxii2-client`, `stix2` Python packages) - SIEM platform (Splunk ES, Elastic Security, or Sentinel) with TI framework configured - API keys for commercial and open-source feeds (AlienVault OTX, Abuse.ch, CISA AIS) - Python 3.8+ for feed processing automation ## Workflow ### Step 1: Identify and Catalog Intelligence Sources Map available feeds by type, format, and update frequency: | Feed Source | Format | IOC Types | Update Freq | Cost | |-------------|--------|-----------|-------------|------| | AlienVault OTX | STIX/JSON | IP, Domain, Hash, URL | Real-time | Free | | Abuse.ch URLhaus | CSV/JSON | URL, Domain | Every 5 min | Free | | Abuse.ch MalwareBazaar | JSON API | File Hash | Real-time | Free | | CISA AIS | STIX/TAXII 2.1 | All types | Daily | Free (US Gov) | | CrowdStrike Intel | STIX/JSON | All types + Actor TTP | Real-time | Commercial | | Mandiant Advantage | STIX 2.1 | All types + Reports | Real-time | Commercial | ### Step 2: Ingest STIX/TAXII Feeds Connect to a TAXII 2.1 server and download indicators: ```python from taxii2client.v21 import Server, Collection from stix2 import parse # Connect to TAXII server (example: CISA AIS) server = Server( "https://taxii.cisa.gov/taxii2/", user="your_username", password="your_password" ) # List available collections for api_root in server.api_roots: print(f"API Root: {api_root.title}") for collection in api_root.collections: print(f" Collection: {collection.title} (ID: {collection.id})") # Fetch indicators from a collection collection = Collection( "https://taxii.cisa.gov/taxii2/collections/COLLECTION_ID/", user="your_username", password="your_password" ) # Get indicators added in last 24 hours from datetime import datetime, timedelta added_after = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z") response = collection.get_objects(added_after=added_after, type=["indicator"]) for obj in response.get("objects", []): indicator = parse(obj) print(f"Type: {indicator.type}") print(f"Pattern: {indicator.pattern}") print(f"Valid Until: {indicator.valid_until}") print(f"Confidence: {indicator.confidence}") print("---") ``` ### Step 3: Ingest Open-Source Feeds **Abuse.ch URLhaus Feed:** ```python import requests import csv from io import StringIO # Download URLhaus recent URLs response = requests.get("https://urlhaus.abuse.ch/downloads/csv_recent/") reader = csv.reader(StringIO(response.text), delimiter=',') indicators = [] for row in reader: if row[0].startswith("#"): continue indicators.append({ "id": row[0], "dateadded": row[1], "url": row[2], "url_status": row[3], "threat": row[5], "tags": row[6] }) print(f"Ingested {len(indicators)} URLs from URLhaus") # Filter for active threats only active = [i for i in indicators if i["url_status"] == "online"] print(f"Active threats: {len(active)}") ``` **AlienVault OTX Pulse Feed:** ```python from OTXv2 import OTXv2, IndicatorTypes otx = OTXv2("YOUR_OTX_API_KEY") # Get subscribed pulses (last 24 hours) pulses = otx.getall(modified_since="2024-03-14T00:00:00") for pulse in pulses: print(f"Pulse: {pulse['name']}") print(f"Tags: {pulse['tags']}") for indicator in pulse["indicators"]: print(f" IOC: {indicator['indicator']} ({indicator['type']})") ``` **Abuse.ch Feodo Tracker (C2 IPs):** ```python response = requests.get("https://feodotracker.abuse.ch/downloads/ipblocklist_recommended.json") c2_data = response.json() for entry in c2_data: print(f"IP: {entry['ip_address']}:{entry['port']}") print(f"Malware: {entry['malware']}") print(f"First Seen: {entry['first_seen']}") print(f"Last Online: {entry['last_online']}") ``` ### Step 4: Normalize and Deduplicate Convert all feeds to STIX 2.1 format for standardization: ```python from stix2 import Indicator, Bundle import hashlib def create_stix_indicator(ioc_value, ioc_type, source, confidence=50): """Convert raw IOC to STIX 2.1 indicator""" pattern_map = { "ipv4": f"[ipv4-addr:value = '{ioc_value}']", "domain": f"[domain-name:value = '{ioc_value}']", "url": f"[url:value = '{ioc_value}']", "sha256": f"[file:hashes.'SHA-256' = '{ioc_value}']", "md5": f"[file:hashes.MD5 = '{ioc_value}']", } return Indicator( name=f"{ioc_type}: {ioc_value}", pattern=pattern_map[ioc_type], pattern_type="stix", valid_from="2024-03-15T00:00:00Z", confidence=confidence, labels=[source], custom_properties={"x_source_feed": source} ) # Deduplicate across sources seen_iocs = set() unique_indicators = [] for ioc in all_collected_iocs: ioc_hash = hashlib.sha256(f"{ioc['type']}:{ioc['value']}".encode()).hexdigest() if ioc_hash not in seen_iocs: seen_iocs.add(ioc_hash) unique_indicators.append( create_stix_indicator(ioc["value"], ioc["type"], ioc["source"]) ) bundle = Bundle(objects=unique_indicators) print(f"Unique indicators: {len(unique_indicators)}") ``` ### Step 5: Push to SIEM Threat Intelligence Framework **Push to Splunk ES Threat Intelligence:** ```python import requests splunk_url = "https://splunk.company.com:8089" headers = {"Authorization": f"Bearer {splunk_token}"} for indicator in unique_indicators: # Extract IOC value from STIX pattern ioc_value = indicator.pattern.split("'")[1] # Upload to Splunk ES threat intel collection data = { "ip": ioc_value, "description": indicator.name, "weight": indicator.confidence // 10, "threat_key": indicator.id, "source_feed": indicator.get("x_source_feed", "unknown") } requests.post( f"{splunk_url}/services/data/threat_intel/item/ip_intel", headers=headers, data=data, verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments ) ``` **Push to MISP for centralized management:** ```python from pymisp import PyMISP, MISPEvent, MISPAttribute misp = PyMISP("https://misp.company.com", "YOUR_MISP_API_KEY") # Create event for feed batch event = MISPEvent() event.info = f"TI Feed Import - {datetime.now().strftime('%Y-%m-%d')}" event.threat_level_id = 2 # Medium event.analysis = 2 # Completed # Add indicators as attributes for ioc in unique_indicators: attr = MISPAttribute() attr.type = "ip-dst" if "ipv4" in ioc.pattern else "domain" attr.value = ioc.pattern.split("'")[1] attr.to_ids = True attr.comment = f"Source: {ioc.get('x_source_feed', 'mixed')}" event.add_attribute(**attr) result = misp.add_event(event) print(f"MISP Event created: {result['Event']['id']}") ``` ### Step 6: Monitor Feed Health and Quality Track feed effectiveness metrics: ```spl index=threat_intel sourcetype="threat_intel_manager" | stats count AS total_iocs, dc(threat_key) AS unique_iocs, dc(source_feed) AS feed_count by source_feed | join source_feed [ search index=notable source="Threat Intelligence" | stats count AS matches by source_feed ] | eval match_rate = round(matches / unique_iocs * 100, 2) | sort - match_rate | table source_feed, unique_iocs, matches, match_rate ``` ## Key Concepts | Term | Definition | |------|-----------| | **STIX 2.1** | Structured Threat Information Expression — standardized JSON format for sharing threat intelligence objects | | **TAXII** | Trusted Automated eXchange of Indicator Information — transport protocol for sharing STIX data via REST API | | **TIP** | Threat Intelligence Platform — centralized system for aggregating, scoring, and distributing threat intelligence | | **IOC Scoring** | Process of assigning confidence values to indicators based on source reliability and corroboration | | **Feed Deduplication** | Removing duplicate IOCs across multiple sources while preserving multi-source attribution | | **IOC Expiration** | Time-to-live policy removing aged indicators (IP: 30 days, Domain: 90 days, Hash: 1 year) | ## Tools & Systems - **MISP**: Open-source threat intelligence platform for feed aggregation, correlation, and sharing - **AlienVault OTX**: Free threat intelligence sharing platform with community pulse feeds - **Abuse.ch**: Suite of free threat feeds (URLhaus, MalwareBazaar, Feodo Tracker, ThreatFox) - **OpenCTI**: Open-source cyber threat intelligence platform supporting STIX 2.1 native storage - **TAXII2 Client**: Python library for connecting to STIX/TAXII 2.1 servers for automated indicator retrieval ## Common Scenarios - **New Feed Onboarding**: Evaluate feed quality, map fields to STIX, configure automated ingestion pipeline - **Multi-SIEM Distribution**: Push normalized IOCs from MISP to Splunk, Elastic, and Sentinel simultaneously - **False Positive Reduction**: Score IOCs by source count and age, expire stale indicators automatically - **Feed Quality Audit**: Compare detection match rates across feeds to identify highest-value sources - **Incident IOC Sharing**: Package investigation IOCs as STIX bundle and share with ISACs via TAXII ## Output Format ``` THREAT INTEL FEED STATUS — Daily Report ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Date: 2024-03-15 Total IOCs: 45,892 active indicators Feed Health: Feed IOCs Matches Match Rate Status Abuse.ch URLhaus 12,340 47 0.38% HEALTHY AlienVault OTX 18,567 23 0.12% HEALTHY Abuse.ch Feodo 1,203 12 1.00% HEALTHY CISA AIS 8,945 8 0.09% HEALTHY CrowdStrike Intel 4,837 31 0.64% HEALTHY Actions Today: New IOCs ingested: 1,247 IOCs expired: 892 Duplicates removed: 156 SIEM matches: 121 notable events generated False positives: 3 (CDN IPs removed from feed) ```