--- name: threat-mitigation-mapping description: Map identified threats to appropriate security controls and mitigations. Use when prioritizing security investments, creating remediation plans, or validating control effectiveness. --- # Threat Mitigation Mapping Connect threats to controls for effective security planning. ## When to Use This Skill - Prioritizing security investments - Creating remediation roadmaps - Validating control coverage - Designing defense-in-depth - Security architecture review - Risk treatment planning ## Core Concepts ### 1. Control Categories ``` Preventive ────► Stop attacks before they occur │ (Firewall, Input validation) │ Detective ─────► Identify attacks in progress │ (IDS, Log monitoring) │ Corrective ────► Respond and recover from attacks (Incident response, Backup restore) ``` ### 2. Control Layers | Layer | Examples | |-------|----------| | **Network** | Firewall, WAF, DDoS protection | | **Application** | Input validation, authentication | | **Data** | Encryption, access controls | | **Endpoint** | EDR, patch management | | **Process** | Security training, incident response | ### 3. Defense in Depth ``` ┌──────────────────────┐ │ Perimeter │ ← Firewall, WAF │ ┌──────────────┐ │ │ │ Network │ │ ← Segmentation, IDS │ │ ┌────────┐ │ │ │ │ │ Host │ │ │ ← EDR, Hardening │ │ │ ┌────┐ │ │ │ │ │ │ │App │ │ │ │ ← Auth, Validation │ │ │ │Data│ │ │ │ ← Encryption │ │ │ └────┘ │ │ │ │ │ └────────┘ │ │ │ └──────────────┘ │ └──────────────────────┘ ``` ## Templates ### Template 1: Mitigation Model ```python from dataclasses import dataclass, field from enum import Enum from typing import List, Dict, Optional, Set from datetime import datetime class ControlType(Enum): PREVENTIVE = "preventive" DETECTIVE = "detective" CORRECTIVE = "corrective" class ControlLayer(Enum): NETWORK = "network" APPLICATION = "application" DATA = "data" ENDPOINT = "endpoint" PROCESS = "process" PHYSICAL = "physical" class ImplementationStatus(Enum): NOT_IMPLEMENTED = "not_implemented" PARTIAL = "partial" IMPLEMENTED = "implemented" VERIFIED = "verified" class Effectiveness(Enum): NONE = 0 LOW = 1 MEDIUM = 2 HIGH = 3 VERY_HIGH = 4 @dataclass class SecurityControl: id: str name: str description: str control_type: ControlType layer: ControlLayer effectiveness: Effectiveness implementation_cost: str # Low, Medium, High maintenance_cost: str status: ImplementationStatus = ImplementationStatus.NOT_IMPLEMENTED mitigates_threats: List[str] = field(default_factory=list) dependencies: List[str] = field(default_factory=list) technologies: List[str] = field(default_factory=list) compliance_refs: List[str] = field(default_factory=list) def coverage_score(self) -> float: """Calculate coverage score based on status and effectiveness.""" status_multiplier = { ImplementationStatus.NOT_IMPLEMENTED: 0.0, ImplementationStatus.PARTIAL: 0.5, ImplementationStatus.IMPLEMENTED: 0.8, ImplementationStatus.VERIFIED: 1.0, } return self.effectiveness.value * status_multiplier[self.status] @dataclass class Threat: id: str name: str category: str # STRIDE category description: str impact: str # Critical, High, Medium, Low likelihood: str risk_score: float @dataclass class MitigationMapping: threat: Threat controls: List[SecurityControl] residual_risk: str = "Unknown" notes: str = "" def calculate_coverage(self) -> float: """Calculate how well controls cover the threat.""" if not self.controls: return 0.0 total_score = sum(c.coverage_score() for c in self.controls) max_possible = len(self.controls) * Effectiveness.VERY_HIGH.value return (total_score / max_possible) * 100 if max_possible > 0 else 0 def has_defense_in_depth(self) -> bool: """Check if multiple layers are covered.""" layers = set(c.layer for c in self.controls if c.status != ImplementationStatus.NOT_IMPLEMENTED) return len(layers) >= 2 def has_control_diversity(self) -> bool: """Check if multiple control types are present.""" types = set(c.control_type for c in self.controls if c.status != ImplementationStatus.NOT_IMPLEMENTED) return len(types) >= 2 @dataclass class MitigationPlan: name: str threats: List[Threat] = field(default_factory=list) controls: List[SecurityControl] = field(default_factory=list) mappings: List[MitigationMapping] = field(default_factory=list) def get_unmapped_threats(self) -> List[Threat]: """Find threats without mitigations.""" mapped_ids = {m.threat.id for m in self.mappings} return [t for t in self.threats if t.id not in mapped_ids] def get_control_coverage(self) -> Dict[str, float]: """Get coverage percentage for each threat.""" return { m.threat.id: m.calculate_coverage() for m in self.mappings } def get_gaps(self) -> List[Dict]: """Identify mitigation gaps.""" gaps = [] for mapping in self.mappings: coverage = mapping.calculate_coverage() if coverage < 50: gaps.append({ "threat": mapping.threat.id, "threat_name": mapping.threat.name, "coverage": coverage, "issue": "Insufficient control coverage", "recommendation": "Add more controls or improve existing ones" }) if not mapping.has_defense_in_depth(): gaps.append({ "threat": mapping.threat.id, "threat_name": mapping.threat.name, "coverage": coverage, "issue": "No defense in depth", "recommendation": "Add controls at different layers" }) if not mapping.has_control_diversity(): gaps.append({ "threat": mapping.threat.id, "threat_name": mapping.threat.name, "coverage": coverage, "issue": "No control diversity", "recommendation": "Add detective/corrective controls" }) return gaps ``` ### Template 2: Control Library ```python class ControlLibrary: """Library of standard security controls.""" STANDARD_CONTROLS = { # Authentication Controls "AUTH-001": SecurityControl( id="AUTH-001", name="Multi-Factor Authentication", description="Require MFA for all user authentication", control_type=ControlType.PREVENTIVE, layer=ControlLayer.APPLICATION, effectiveness=Effectiveness.HIGH, implementation_cost="Medium", maintenance_cost="Low", mitigates_threats=["SPOOFING"], technologies=["TOTP", "WebAuthn", "SMS OTP"], compliance_refs=["PCI-DSS 8.3", "NIST 800-63B"] ), "AUTH-002": SecurityControl( id="AUTH-002", name="Account Lockout Policy", description="Lock accounts after failed authentication attempts", control_type=ControlType.PREVENTIVE, layer=ControlLayer.APPLICATION, effectiveness=Effectiveness.MEDIUM, implementation_cost="Low", maintenance_cost="Low", mitigates_threats=["SPOOFING"], technologies=["Custom implementation"], compliance_refs=["PCI-DSS 8.1.6"] ), # Input Validation Controls "VAL-001": SecurityControl( id="VAL-001", name="Input Validation Framework", description="Validate and sanitize all user input", control_type=ControlType.PREVENTIVE, layer=ControlLayer.APPLICATION, effectiveness=Effectiveness.HIGH, implementation_cost="Medium", maintenance_cost="Medium", mitigates_threats=["TAMPERING", "INJECTION"], technologies=["Joi", "Yup", "Pydantic"], compliance_refs=["OWASP ASVS V5"] ), "VAL-002": SecurityControl( id="VAL-002", name="Web Application Firewall", description="Deploy WAF to filter malicious requests", control_type=ControlType.PREVENTIVE, layer=ControlLayer.NETWORK, effectiveness=Effectiveness.MEDIUM, implementation_cost="Medium", maintenance_cost="Medium", mitigates_threats=["TAMPERING", "INJECTION", "DOS"], technologies=["AWS WAF", "Cloudflare", "ModSecurity"], compliance_refs=["PCI-DSS 6.6"] ), # Encryption Controls "ENC-001": SecurityControl( id="ENC-001", name="Data Encryption at Rest", description="Encrypt sensitive data in storage", control_type=ControlType.PREVENTIVE, layer=ControlLayer.DATA, effectiveness=Effectiveness.HIGH, implementation_cost="Medium", maintenance_cost="Low", mitigates_threats=["INFORMATION_DISCLOSURE"], technologies=["AES-256", "KMS", "HSM"], compliance_refs=["PCI-DSS 3.4", "GDPR Art. 32"] ), "ENC-002": SecurityControl( id="ENC-002", name="TLS Encryption", description="Encrypt data in transit using TLS 1.3", control_type=ControlType.PREVENTIVE, layer=ControlLayer.NETWORK, effectiveness=Effectiveness.HIGH, implementation_cost="Low", maintenance_cost="Low", mitigates_threats=["INFORMATION_DISCLOSURE", "TAMPERING"], technologies=["TLS 1.3", "Certificate management"], compliance_refs=["PCI-DSS 4.1", "HIPAA"] ), # Logging Controls "LOG-001": SecurityControl( id="LOG-001", name="Security Event Logging", description="Log all security-relevant events", control_type=ControlType.DETECTIVE, layer=ControlLayer.APPLICATION, effectiveness=Effectiveness.MEDIUM, implementation_cost="Low", maintenance_cost="Medium", mitigates_threats=["REPUDIATION"], technologies=["ELK Stack", "Splunk", "CloudWatch"], compliance_refs=["PCI-DSS 10.2", "SOC2"] ), "LOG-002": SecurityControl( id="LOG-002", name="Log Integrity Protection", description="Protect logs from tampering", control_type=ControlType.PREVENTIVE, layer=ControlLayer.DATA, effectiveness=Effectiveness.MEDIUM, implementation_cost="Medium", maintenance_cost="Low", mitigates_threats=["REPUDIATION", "TAMPERING"], technologies=["Immutable storage", "Log signing"], compliance_refs=["PCI-DSS 10.5"] ), # Access Control "ACC-001": SecurityControl( id="ACC-001", name="Role-Based Access Control", description="Implement RBAC for authorization", control_type=ControlType.PREVENTIVE, layer=ControlLayer.APPLICATION, effectiveness=Effectiveness.HIGH, implementation_cost="Medium", maintenance_cost="Medium", mitigates_threats=["ELEVATION_OF_PRIVILEGE", "INFORMATION_DISCLOSURE"], technologies=["RBAC", "ABAC", "Policy engines"], compliance_refs=["PCI-DSS 7.1", "SOC2"] ), # Availability Controls "AVL-001": SecurityControl( id="AVL-001", name="Rate Limiting", description="Limit request rates to prevent abuse", control_type=ControlType.PREVENTIVE, layer=ControlLayer.APPLICATION, effectiveness=Effectiveness.MEDIUM, implementation_cost="Low", maintenance_cost="Low", mitigates_threats=["DENIAL_OF_SERVICE"], technologies=["API Gateway", "Redis", "Token bucket"], compliance_refs=["OWASP API Security"] ), "AVL-002": SecurityControl( id="AVL-002", name="DDoS Protection", description="Deploy DDoS mitigation services", control_type=ControlType.PREVENTIVE, layer=ControlLayer.NETWORK, effectiveness=Effectiveness.HIGH, implementation_cost="High", maintenance_cost="Medium", mitigates_threats=["DENIAL_OF_SERVICE"], technologies=["Cloudflare", "AWS Shield", "Akamai"], compliance_refs=["NIST CSF"] ), } def get_controls_for_threat(self, threat_category: str) -> List[SecurityControl]: """Get all controls that mitigate a threat category.""" return [ c for c in self.STANDARD_CONTROLS.values() if threat_category in c.mitigates_threats ] def get_controls_by_layer(self, layer: ControlLayer) -> List[SecurityControl]: """Get controls for a specific layer.""" return [c for c in self.STANDARD_CONTROLS.values() if c.layer == layer] def get_control(self, control_id: str) -> Optional[SecurityControl]: """Get a specific control by ID.""" return self.STANDARD_CONTROLS.get(control_id) def recommend_controls( self, threat: Threat, existing_controls: List[str] ) -> List[SecurityControl]: """Recommend additional controls for a threat.""" available = self.get_controls_for_threat(threat.category) return [c for c in available if c.id not in existing_controls] ``` ### Template 3: Mitigation Analysis ```python class MitigationAnalyzer: """Analyze and optimize mitigation strategies.""" def __init__(self, plan: MitigationPlan, library: ControlLibrary): self.plan = plan self.library = library def calculate_overall_risk_reduction(self) -> float: """Calculate overall risk reduction percentage.""" if not self.plan.mappings: return 0.0 weighted_coverage = 0 total_weight = 0 for mapping in self.plan.mappings: # Weight by threat risk score weight = mapping.threat.risk_score coverage = mapping.calculate_coverage() weighted_coverage += weight * coverage total_weight += weight return weighted_coverage / total_weight if total_weight > 0 else 0 def get_critical_gaps(self) -> List[Dict]: """Find critical gaps that need immediate attention.""" gaps = self.plan.get_gaps() critical_threats = {t.id for t in self.plan.threats if t.impact == "Critical"} return [g for g in gaps if g["threat"] in critical_threats] def optimize_budget( self, budget: float, cost_map: Dict[str, float] ) -> List[SecurityControl]: """Select controls that maximize risk reduction within budget.""" # Simple greedy approach - can be replaced with optimization algorithm recommended = [] remaining_budget = budget unmapped = self.plan.get_unmapped_threats() # Sort controls by effectiveness/cost ratio all_controls = list(self.library.STANDARD_CONTROLS.values()) controls_with_value = [] for control in all_controls: if control.status == ImplementationStatus.NOT_IMPLEMENTED: cost = cost_map.get(control.id, float('inf')) if cost <= remaining_budget: # Calculate value as threats covered * effectiveness / cost threats_covered = len([ t for t in unmapped if t.category in control.mitigates_threats ]) if threats_covered > 0: value = (threats_covered * control.effectiveness.value) / cost controls_with_value.append((control, value, cost)) # Sort by value (higher is better) controls_with_value.sort(key=lambda x: x[1], reverse=True) for control, value, cost in controls_with_value: if cost <= remaining_budget: recommended.append(control) remaining_budget -= cost return recommended def generate_roadmap(self) -> List[Dict]: """Generate implementation roadmap by priority.""" roadmap = [] gaps = self.plan.get_gaps() # Phase 1: Critical threats with low coverage phase1 = [] for gap in gaps: mapping = next( (m for m in self.plan.mappings if m.threat.id == gap["threat"]), None ) if mapping and mapping.threat.impact == "Critical": controls = self.library.get_controls_for_threat(mapping.threat.category) phase1.extend([ { "threat": gap["threat"], "control": c.id, "control_name": c.name, "phase": 1, "priority": "Critical" } for c in controls if c.status == ImplementationStatus.NOT_IMPLEMENTED ]) roadmap.extend(phase1[:5]) # Top 5 for phase 1 # Phase 2: High impact threats phase2 = [] for gap in gaps: mapping = next( (m for m in self.plan.mappings if m.threat.id == gap["threat"]), None ) if mapping and mapping.threat.impact == "High": controls = self.library.get_controls_for_threat(mapping.threat.category) phase2.extend([ { "threat": gap["threat"], "control": c.id, "control_name": c.name, "phase": 2, "priority": "High" } for c in controls if c.status == ImplementationStatus.NOT_IMPLEMENTED ]) roadmap.extend(phase2[:5]) # Top 5 for phase 2 return roadmap def defense_in_depth_analysis(self) -> Dict[str, List[str]]: """Analyze defense in depth coverage.""" layer_coverage = {layer.value: [] for layer in ControlLayer} for mapping in self.plan.mappings: for control in mapping.controls: if control.status in [ImplementationStatus.IMPLEMENTED, ImplementationStatus.VERIFIED]: layer_coverage[control.layer.value].append(control.id) return layer_coverage def generate_report(self) -> str: """Generate comprehensive mitigation report.""" risk_reduction = self.calculate_overall_risk_reduction() gaps = self.plan.get_gaps() critical_gaps = self.get_critical_gaps() layer_coverage = self.defense_in_depth_analysis() report = f""" # Threat Mitigation Report ## Executive Summary - **Overall Risk Reduction:** {risk_reduction:.1f}% - **Total Threats:** {len(self.plan.threats)} - **Total Controls:** {len(self.plan.controls)} - **Identified Gaps:** {len(gaps)} - **Critical Gaps:** {len(critical_gaps)} ## Defense in Depth Coverage {self._format_layer_coverage(layer_coverage)} ## Critical Gaps Requiring Immediate Action {self._format_gaps(critical_gaps)} ## Recommendations {self._format_recommendations()} ## Implementation Roadmap {self._format_roadmap()} """ return report def _format_layer_coverage(self, coverage: Dict[str, List[str]]) -> str: lines = [] for layer, controls in coverage.items(): status = "✓" if controls else "✗" lines.append(f"- {layer}: {status} ({len(controls)} controls)") return "\n".join(lines) def _format_gaps(self, gaps: List[Dict]) -> str: if not gaps: return "No critical gaps identified." lines = [] for gap in gaps: lines.append(f"- **{gap['threat_name']}**: {gap['issue']}") lines.append(f" - Coverage: {gap['coverage']:.1f}%") lines.append(f" - Recommendation: {gap['recommendation']}") return "\n".join(lines) def _format_recommendations(self) -> str: recommendations = [] layer_coverage = self.defense_in_depth_analysis() for layer, controls in layer_coverage.items(): if not controls: recommendations.append(f"- Add {layer} layer controls") gaps = self.plan.get_gaps() if any(g["issue"] == "No control diversity" for g in gaps): recommendations.append("- Add more detective and corrective controls") return "\n".join(recommendations) if recommendations else "Current coverage is adequate." def _format_roadmap(self) -> str: roadmap = self.generate_roadmap() if not roadmap: return "No additional controls recommended at this time." lines = [] current_phase = 0 for item in roadmap: if item["phase"] != current_phase: current_phase = item["phase"] lines.append(f"\n### Phase {current_phase}") lines.append(f"- [{item['priority']}] {item['control_name']} (for {item['threat']})") return "\n".join(lines) ``` ### Template 4: Control Effectiveness Testing ```python from dataclasses import dataclass from typing import List, Callable, Any import asyncio @dataclass class ControlTest: control_id: str test_name: str test_function: Callable[[], bool] expected_result: bool description: str class ControlTester: """Test control effectiveness.""" def __init__(self): self.tests: List[ControlTest] = [] self.results: List[Dict] = [] def add_test(self, test: ControlTest) -> None: self.tests.append(test) async def run_tests(self) -> List[Dict]: """Run all control tests.""" self.results = [] for test in self.tests: try: result = test.test_function() passed = result == test.expected_result self.results.append({ "control_id": test.control_id, "test_name": test.test_name, "passed": passed, "actual_result": result, "expected_result": test.expected_result, "description": test.description, "error": None }) except Exception as e: self.results.append({ "control_id": test.control_id, "test_name": test.test_name, "passed": False, "actual_result": None, "expected_result": test.expected_result, "description": test.description, "error": str(e) }) return self.results def get_effectiveness_score(self, control_id: str) -> float: """Calculate effectiveness score for a control.""" control_results = [r for r in self.results if r["control_id"] == control_id] if not control_results: return 0.0 passed = sum(1 for r in control_results if r["passed"]) return (passed / len(control_results)) * 100 def generate_test_report(self) -> str: """Generate test results report.""" if not self.results: return "No tests have been run." total = len(self.results) passed = sum(1 for r in self.results if r["passed"]) report = f""" # Control Effectiveness Test Report ## Summary - **Total Tests:** {total} - **Passed:** {passed} - **Failed:** {total - passed} - **Pass Rate:** {(passed/total)*100:.1f}% ## Results by Control """ # Group by control controls = {} for result in self.results: cid = result["control_id"] if cid not in controls: controls[cid] = [] controls[cid].append(result) for control_id, results in controls.items(): score = self.get_effectiveness_score(control_id) report += f"\n### {control_id} (Effectiveness: {score:.1f}%)\n" for r in results: status = "✓" if r["passed"] else "✗" report += f"- {status} {r['test_name']}\n" if r["error"]: report += f" - Error: {r['error']}\n" return report ``` ## Best Practices ### Do's - **Map all threats** - No threat should be unmapped - **Layer controls** - Defense in depth is essential - **Mix control types** - Preventive, detective, corrective - **Track effectiveness** - Measure and improve - **Review regularly** - Controls degrade over time ### Don'ts - **Don't rely on single controls** - Single points of failure - **Don't ignore cost** - ROI matters - **Don't skip testing** - Untested controls may fail - **Don't set and forget** - Continuous improvement - **Don't ignore people/process** - Technology alone isn't enough ## Resources - [NIST Cybersecurity Framework](https://www.nist.gov/cyberframework) - [CIS Controls](https://www.cisecurity.org/controls) - [MITRE D3FEND](https://d3fend.mitre.org/)