--- model: claude-sonnet-4-0 --- # Security Scan and Vulnerability Assessment You are a security expert specializing in application security, vulnerability assessment, and secure coding practices. Perform comprehensive security audits to identify vulnerabilities, provide remediation guidance, and implement security best practices. ## Context The user needs a thorough security analysis to identify vulnerabilities, assess risks, and implement protection measures. Focus on OWASP Top 10, dependency vulnerabilities, and security misconfigurations with actionable remediation steps. ## Requirements $ARGUMENTS ## Instructions ### 1. Security Scanning Tool Selection Choose appropriate security scanning tools based on your technology stack and requirements: **Tool Selection Matrix** ```python security_tools = { 'python': { 'sast': { 'bandit': { 'strengths': ['Built for Python', 'Fast', 'Good defaults', 'AST-based'], 'best_for': ['Python codebases', 'CI/CD pipelines', 'Quick scans'], 'command': 'bandit -r . -f json -o bandit-report.json', 'config_file': '.bandit' }, 'semgrep': { 'strengths': ['Multi-language', 'Custom rules', 'Low false positives'], 'best_for': ['Complex projects', 'Custom security patterns', 'Enterprise'], 'command': 'semgrep --config=auto --json --output=semgrep-report.json', 'config_file': '.semgrep.yml' } }, 'dependency_scan': { 'safety': { 'command': 'safety check --json --output safety-report.json', 'database': 'PyUp.io vulnerability database', 'best_for': 'Python package vulnerabilities' }, 'pip_audit': { 'command': 'pip-audit --format=json --output=pip-audit-report.json', 'database': 'OSV database', 'best_for': 'Comprehensive Python vulnerability scanning' } } }, 'javascript': { 'sast': { 'eslint_security': { 'command': 'eslint . 
--ext .js,.jsx,.ts,.tsx --format json > eslint-security.json', 'plugins': ['@eslint/plugin-security', 'eslint-plugin-no-secrets'], 'best_for': 'JavaScript/TypeScript security linting' }, 'sonarjs': { 'command': 'sonar-scanner -Dsonar.projectKey=myproject', 'best_for': 'Comprehensive code quality and security', 'features': ['Vulnerability detection', 'Code smells', 'Technical debt'] } }, 'dependency_scan': { 'npm_audit': { 'command': 'npm audit --json > npm-audit-report.json', 'fix': 'npm audit fix', 'best_for': 'NPM package vulnerabilities' }, 'yarn_audit': { 'command': 'yarn audit --json > yarn-audit-report.json', 'best_for': 'Yarn package vulnerabilities' }, 'snyk': { 'command': 'snyk test --json > snyk-report.json', 'fix': 'snyk wizard', 'best_for': 'Comprehensive vulnerability management' } } }, 'container': { 'trivy': { 'image_scan': 'trivy image --format json --output trivy-image.json myimage:latest', 'fs_scan': 'trivy fs --format json --output trivy-fs.json .', 'repo_scan': 'trivy repo --format json --output trivy-repo.json .', 'strengths': ['Fast', 'Accurate', 'Multiple targets', 'SBOM generation'], 'best_for': 'Container and filesystem vulnerability scanning' }, 'grype': { 'command': 'grype dir:. -o json > grype-report.json', 'strengths': ['Fast', 'Accurate vulnerability detection'], 'best_for': 'Container image and filesystem scanning' }, 'clair': { 'api_based': True, 'strengths': ['API-driven', 'Continuous monitoring'], 'best_for': 'Registry integration, automated scanning' } }, 'infrastructure': { 'checkov': { 'command': 'checkov -d . --framework terraform --output json > checkov-report.json', 'supports': ['Terraform', 'CloudFormation', 'Kubernetes', 'Helm', 'Serverless'], 'best_for': 'Infrastructure as Code security' }, 'tfsec': { 'command': 'tfsec . 
--format json > tfsec-report.json', 'supports': ['Terraform'], 'best_for': 'Terraform-specific security scanning' }, 'kube_score': { 'command': 'kube-score score *.yaml --output-format json > kube-score.json', 'supports': ['Kubernetes'], 'best_for': 'Kubernetes manifest security and best practices' } }, 'secrets': { 'truffleHog': { 'command': 'trufflehog git file://. --json > trufflehog-report.json', 'strengths': ['Git history scanning', 'High accuracy', 'Custom regex'], 'best_for': 'Secret detection in git repositories' }, 'gitleaks': { 'command': 'gitleaks detect --report-format json --report-path gitleaks-report.json', 'strengths': ['Fast', 'Configurable', 'Pre-commit hooks'], 'best_for': 'Real-time secret detection' }, 'detect_secrets': { 'command': 'detect-secrets scan --all-files . > .secrets.baseline', 'strengths': ['Baseline management', 'False positive reduction'], 'best_for': 'Enterprise secret management' } } } ``` **Multi-Tool Security Scanner** ```python import json import subprocess import os from pathlib import Path from typing import Dict, List, Any from dataclasses import dataclass from datetime import datetime @dataclass class VulnerabilityFinding: tool: str severity: str category: str title: str description: str file_path: str line_number: int cve: str cwe: str remediation: str confidence: str class SecurityScanner: def __init__(self, project_path: str): self.project_path = Path(project_path) self.findings = [] self.scan_results = {} def detect_project_type(self) -> List[str]: """Detect project technologies to choose appropriate scanners""" technologies = [] # Python if (self.project_path / 'requirements.txt').exists() or \ (self.project_path / 'setup.py').exists() or \ (self.project_path / 'pyproject.toml').exists(): technologies.append('python') # JavaScript/Node.js if (self.project_path / 'package.json').exists(): technologies.append('javascript') # Go if (self.project_path / 'go.mod').exists(): technologies.append('golang') # Docker if 
(self.project_path / 'Dockerfile').exists(): technologies.append('container') # Terraform if list(self.project_path.glob('*.tf')): technologies.append('terraform') # Kubernetes if list(self.project_path.glob('*.yaml')) or list(self.project_path.glob('*.yml')): technologies.append('kubernetes') return technologies def run_comprehensive_scan(self) -> Dict[str, Any]: """Run all applicable security scanners""" technologies = self.detect_project_type() scan_plan = { 'timestamp': datetime.now().isoformat(), 'technologies': technologies, 'scanners_used': [], 'findings': [] } # Always run secret detection self.run_secret_scan() scan_plan['scanners_used'].append('secret_detection') # Technology-specific scans if 'python' in technologies: self.run_python_scans() scan_plan['scanners_used'].extend(['bandit', 'safety', 'pip_audit']) if 'javascript' in technologies: self.run_javascript_scans() scan_plan['scanners_used'].extend(['eslint_security', 'npm_audit']) if 'container' in technologies: self.run_container_scans() scan_plan['scanners_used'].append('trivy') if 'terraform' in technologies: self.run_terraform_scans() scan_plan['scanners_used'].extend(['checkov', 'tfsec']) # Generate unified report scan_plan['findings'] = self.findings scan_plan['summary'] = self.generate_summary() return scan_plan def run_secret_scan(self): """Run secret detection tools""" try: # TruffleHog result = subprocess.run([ 'trufflehog', 'filesystem', str(self.project_path), '--json', '--no-update' ], capture_output=True, text=True, timeout=300) if result.stdout: for line in result.stdout.strip().split('\n'): if line: finding = json.loads(line) self.findings.append(VulnerabilityFinding( tool='trufflehog', severity='CRITICAL', category='secrets', title=f"Secret detected: {finding.get('DetectorName', 'Unknown')}", description=finding.get('Raw', ''), file_path=finding.get('SourceMetadata', {}).get('Data', {}).get('Filesystem', {}).get('file', ''), line_number=finding.get('SourceMetadata', {}).get('Data', 
                                    {}).get('Filesystem', {}).get('line', 0),
                                cve='',
                                cwe='CWE-798',
                                remediation='Remove secret and rotate credentials',
                                confidence=str(finding.get('Verified', False))
                            ))
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError,
                FileNotFoundError, json.JSONDecodeError):
            print("TruffleHog not available or scan failed")

        try:
            # GitLeaks
            # Note: depending on the gitleaks version, the JSON report may only be
            # written to --report-path rather than stdout
            result = subprocess.run([
                'gitleaks', 'detect', '--source', str(self.project_path),
                '--report-format', 'json', '--no-git'
            ], capture_output=True, text=True, timeout=300)

            if result.stdout:
                findings = json.loads(result.stdout)
                for finding in findings:
                    self.findings.append(VulnerabilityFinding(
                        tool='gitleaks',
                        severity='HIGH',
                        category='secrets',
                        title=f"Secret pattern: {finding.get('RuleID', 'Unknown')}",
                        description=finding.get('Description', ''),
                        file_path=finding.get('File', ''),
                        line_number=finding.get('StartLine', 0),
                        cve='',
                        cwe='CWE-798',
                        remediation='Remove secret, rotate it, and add the file to .gitignore',
                        confidence='high'
                    ))
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError,
                FileNotFoundError, json.JSONDecodeError):
            print("GitLeaks not available or scan failed")

    def run_python_scans(self):
        """Run Python-specific security scanners"""
        # Bandit
        try:
            result = subprocess.run([
                'bandit', '-r', str(self.project_path),
                '-f', 'json', '--severity-level', 'medium'
            ], capture_output=True, text=True, timeout=300)

            if result.stdout:
                bandit_results = json.loads(result.stdout)
                for result_item in bandit_results.get('results', []):
                    self.findings.append(VulnerabilityFinding(
                        tool='bandit',
                        severity=result_item.get('issue_severity', 'MEDIUM'),
                        category='sast',
                        title=result_item.get('test_name', ''),
                        description=result_item.get('issue_text', ''),
                        file_path=result_item.get('filename', ''),
                        line_number=result_item.get('line_number', 0),
                        cve='',
                        # Holds the Bandit test ID (e.g. B602), not a CWE number
                        cwe=result_item.get('test_id', ''),
                        remediation=result_item.get('more_info', ''),
                        confidence=result_item.get('issue_confidence', 'MEDIUM')
                    ))
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError,
                FileNotFoundError, json.JSONDecodeError):
            print("Bandit not available or scan failed")

        # Safety
        try:
            result = subprocess.run([
                'safety', 'check', '--json'
            ], capture_output=True, text=True, timeout=300, cwd=self.project_path)

            if result.stdout:
                safety_results = json.loads(result.stdout)
                # Safety's JSON layout varies by release; some wrap findings in a
                # 'vulnerabilities' key, so accept either shape
                if isinstance(safety_results, dict):
                    safety_results = safety_results.get('vulnerabilities', [])
                for vuln in safety_results:
                    if not isinstance(vuln, dict):
                        continue
                    self.findings.append(VulnerabilityFinding(
                        tool='safety',
                        severity='HIGH',
                        category='dependencies',
                        title=f"Vulnerable package: {vuln.get('package_name', '')}",
                        description=vuln.get('advisory', ''),
                        file_path='requirements.txt',
                        line_number=0,
                        cve=vuln.get('cve', ''),
                        cwe='',
                        # analyzed_version is the vulnerable version that was scanned
                        remediation=(
                            f"Upgrade from {vuln.get('analyzed_version', 'the installed version')} "
                            "to a patched release"
                        ),
                        confidence='high'
                    ))
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError,
                FileNotFoundError, json.JSONDecodeError):
            print("Safety not available or scan failed")

    def generate_summary(self) -> Dict[str, Any]:
        """Generate summary statistics"""
        severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
        category_counts = {}

        for finding in self.findings:
            severity_counts[finding.severity] = severity_counts.get(finding.severity, 0) + 1
            category_counts[finding.category] = category_counts.get(finding.category, 0) + 1

        return {
            'total_findings': len(self.findings),
            'severity_breakdown': severity_counts,
            'category_breakdown': category_counts,
            'risk_score': self.calculate_risk_score(severity_counts)
        }

    def calculate_risk_score(self, severity_counts: Dict[str, int]) -> int:
        """Calculate overall risk score (0-100)"""
        weights = {'CRITICAL': 10, 'HIGH': 7, 'MEDIUM': 4, 'LOW': 1}
        # Severities outside the weight table (e.g. UNKNOWN) contribute nothing
        total_score = sum(weights.get(severity, 0) * count
                          for severity, count in severity_counts.items())
        # The weighted total is capped at 100, an arbitrary ceiling
        return min(100, total_score)
```

**SAST (Static Application Security Testing)**

```python
import re

# Enhanced code vulnerability patterns with tool-specific implementations
security_rules = {
    "sql_injection": {
        "patterns": [
            r"query\s*\(\s*[\"'].*\+.*[\"']\s*\)",
            r"execute\s*\(\s*[\"'].*%[sd].*[\"']\s*%",
            r"f[\"'].*SELECT.*\{.*\}.*FROM"
        ],
        "severity": "CRITICAL",
        "cwe": "CWE-89",
        "fix": "Use parameterized queries or prepared statements"
    },
    "xss": {
        "patterns": [
            r"innerHTML\s*=\s*[^\"']*\+",
            r"document\.write\s*\([^\"']*\+",
            r"dangerouslySetInnerHTML",
            r"v-html\s*=\s*[\"'][^\"']*\{"
        ],
        "severity": "HIGH",
        "cwe": "CWE-79",
        "fix": "Sanitize user input and use safe rendering methods"
    },
    "hardcoded_secrets": {
        "patterns": [
            r"(?i)(api[_-]?key|apikey|secret|password)\s*[:=]\s*[\"'][^\"']{8,}[\"']",
            r"(?i)bearer\s+[a-zA-Z0-9\-\._~\+\/]{20,}",
            r"(?i)(aws[_-]?access[_-]?key[_-]?id|aws[_-]?secret)\s*[:=]",
            r"private[_-]?key\s*[:=]\s*[\"'][^\"']+[\"']"
        ],
        "severity": "CRITICAL",
        "cwe": "CWE-798",
        "fix": "Use environment variables or secure key management service"
    },
    "path_traversal": {
        "patterns": [
            r"\.\.\/",
            r"readFile\s*\([^\"']*\+",
            r"include\s*\([^\"']*\$",
            r"require\s*\([^\"']*\+"
        ],
        "severity": "HIGH",
        "cwe": "CWE-22",
        "fix": "Validate and sanitize file paths"
    },
    "insecure_random": {
        "patterns": [
            r"Math\.random\(\)",
            r"rand\(\)",
            r"mt_rand\(\)"
        ],
        "severity": "MEDIUM",
        "cwe": "CWE-330",
        "fix": "Use cryptographically secure random functions"
    }
}

def scan_code_vulnerabilities(file_path, content):
    """
    Scan file content against the regex rules in security_rules
    """
    vulnerabilities = []

    for vuln_type, rule in security_rules.items():
        for pattern in rule['patterns']:
            matches = re.finditer(pattern, content, re.MULTILINE)
            for match in matches:
                line_num = content[:match.start()].count('\n') + 1
                vulnerabilities.append({
                    'type': vuln_type,
                    'severity': rule['severity'],
                    'file': file_path,
                    'line': line_num,
                    'code': match.group(0),
                    'cwe': rule['cwe'],
                    'fix': rule['fix'],
                    'confidence': rule.get('confidence', 'medium'),
                    # Rules without an explicit OWASP mapping are left unmapped
                    # rather than all being labelled as injection
                    'owasp_category': rule.get('owasp', 'unmapped')
                })

    return vulnerabilities

# Framework-specific security patterns
framework_security_patterns = {
    'django': {
        'csrf_exempt': {
            'pattern': r'@csrf_exempt',
            'severity': 'HIGH',
            'description': 'CSRF protection disabled',
            'fix': 'Remove @csrf_exempt decorator and implement proper CSRF protection'
        },
        'raw_sql': {
            'pattern': r'\.raw\(["\'][^"\']*["\']\)',
            'severity': 'HIGH',
            'description': 'Raw SQL query detected',
            'fix': 'Use Django ORM or parameterized queries'
        },
        'eval_usage': {
            'pattern': r'\beval\(',
            'severity': 'CRITICAL',
            'description': 'Code evaluation detected',
            'fix': 'Remove eval() usage and use safe alternatives'
        }
    },
    'flask': {
        'debug_mode': {
            'pattern': r'debug\s*=\s*True',
            'severity': 'MEDIUM',
            'description': 'Debug mode enabled (must not reach production)',
            'fix': 'Set debug=False in production'
        },
        'render_template_string': {
            'pattern': r'render_template_string\([^)]*\+',
            'severity': 'HIGH',
            'description': 'Template injection vulnerability',
            'fix': 'Use render_template with static templates'
        }
    },
    'react': {
        'dangerous_html': {
            'pattern': r'dangerouslySetInnerHTML',
            'severity': 'HIGH',
            'description': 'XSS vulnerability through innerHTML',
            'fix': 'Sanitize HTML content or use safe rendering'
        },
        'eval_usage': {
            'pattern': r'\beval\(',
            'severity': 'CRITICAL',
            'description': 'Code evaluation detected',
            'fix': 'Remove eval() usage'
        }
    },
    'express': {
        'missing_helmet': {
            'pattern': r'express\(\)',
            'negative_pattern': r'helmet\(\)',
            'severity': 'MEDIUM',
            'description': 'Security headers middleware missing',
            'fix': 'Add helmet() middleware for security headers'
        },
        'cors_wildcard': {
            'pattern': r'origin:\s*["\']\*["\']',
            'severity': 'HIGH',
            'description': 'CORS configured with wildcard origin',
            'fix': 'Specify exact allowed origins'
        }
    }
}

def scan_framework_vulnerabilities(framework, file_path, content):
    """Scan for framework-specific security issues"""
    vulnerabilities = []

    if framework not in framework_security_patterns:
        return vulnerabilities

    patterns = framework_security_patterns[framework]

    for vuln_type, rule in patterns.items():
        matches = list(re.finditer(rule['pattern'], content, re.MULTILINE))

        # Negative patterns flag a missing safeguard (e.g. no helmet()), but only
        # when the primary pattern shows the framework is actually used in this file
        if 'negative_pattern' in rule:
            if matches and not re.search(rule['negative_pattern'],
                                         content):
                vulnerabilities.append({
                    'type': f'{framework}_{vuln_type}',
                    'severity': rule['severity'],
                    'file': file_path,
                    'description': rule['description'],
                    'fix': rule['fix'],
                    'framework': framework
                })
        else:
            for match in matches:
                line_num = content[:match.start()].count('\n') + 1
                vulnerabilities.append({
                    'type': f'{framework}_{vuln_type}',
                    'severity': rule['severity'],
                    'file': file_path,
                    'line': line_num,
                    'code': match.group(0),
                    'description': rule['description'],
                    'fix': rule['fix'],
                    'framework': framework
                })

    return vulnerabilities
```

**Advanced Dependency Vulnerability Scanning**

```python
import subprocess
import json
import requests
from pathlib import Path
from typing import Dict, List, Any
from datetime import datetime, timedelta

class DependencyScanner:
    def __init__(self):
        self.vulnerability_databases = {
            'osv': 'https://api.osv.dev/v1/query',
            'snyk': 'https://api.snyk.io/v1/test',
            'github': 'https://api.github.com/advisories'
        }

    def scan_all_ecosystems(self, project_path: str) -> Dict[str, Any]:
        """Comprehensive dependency scanning across all package managers"""
        results = {
            'timestamp': datetime.now().isoformat(),
            'ecosystems': {},
            'summary': {'total_vulnerabilities': 0, 'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
        }

        # Detect and scan each ecosystem
        ecosystems = self.detect_ecosystems(project_path)

        for ecosystem in ecosystems:
            results['ecosystems'][ecosystem] = self.scan_ecosystem(ecosystem, project_path)
            self.update_summary(results['summary'], results['ecosystems'][ecosystem])

        return results

    def detect_ecosystems(self, project_path: str) -> List[str]:
        """Detect package managers and dependency files"""
        ecosystems = []

        ecosystem_files = {
            'npm': ['package.json', 'package-lock.json', 'yarn.lock'],
            'pip': ['requirements.txt', 'setup.py', 'pyproject.toml', 'Pipfile'],
            'maven': ['pom.xml'],
            'gradle': ['build.gradle', 'build.gradle.kts'],
            'gem': ['Gemfile', 'Gemfile.lock'],
            'composer': ['composer.json', 'composer.lock'],
            'nuget': ['*.csproj', 'packages.config'],
            'go': ['go.mod', 'go.sum'],
            'rust': ['Cargo.toml', 'Cargo.lock']
        }

        for ecosystem, files in ecosystem_files.items():
            # glob() returns a generator, which is always truthy, so test whether
            # it actually yields a match
            if any(any(Path(project_path).glob(f)) for f in files):
                ecosystems.append(ecosystem)

        return ecosystems

    def scan_ecosystem(self, ecosystem: str, project_path: str) -> Dict[str, Any]:
        """Dispatch to a scanner (minimal sketch: only npm and pip are implemented here)"""
        scanners = {'npm': self.scan_npm_dependencies, 'pip': self.scan_python_dependencies}
        if ecosystem not in scanners:
            return {'vulnerabilities': [], 'skipped': f'No scanner implemented for {ecosystem}'}
        return scanners[ecosystem](project_path)

    def update_summary(self, summary: Dict[str, int], ecosystem_result: Dict[str, Any]) -> None:
        """Add one ecosystem's findings to the running severity totals"""
        for vuln in ecosystem_result.get('vulnerabilities', []):
            summary['total_vulnerabilities'] += 1
            severity = str(vuln.get('severity', '')).lower()
            severity = {'moderate': 'medium'}.get(severity, severity)  # npm reports "moderate"
            if severity in ('critical', 'high', 'medium', 'low'):
                summary[severity] += 1

    def map_safety_severity(self, vuln_id: str) -> str:
        """Safety IDs carry no severity information, so findings default to HIGH"""
        return 'HIGH'

    def scan_npm_dependencies(self, project_path: str) -> Dict[str, Any]:
        """Scan NPM dependencies using multiple tools"""
        results = {
            'tool_results': {},
            'vulnerabilities': [],
            'total_packages': 0,
            'outdated_packages': []
        }

        # NPM Audit
        try:
            npm_result = subprocess.run(
                ['npm', 'audit', '--json'],
                cwd=project_path,
                capture_output=True,
                text=True,
                timeout=120
            )

            if npm_result.stdout:
                audit_data = json.loads(npm_result.stdout)
                results['tool_results']['npm_audit'] = audit_data

                for vuln_id, vuln in audit_data.get('vulnerabilities', {}).items():
                    results['vulnerabilities'].append({
                        'id': vuln_id,
                        'severity': vuln.get('severity', 'unknown'),
                        'title': vuln.get('title', ''),
                        'package': vuln.get('name', ''),
                        'version': vuln.get('range', ''),
                        'cwe': vuln.get('cwe', []),
                        'cve': vuln.get('cves', []),
                        'fixed_in': vuln.get('fixAvailable', ''),
                        'source': 'npm_audit'
                    })

        except (subprocess.TimeoutExpired, subprocess.CalledProcessError,
                FileNotFoundError, json.JSONDecodeError):
            # FileNotFoundError covers the case where npm is not installed
            results['tool_results']['npm_audit'] = {'error': 'Failed to run npm audit'}

        # Snyk scan (if available)
        try:
            snyk_result = subprocess.run(
                ['snyk', 'test', '--json'],
                cwd=project_path,
                capture_output=True,
                text=True,
                timeout=180
            )

            if snyk_result.stdout:
                snyk_data = json.loads(snyk_result.stdout)
                results['tool_results']['snyk'] = snyk_data

                for vuln in snyk_data.get('vulnerabilities', []):
                    results['vulnerabilities'].append({
                        'id': vuln.get('id', ''),
                        'severity': vuln.get('severity', 'unknown'),
                        'title': vuln.get('title', ''),
                        'package': vuln.get('packageName', ''),
                        'version': vuln.get('version', ''),
                        'cve': vuln.get('identifiers', {}).get('CVE', []),
                        'cwe': vuln.get('identifiers', {}).get('CWE', []),
                        'upgrade_path': vuln.get('upgradePath', []),
                        'source': 'snyk'
                    })

        except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError,
json.JSONDecodeError): results['tool_results']['snyk'] = {'error': 'Snyk not available or failed'} return results def scan_python_dependencies(self, project_path: str) -> Dict[str, Any]: """Comprehensive Python dependency scanning""" results = { 'tool_results': {}, 'vulnerabilities': [], 'license_issues': [] } # Safety scan try: safety_result = subprocess.run( ['safety', 'check', '--json'], cwd=project_path, capture_output=True, text=True, timeout=120 ) if safety_result.stdout: safety_data = json.loads(safety_result.stdout) results['tool_results']['safety'] = safety_data for vuln in safety_data: results['vulnerabilities'].append({ 'package': vuln.get('package_name', ''), 'version': vuln.get('analyzed_version', ''), 'vulnerability_id': vuln.get('vulnerability_id', ''), 'advisory': vuln.get('advisory', ''), 'cve': vuln.get('cve', ''), 'severity': self.map_safety_severity(vuln.get('vulnerability_id', '')), 'source': 'safety' }) except (subprocess.TimeoutExpired, subprocess.CalledProcessError, json.JSONDecodeError): results['tool_results']['safety'] = {'error': 'Safety scan failed'} # pip-audit scan try: pip_audit_result = subprocess.run( ['pip-audit', '--format=json'], cwd=project_path, capture_output=True, text=True, timeout=120 ) if pip_audit_result.stdout: pip_audit_data = json.loads(pip_audit_result.stdout) results['tool_results']['pip_audit'] = pip_audit_data for vuln in pip_audit_data.get('vulnerabilities', []): results['vulnerabilities'].append({ 'package': vuln.get('package', ''), 'version': vuln.get('version', ''), 'vulnerability_id': vuln.get('id', ''), 'description': vuln.get('description', ''), 'aliases': vuln.get('aliases', []), 'fix_versions': vuln.get('fix_versions', []), 'source': 'pip_audit' }) except (subprocess.TimeoutExpired, subprocess.CalledProcessError, json.JSONDecodeError): results['tool_results']['pip_audit'] = {'error': 'pip-audit not available'} return results def generate_remediation_plan(self, vulnerabilities: List[Dict]) -> Dict[str, 
Any]: """Generate prioritized remediation plan""" plan = { 'immediate_actions': [], 'short_term': [], 'long_term': [], 'automation_scripts': {} } # Sort by severity critical_high = [v for v in vulnerabilities if v.get('severity', '').upper() in ['CRITICAL', 'HIGH']] medium = [v for v in vulnerabilities if v.get('severity', '').upper() == 'MEDIUM'] low = [v for v in vulnerabilities if v.get('severity', '').upper() == 'LOW'] # Immediate actions for critical/high for vuln in critical_high: plan['immediate_actions'].append({ 'package': vuln.get('package', ''), 'current_version': vuln.get('version', ''), 'fixed_version': vuln.get('fixed_in', vuln.get('fix_versions', ['latest'])[0] if vuln.get('fix_versions') else 'latest'), 'action': f"Update {vuln.get('package', '')} to {vuln.get('fixed_in', 'latest')}", 'priority': 1, 'effort': 'Low' }) # Auto-update script plan['automation_scripts']['npm_auto_update'] = """ #!/bin/bash # Automated npm dependency updates npm audit fix --force npm update npm audit """ plan['automation_scripts']['pip_auto_update'] = """ #!/bin/bash # Automated Python dependency updates pip install --upgrade pip pip-audit --fix safety check """ return plan # Example usage with specific package managers npm_audit_example = { "dependencies": { "lodash": { "version": "4.17.15", "vulnerabilities": [{ "severity": "HIGH", "cve": "CVE-2021-23337", "description": "Command Injection in lodash", "fixed_in": "4.17.21", "recommendation": "npm install lodash@4.17.21", "automated_fix": "npm audit fix" }] }, "@types/node": { "version": "14.0.0", "vulnerabilities": [], "outdated": True, "latest_version": "20.0.0", "recommendation": "npm install @types/node@latest" } }, "summary": { "total_packages": 847, "vulnerable_packages": 12, "outdated_packages": 45, "license_issues": 3 } } # Python requirements scan # Container Image Vulnerability Scanning def scan_container_vulnerabilities(image_name: str) -> Dict[str, Any]: """ Comprehensive container vulnerability scanning 
using multiple tools """ results = { 'image': image_name, 'scan_results': {}, 'vulnerabilities': [], 'sbom': {}, 'compliance_checks': {} } # Trivy scan try: trivy_result = subprocess.run([ 'trivy', 'image', '--format', 'json', '--security-checks', 'vuln,config,secret', image_name ], capture_output=True, text=True, timeout=300) if trivy_result.stdout: trivy_data = json.loads(trivy_result.stdout) results['scan_results']['trivy'] = trivy_data for result in trivy_data.get('Results', []): for vuln in result.get('Vulnerabilities', []): results['vulnerabilities'].append({ 'package': vuln.get('PkgName', ''), 'version': vuln.get('InstalledVersion', ''), 'vulnerability_id': vuln.get('VulnerabilityID', ''), 'severity': vuln.get('Severity', 'UNKNOWN'), 'title': vuln.get('Title', ''), 'description': vuln.get('Description', ''), 'fixed_version': vuln.get('FixedVersion', ''), 'source': 'trivy' }) except (subprocess.TimeoutExpired, subprocess.CalledProcessError, json.JSONDecodeError): results['scan_results']['trivy'] = {'error': 'Trivy scan failed'} # Generate SBOM (Software Bill of Materials) try: sbom_result = subprocess.run([ 'trivy', 'image', '--format', 'spdx-json', image_name ], capture_output=True, text=True, timeout=180) if sbom_result.stdout: results['sbom'] = json.loads(sbom_result.stdout) except (subprocess.TimeoutExpired, subprocess.CalledProcessError, json.JSONDecodeError): results['sbom'] = {'error': 'SBOM generation failed'} return results # Multi-ecosystem scanner class UniversalDependencyScanner: def __init__(self): self.scanners = { 'python': self.scan_python_dependencies, 'javascript': self.scan_npm_dependencies, 'java': self.scan_java_dependencies, 'go': self.scan_go_dependencies, 'rust': self.scan_rust_dependencies, 'container': self.scan_container_image } def scan_python_dependencies(self, project_path: str) -> Dict[str, Any]: """ Enhanced Python dependency scanning with multiple tools """ results = { 'tools_used': ['safety', 'pip-audit', 'bandit'], 
'vulnerabilities': [], 'license_compliance': [], 'outdated_packages': [] } # Safety check try: safety_cmd = ['safety', 'check', '--json', '--full-report'] result = subprocess.run(safety_cmd, capture_output=True, text=True, timeout=120) if result.stdout: safety_data = json.loads(result.stdout) for vuln in safety_data: results['vulnerabilities'].append({ 'tool': 'safety', 'package': vuln.get('package_name'), 'version': vuln.get('analyzed_version'), 'vulnerability_id': vuln.get('vulnerability_id'), 'severity': self._map_safety_severity(vuln.get('vulnerability_id')), 'advisory': vuln.get('advisory'), 'cve': vuln.get('cve'), 'remediation': f"Update to {vuln.get('fixed_version', 'latest version')}" }) except Exception as e: results['safety_error'] = str(e) # pip-audit try: pip_audit_cmd = ['pip-audit', '--format=json', '--desc'] result = subprocess.run(pip_audit_cmd, capture_output=True, text=True, timeout=120) if result.stdout: pip_audit_data = json.loads(result.stdout) for vuln in pip_audit_data.get('vulnerabilities', []): results['vulnerabilities'].append({ 'tool': 'pip-audit', 'package': vuln.get('package'), 'version': vuln.get('version'), 'vulnerability_id': vuln.get('id'), 'severity': self._calculate_severity_from_cvss(vuln.get('fix_versions', [])), 'description': vuln.get('description'), 'aliases': vuln.get('aliases', []), 'fix_versions': vuln.get('fix_versions', []), 'remediation': f"Update to one of: {', '.join(vuln.get('fix_versions', ['latest']))}" }) except Exception as e: results['pip_audit_error'] = str(e) # License compliance check try: pip_licenses_result = subprocess.run( ['pip-licenses', '--format=json'], capture_output=True, text=True, timeout=60 ) if pip_licenses_result.stdout: licenses_data = json.loads(pip_licenses_result.stdout) problematic_licenses = ['GPL', 'AGPL', 'SSPL', 'BUSL'] for package in licenses_data: license_name = package.get('License', 'Unknown') if any(prob in license_name.upper() for prob in problematic_licenses): 
                        results['license_compliance'].append({
                            'package': package.get('Name'),
                            'version': package.get('Version'),
                            'license': license_name,
                            'issue': 'Potentially problematic license for commercial use',
                            'action': 'Review license compatibility'
                        })
        except Exception as e:
            results['license_error'] = str(e)

        return results

    def _map_safety_severity(self, vuln_id: str) -> str:
        """Map Safety vulnerability ID to severity level"""
        # Simplified mapping: Safety IDs are usually numeric, so the keyword check
        # rarely matches and most findings fall through to HIGH.
        # In practice, derive severity from CVSS scores instead.
        high_risk_patterns = ['injection', 'rce', 'deserialization']
        if any(pattern in str(vuln_id or '').lower() for pattern in high_risk_patterns):
            return 'CRITICAL'
        return 'HIGH'  # Default for Safety findings

    def _calculate_severity_from_cvss(self, fix_versions: list) -> str:
        """Estimate severity from fix availability (despite the name, no CVSS data is used)"""
        if not fix_versions:
            return 'HIGH'  # No fix available
        return 'MEDIUM'  # Fix available
```

### 2. OWASP Top 10 Assessment

Check for OWASP Top 10 vulnerabilities:

**A01: Broken Access Control**
```python
# Check for missing authentication
def check_access_control():
    findings = []

    # API endpoints without auth
    # (illustrative sample data; a real audit derives these from route analysis)
    unprotected_endpoints = [
        {'path': '/api/admin/*', 'method': 'GET', 'auth': False},
        {'path': '/api/users/delete', 'method': 'POST', 'auth': False}
    ]

    # Insecure direct object references (regexes to search the codebase for)
    idor_patterns = [
        r"user_id\s*=\s*request\.(GET|POST)\[",
        r"WHERE\s+id\s*=\s*\$_GET\[",
        r"findById\(req\.params\.id\)"
    ]

    # Missing authorization checks
    missing_authz = [
        {'file': 'routes/admin.js', 'line': 45, 'issue': 'No role check'},
        {'file': 'api/delete.py', 'line': 12, 'issue': 'No ownership validation'}
    ]

    # Collect the issues above so the function returns them instead of an empty list
    findings.extend({'type': 'unprotected_endpoint', **endpoint}
                    for endpoint in unprotected_endpoints if not endpoint['auth'])
    findings.extend({'type': 'missing_authorization', **item} for item in missing_authz)

    return findings
```

**A02: Cryptographic Failures**
```python
# Check encryption and hashing
crypto_issues = {
    "weak_hashing": [
        {"algorithm": "MD5", "usage": "password hashing", "severity": "CRITICAL"},
        {"algorithm": "SHA1", "usage": "token generation", "severity": "HIGH"}
    ],
    "insecure_storage": [
        {"data": "credit cards", "storage": "plain text in database"},
        {"data": "SSN", "storage": "base64 encoded only"}
    ],
    "missing_encryption": [
        {"connection": "database", "protocol": "unencrypted TCP"},
        {"api": "payment service", "protocol": "HTTP"}
    ],
    "weak_tls": [
        {"version": "TLS 1.0", "recommendation": "Use TLS 1.2+"},
        {"cipher": "DES-CBC3-SHA", "recommendation": "Use ECDHE-RSA-AES256-GCM-SHA384"}
    ]
}
```

**A03: Injection**
```python
# SQL Injection detection
# (sample results: record whether each payload succeeded against the target)
sql_injection_tests = [
    {"payload": "' OR '1'='1", "vulnerable": True},
    {"payload": "'; DROP TABLE users; --", "vulnerable": True},
    {"payload": "1' UNION SELECT * FROM users--", "vulnerable": False}
]

# NoSQL Injection
nosql_injection = {
    "mongodb": [
        {"query": "db.users.find({username: req.body.username})", "vulnerable": True},
        {"fix": "db.users.find({username: {$eq: req.body.username}})"}
    ]
}

# Command Injection
command_injection = [
    {
        "code": "exec('ping ' + user_input)",
        "vulnerability": "Direct command execution with user input",
        "fix": "Pass arguments as a list to subprocess with shell=False and validate input"
    }
]
```

### 3. Infrastructure Security

Scan infrastructure and configuration:

**Container Security**
```dockerfile
# Dockerfile security scan
# ISSUE: Using non-specific tag
FROM node:14
# ISSUE: Running as root
USER root

# ISSUE: Installing packages without version pinning
RUN apt-get update && apt-get install -y curl

# ISSUE: Copying sensitive files
COPY . /app
# CRITICAL: Copying secrets
COPY .env /app/.env

# ISSUE: Not dropping privileges
CMD ["node", "server.js"]

# Secure version:
# (Node 14 is end-of-life; in a real image pin a currently supported LTS release)
FROM node:14.17.6-alpine AS builder
RUN apk add --no-cache python3 make g++
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production

FROM node:14.17.6-alpine
RUN addgroup -g 1001 -S nodejs && adduser -S nodejs -u 1001 -G nodejs
USER nodejs
WORKDIR /app
COPY --from=builder --chown=nodejs:nodejs /app/node_modules ./node_modules
# Relies on a .dockerignore that excludes .env and other secrets
COPY --chown=nodejs:nodejs . .
EXPOSE 3000 CMD ["node", "server.js"] ``` **Kubernetes Security** ```yaml # Pod Security Policy apiVersion: policy/v1beta1 kind: PodSecurityPolicy metadata: name: restricted spec: privileged: false allowPrivilegeEscalation: false requiredDropCapabilities: - ALL volumes: - 'configMap' - 'emptyDir' - 'projected' - 'secret' - 'downwardAPI' - 'persistentVolumeClaim' runAsUser: rule: 'MustRunAsNonRoot' seLinux: rule: 'RunAsAny' fsGroup: rule: 'RunAsAny' readOnlyRootFilesystem: true ``` ### 4. API Security Comprehensive API security testing: **Authentication & Authorization** ```python # JWT Security Issues jwt_vulnerabilities = { "weak_secret": { "issue": "JWT signed with weak secret 'secret123'", "severity": "CRITICAL", "fix": "Use strong 256-bit secret from environment" }, "algorithm_confusion": { "issue": "JWT accepts 'none' algorithm", "severity": "CRITICAL", "fix": "Explicitly verify algorithm: ['HS256', 'RS256']" }, "missing_expiration": { "issue": "JWT tokens never expire", "severity": "HIGH", "fix": "Set exp claim to reasonable duration (e.g., 1 hour)" } } # API Rate Limiting rate_limit_config = { "endpoints": { "/api/login": {"limit": 5, "window": "5m", "status": "NOT_CONFIGURED"}, "/api/password-reset": {"limit": 3, "window": "1h", "status": "NOT_CONFIGURED"}, "/api/data": {"limit": 100, "window": "1m", "status": "OK"} } } ``` **Input Validation** ```python # API Input Validation Issues validation_issues = [ { "endpoint": "/api/users", "method": "POST", "field": "email", "issue": "No email format validation", "exploit": "user@.com" }, { "endpoint": "/api/upload", "method": "POST", "field": "file", "issue": "No file type validation", "exploit": "shell.php renamed to image.jpg" } ] ``` ### 5. 
Secret Detection

Scan for exposed secrets and credentials:

**Secret Patterns**

```python
import re
import subprocess

secret_patterns = {
    "aws_access_key": r"AKIA[0-9A-Z]{16}",
    # Very broad: matches any 40-character base64-like run, so expect false positives
    "aws_secret_key": r"[0-9a-zA-Z/+=]{40}",
    "github_token": r"ghp_[0-9a-zA-Z]{36}",
    "stripe_key": r"sk_live_[0-9a-zA-Z]{24}",
    "private_key": r"-----BEGIN (RSA |EC )?PRIVATE KEY-----",
    "google_api": r"AIza[0-9A-Za-z\-_]{35}",
    "jwt_token": r"eyJ[A-Za-z0-9-_=]+\.eyJ[A-Za-z0-9-_=]+\.[A-Za-z0-9-_.+/=]+",
    "slack_webhook": r"https://hooks\.slack\.com/services/[A-Z0-9]{9}/[A-Z0-9]{9}/[a-zA-Z0-9]{24}"
}

# Git history scan
def scan_git_history():
    """
    Scan git history for accidentally committed secrets
    """
    # Get all commit hashes, newest first
    commits = subprocess.run(
        ['git', 'log', '--pretty=format:%H'],
        capture_output=True,
        text=True
    ).stdout.splitlines()

    secrets_found = []

    for commit in commits[:100]:  # Most recent 100 commits
        diff = subprocess.run(
            ['git', 'show', commit],
            capture_output=True,
            text=True
        ).stdout

        for secret_type, pattern in secret_patterns.items():
            if re.search(pattern, diff):
                secrets_found.append({
                    'commit': commit,
                    'type': secret_type,
                    'action': 'Remove from history and rotate credential'
                })

    return secrets_found
```

### 6. 
Security Headers

Check HTTP security headers:

**Header Configuration**

```python
security_headers = {
    "Strict-Transport-Security": {
        "required": True,
        "value": "max-age=31536000; includeSubDomains; preload",
        "missing_impact": "Vulnerable to protocol downgrade attacks"
    },
    "X-Content-Type-Options": {
        "required": True,
        "value": "nosniff",
        "missing_impact": "Vulnerable to MIME type confusion attacks"
    },
    "X-Frame-Options": {
        "required": True,
        "value": "DENY",
        "missing_impact": "Vulnerable to clickjacking"
    },
    "Content-Security-Policy": {
        "required": True,
        # Avoid 'unsafe-inline' in script-src: it largely defeats CSP as an XSS mitigation
        "value": "default-src 'self'; script-src 'self'",
        "missing_impact": "Vulnerable to XSS attacks"
    },
    "X-XSS-Protection": {
        "required": False,  # Deprecated
        "value": "0",
        "note": "Legacy XSS filters have been removed from modern browsers; disable the header and rely on Content-Security-Policy"
    },
    "Referrer-Policy": {
        "required": True,
        "value": "strict-origin-when-cross-origin",
        "missing_impact": "May leak sensitive URLs"
    },
    "Permissions-Policy": {
        "required": True,
        "value": "geolocation=(), microphone=(), camera=()",
        "missing_impact": "Allows access to sensitive browser features"
    }
}
```

### 7. 
Automated Remediation Implementation

Provide intelligent, automated fixes with safety validation:

**Smart Remediation Engine**

```python
import ast
import re
import subprocess
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pathlib import Path

@dataclass
class RemediationAction:
    vulnerability_id: str
    action_type: str  # 'dependency_update', 'code_fix', 'config_change'
    description: str
    risk_level: str  # 'safe', 'low_risk', 'medium_risk', 'high_risk'
    automated: bool
    manual_steps: List[str]
    validation_tests: List[str]
    rollback_plan: str

class AutomatedRemediationEngine:
    def __init__(self, project_path: str):
        self.project_path = Path(project_path)
        self.backup_created = False
        self.applied_fixes = []

    def create_safety_backup(self) -> str:
        """Create git branch backup before applying fixes"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_branch = f'security_backup_{timestamp}'

        try:
            # Create the backup branch at the current commit, then switch back
            subprocess.run(['git', 'checkout', '-b', backup_branch],
                           cwd=self.project_path, check=True)
            subprocess.run(['git', 'checkout', '-'],
                           cwd=self.project_path, check=True)
            self.backup_created = True
            return backup_branch
        except subprocess.CalledProcessError as exc:
            raise RuntimeError("Failed to create safety backup branch") from exc

    def apply_automated_fixes(self, vulnerabilities: List[Dict]) -> List[RemediationAction]:
        """Apply safe automated fixes"""
        if not self.backup_created:
            self.create_safety_backup()

        actions = []

        for vuln in vulnerabilities:
            action = self.generate_remediation_action(vuln)

            # Only apply fixes that are both automatable and low risk;
            # everything else is returned for manual follow-up
            if action.automated and action.risk_level in ['safe', 'low_risk']:
                try:
                    success = self.apply_fix(action)
                    if success:
                        actions.append(action)
                        self.applied_fixes.append(action)
                except Exception as e:
                    print(f"Failed to apply fix for {action.vulnerability_id}: {e}")
            else:
                actions.append(action)

        return actions

    def generate_remediation_action(self, vulnerability: Dict) -> RemediationAction:
        """Generate specific remediation action for vulnerability"""
        vuln_type = 
vulnerability.get('type', '') severity = vulnerability.get('severity', 'MEDIUM') if vuln_type == 'vulnerable_dependency': return self._fix_vulnerable_dependency(vulnerability) elif vuln_type == 'sql_injection': return self._fix_sql_injection(vulnerability) elif vuln_type == 'hardcoded_secrets': return self._fix_hardcoded_secrets(vulnerability) elif vuln_type == 'missing_security_headers': return self._fix_security_headers(vulnerability) else: return self._generic_fix(vulnerability) def _fix_vulnerable_dependency(self, vuln: Dict) -> RemediationAction: """Fix vulnerable dependencies automatically""" package = vuln.get('package', '') current_version = vuln.get('version', '') fixed_version = vuln.get('fixed_version', 'latest') # Determine package manager if (self.project_path / 'package.json').exists(): update_command = f'npm install {package}@{fixed_version}' ecosystem = 'npm' elif (self.project_path / 'requirements.txt').exists(): update_command = f'pip install {package}=={fixed_version}' ecosystem = 'pip' else: ecosystem = 'unknown' update_command = f'# Update {package} to {fixed_version}' return RemediationAction( vulnerability_id=vuln.get('id', ''), action_type='dependency_update', description=f'Update {package} from {current_version} to {fixed_version}', risk_level='safe', # Dependency updates are generally safe automated=True, manual_steps=[ f'Run: {update_command}', 'Test application functionality', 'Update lock file if needed' ], validation_tests=[ f'Check {package} version is {fixed_version}', 'Run regression tests', 'Verify no new vulnerabilities introduced' ], rollback_plan=f'Revert to {package}@{current_version}' ) def _fix_sql_injection(self, vuln: Dict) -> RemediationAction: """Fix SQL injection vulnerabilities""" file_path = vuln.get('file_path', '') line_number = vuln.get('line_number', 0) # Read the vulnerable code try: with open(self.project_path / file_path, 'r') as f: lines = f.readlines() vulnerable_line = lines[line_number - 1] if line_number > 
0 else '' # Generate fix based on language and framework if file_path.endswith('.py'): fixed_code = self._fix_python_sql_injection(vulnerable_line) elif file_path.endswith('.js'): fixed_code = self._fix_javascript_sql_injection(vulnerable_line) else: fixed_code = '# Manual fix required' return RemediationAction( vulnerability_id=vuln.get('id', ''), action_type='code_fix', description=f'Fix SQL injection in {file_path}:{line_number}', risk_level='medium_risk', # Code changes need testing automated=False, # Require manual review manual_steps=[ f'Replace line {line_number} in {file_path}', f'Original: {vulnerable_line.strip()}', f'Fixed: {fixed_code}', 'Add input validation', 'Test with malicious inputs' ], validation_tests=[ 'SQL injection penetration testing', 'Unit tests for the affected function', 'Integration tests for the endpoint' ], rollback_plan=f'Revert changes to {file_path}' ) except Exception as e: return self._generic_fix(vuln) def _fix_python_sql_injection(self, vulnerable_line: str) -> str: """Generate Python SQL injection fix""" # Simple pattern matching for common cases if 'cursor.execute(' in vulnerable_line and '{}' in vulnerable_line: return vulnerable_line.replace('.format(', ', (').replace('{}', '?') elif 'query(' in vulnerable_line and '+' in vulnerable_line: return '# Use parameterized query: query("SELECT * FROM table WHERE id = ?", (user_id,))' return '# Replace with parameterized query' def _fix_hardcoded_secrets(self, vuln: Dict) -> RemediationAction: """Fix hardcoded secrets""" file_path = vuln.get('file_path', '') secret_type = vuln.get('secret_type', 'credential') return RemediationAction( vulnerability_id=vuln.get('id', ''), action_type='code_fix', description=f'Remove hardcoded {secret_type} from {file_path}', risk_level='high_risk', # Secrets need immediate attention automated=False, # Never automate secret removal manual_steps=[ f'Remove hardcoded secret from {file_path}', 'Add secret to environment variables or secret manager', 
                'Update code to read from environment',
                'Rotate the exposed credential',
                f'Add {file_path} to .gitignore if needed',
                'Scan git history for credential exposure'
            ],
            validation_tests=[
                'Verify application works with environment variable',
                'Confirm no secrets in code',
                'Test with invalid/missing environment variable'
            ],
            # Never roll back by restoring the hardcoded value: the exposed credential stays rotated
            rollback_plan='Revert the code change and keep the rotated credential in the environment or secret manager'
        )

    def apply_fix(self, action: RemediationAction) -> bool:
        """Apply an automated fix"""
        if action.action_type == 'dependency_update':
            return self._apply_dependency_update(action)
        elif action.action_type == 'config_change':
            return self._apply_config_change(action)
        return False

    def _apply_dependency_update(self, action: RemediationAction) -> bool:
        """Apply dependency update"""
        try:
            # Extract update command from manual steps
            update_command = None
            for step in action.manual_steps:
                if step.startswith('Run: '):
                    # Split into an argument list so the command runs without a shell
                    update_command = step[5:].split()
                    break

            if update_command:
                result = subprocess.run(
                    update_command,
                    cwd=self.project_path,
                    capture_output=True,
                    text=True,
                    timeout=300
                )

                if result.returncode == 0:
                    print(f"Successfully applied: {action.description}")
                    return True
                else:
                    print(f"Failed to apply {action.description}: {result.stderr}")
                    return False

            return False

        except Exception as e:
            print(f"Error applying fix: {e}")
            return False

    def generate_remediation_report(self, actions: List[RemediationAction]) -> str:
        """Generate comprehensive remediation report"""
        report = []
        report.append("# Security Remediation Report\n")
        report.append(f"**Generated**: {datetime.now().isoformat()}\n")
        report.append(f"**Total Actions**: {len(actions)}\n")

        automated_count = sum(1 for a in actions if a.automated and a.risk_level in ['safe', 'low_risk'])
        manual_count = len(actions) - automated_count

        report.append(f"**Automated Fixes Applied**: {automated_count}\n")
        report.append(f"**Manual Actions Required**: {manual_count}\n\n")

        # Group by action type
        by_type = {}
        for action in actions:
            if action.action_type not 
in by_type: by_type[action.action_type] = [] by_type[action.action_type].append(action) for action_type, type_actions in by_type.items(): report.append(f"## {action_type.replace('_', ' ').title()}\n") for action in type_actions: report.append(f"### {action.description}\n") report.append(f"**Risk Level**: {action.risk_level}\n") report.append(f"**Automated**: {'✅' if action.automated else '❌'}\n") if action.manual_steps: report.append("**Manual Steps**:\n") for step in action.manual_steps: report.append(f"- {step}\n") if action.validation_tests: report.append("**Validation Tests**:\n") for test in action.validation_tests: report.append(f"- {test}\n") report.append(f"**Rollback**: {action.rollback_plan}\n\n") return ''.join(report) # Security Middleware Templates security_middleware_templates = { 'express': """ // Enhanced Express.js security middleware const helmet = require('helmet'); const rateLimit = require('express-rate-limit'); const mongoSanitize = require('express-mongo-sanitize'); const hpp = require('hpp'); const cors = require('cors'); // Content Security Policy app.use(helmet({ contentSecurityPolicy: { directives: { defaultSrc: ["'self'"], scriptSrc: ["'self'", "'unsafe-inline'", "https://trusted-cdn.com"], styleSrc: ["'self'", "'unsafe-inline'", "https://fonts.googleapis.com"], imgSrc: ["'self'", "data:", "https:"], connectSrc: ["'self'"], fontSrc: ["'self'", "https://fonts.gstatic.com"], objectSrc: ["'none'"], mediaSrc: ["'self'"], frameSrc: ["'none'"], baseUri: ["'self'"], formAction: ["'self'"] }, }, hsts: { maxAge: 31536000, includeSubDomains: true, preload: true }, noSniff: true, xssFilter: true, referrerPolicy: { policy: 'same-origin' } })); // Advanced rate limiting const createRateLimiter = (windowMs, max, message) => rateLimit({ windowMs, max, message: { error: message }, standardHeaders: true, legacyHeaders: false, handler: (req, res) => { res.status(429).json({ error: message, retryAfter: Math.round(windowMs / 1000) }); } }); // Different 
limits for different endpoints app.use('/api/auth/login', createRateLimiter(15 * 60 * 1000, 5, 'Too many login attempts')); app.use('/api/auth/register', createRateLimiter(60 * 60 * 1000, 3, 'Too many registration attempts')); app.use('/api/', createRateLimiter(15 * 60 * 1000, 100, 'Too many API requests')); // CORS configuration app.use(cors({ origin: process.env.ALLOWED_ORIGINS?.split(',') || ['http://localhost:3000'], credentials: true, optionsSuccessStatus: 200 })); // Input sanitization and validation app.use(express.json({ limit: '10mb', verify: (req, res, buf) => { if (buf.length > 10 * 1024 * 1024) { throw new Error('Request entity too large'); } } })); app.use(mongoSanitize()); // Prevent NoSQL injection app.use(hpp()); // Prevent HTTP Parameter Pollution // Custom security middleware app.use((req, res, next) => { // Remove sensitive headers res.removeHeader('X-Powered-By'); // Add security headers res.setHeader('X-Content-Type-Options', 'nosniff'); res.setHeader('X-Frame-Options', 'DENY'); res.setHeader('X-XSS-Protection', '1; mode=block'); next(); }); // Secure session configuration app.use(session({ secret: process.env.SESSION_SECRET || throwError('SESSION_SECRET required'), name: 'sessionId', // Don't use default 'connect.sid' resave: false, saveUninitialized: false, cookie: { secure: process.env.NODE_ENV === 'production', httpOnly: true, maxAge: 24 * 60 * 60 * 1000, // 24 hours sameSite: 'strict' }, store: new RedisStore({ /* Redis configuration */ }) })); // SQL injection prevention const db = require('better-sqlite3')('app.db', { verbose: process.env.NODE_ENV === 'development' ? 
console.log : null }); // Prepared statements const statements = { getUserByEmail: db.prepare('SELECT * FROM users WHERE email = ?'), getUserById: db.prepare('SELECT * FROM users WHERE id = ?'), createUser: db.prepare('INSERT INTO users (email, password_hash) VALUES (?, ?)') }; // Safe database operations app.post('/login', async (req, res) => { const { email, password } = req.body; // Input validation if (!email || !password) { return res.status(400).json({ error: 'Email and password required' }); } try { const user = statements.getUserByEmail.get(email); if (user && await bcrypt.compare(password, user.password_hash)) { req.session.userId = user.id; res.json({ success: true, user: { id: user.id, email: user.email } }); } else { res.status(401).json({ error: 'Invalid credentials' }); } } catch (error) { console.error('Login error:', error); res.status(500).json({ error: 'Internal server error' }); } }); """, 'flask': """ # Enhanced Flask security configuration from flask import Flask, request, session, jsonify from flask_talisman import Talisman from flask_limiter import Limiter from flask_limiter.util import get_remote_address from flask_seasurf import SeaSurf from flask_cors import CORS import bcrypt import sqlite3 import os import secrets app = Flask(__name__) # Security configuration app.config.update( SECRET_KEY=os.environ.get('SECRET_KEY') or secrets.token_urlsafe(32), SESSION_COOKIE_SECURE=True, SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE='Lax', PERMANENT_SESSION_LIFETIME=timedelta(hours=24) ) # HTTPS enforcement and security headers Talisman(app, { 'force_https': app.config.get('ENV') == 'production', 'strict_transport_security': True, 'strict_transport_security_max_age': 31536000, 'content_security_policy': { 'default-src': "'self'", 'script-src': "'self' 'unsafe-inline'", 'style-src': "'self' 'unsafe-inline' https://fonts.googleapis.com", 'font-src': "'self' https://fonts.gstatic.com", 'img-src': "'self' data: https:", 'connect-src': "'self'", 
'frame-src': "'none'", 'object-src': "'none'" }, 'referrer_policy': 'strict-origin-when-cross-origin' }) # CORS configuration CORS(app, { 'origins': os.environ.get('ALLOWED_ORIGINS', 'http://localhost:3000').split(','), 'supports_credentials': True }) # Rate limiting limiter = Limiter( app, key_func=get_remote_address, default_limits=["1000 per hour"] ) # CSRF protection SeaSurf(app) # Database connection with security def get_db_connection(): conn = sqlite3.connect('app.db') conn.row_factory = sqlite3.Row conn.execute('PRAGMA foreign_keys = ON') # Enable foreign key constraints return conn # Secure password hashing class PasswordManager: @staticmethod def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') @staticmethod def verify_password(password: str, hashed: str) -> bool: return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) # Input validation def validate_email(email: str) -> bool: import re pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not None # Secure login endpoint @app.route('/api/login', methods=['POST']) @limiter.limit("5 per minute") def login(): data = request.get_json() if not data or 'email' not in data or 'password' not in data: return jsonify({'error': 'Email and password required'}), 400 email = data['email'].strip().lower() password = data['password'] if not validate_email(email): return jsonify({'error': 'Invalid email format'}), 400 try: conn = get_db_connection() user = conn.execute( 'SELECT id, email, password_hash FROM users WHERE email = ?', (email,) ).fetchone() conn.close() if user and PasswordManager.verify_password(password, user['password_hash']): session['user_id'] = user['id'] session.permanent = True return jsonify({ 'success': True, 'user': {'id': user['id'], 'email': user['email']} }) else: return jsonify({'error': 'Invalid credentials'}), 401 except Exception as e: app.logger.error(f'Login 
error: {e}')
        return jsonify({'error': 'Internal server error'}), 500

# Request logging middleware
@app.before_request
def log_request_info():
    app.logger.info('Request: %s %s from %s', request.method, request.url, request.remote_addr)

# Error handlers
@app.errorhandler(429)
def ratelimit_handler(e):
    return jsonify({'error': 'Rate limit exceeded', 'retry_after': e.retry_after}), 429

@app.errorhandler(500)
def internal_error(error):
    app.logger.error(f'Server Error: {error}')
    return jsonify({'error': 'Internal server error'}), 500

if __name__ == '__main__':
    app.run(
        host='0.0.0.0' if app.config.get('ENV') == 'production' else '127.0.0.1',
        port=int(os.environ.get('PORT', 5000)),
        debug=False  # Never enable debug in production
    )
"""
}
```

**Authentication Implementation**

```python
# Secure password handling
import os
import secrets
from datetime import datetime, timedelta, timezone

import bcrypt
import jwt

class SecureAuth:
    def __init__(self):
        # The random fallback changes on every restart, which invalidates issued tokens;
        # set JWT_SECRET explicitly in any real deployment
        self.jwt_secret = os.environ.get('JWT_SECRET', secrets.token_urlsafe(32))
        self.password_min_length = 12

    def load_common_passwords(self):
        """
        Placeholder (not part of any library): replace with a real
        common-password list, e.g. loaded from a file
        """
        return {"password1234", "123456789012", "qwertyuiop12"}

    def hash_password(self, password):
        """
        Securely hash password with bcrypt
        """
        # Validate password strength
        if len(password) < self.password_min_length:
            raise ValueError(f"Password must be at least {self.password_min_length} characters")

        # Check common passwords
        if password.lower() in self.load_common_passwords():
            raise ValueError("Password is too common")

        # Hash with bcrypt (cost factor 12)
        salt = bcrypt.gensalt(rounds=12)
        return bcrypt.hashpw(password.encode('utf-8'), salt)

    def verify_password(self, password, hashed):
        """
        Verify password against hash
        """
        return bcrypt.checkpw(password.encode('utf-8'), hashed)

    def generate_token(self, user_id, expires_in=3600):
        """
        Generate secure JWT token
        """
        # Timezone-aware UTC timestamps; datetime.utcnow() is deprecated since Python 3.12
        now = datetime.now(timezone.utc)
        payload = {
            'user_id': user_id,
            'exp': now + timedelta(seconds=expires_in),
            'iat': now,
            'jti': secrets.token_urlsafe(16)  # Unique token ID
        }

        return jwt.encode(
            payload,
            self.jwt_secret,
            algorithm='HS256'
        )

    def verify_token(self, 
token): """ Verify and decode JWT token """ try: payload = jwt.decode( token, self.jwt_secret, algorithms=['HS256'] ) return payload except jwt.ExpiredSignatureError: raise ValueError("Token has expired") except jwt.InvalidTokenError: raise ValueError("Invalid token") ``` ### 8. CI/CD Security Integration Integrate security scanning into your development pipeline: **GitHub Actions Security Workflow** ```yaml # .github/workflows/security.yml name: Security Scan on: push: branches: [ main, develop ] pull_request: branches: [ main ] schedule: - cron: '0 2 * * 1' # Weekly scan on Mondays jobs: security-scan: runs-on: ubuntu-latest permissions: contents: read security-events: write pull-requests: write steps: - uses: actions/checkout@v4 with: fetch-depth: 0 # Full history for secret scanning - name: Set up Node.js uses: actions/setup-node@v4 with: node-version: '18' cache: 'npm' - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.11' - name: Install security tools run: | # Node.js tools npm install -g audit-ci @cyclonedx/cli # Python tools pip install safety bandit semgrep pip-audit # Container tools curl -sfL https://raw.githubusercontent.com/aquasecurity/trivy/main/contrib/install.sh | sh -s -- -b /usr/local/bin # Secret scanning curl -sSfL https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin - name: Run secret detection run: | trufflehog filesystem . --json --no-update > trufflehog-results.json - name: Upload secret scan results if: always() uses: github/codeql-action/upload-sarif@v2 with: sarif_file: trufflehog-results.json - name: JavaScript/TypeScript Security Scan if: hashFiles('package.json') != '' run: | npm ci # Dependency audit npm audit --audit-level moderate --json > npm-audit.json || true # SAST with ESLint Security npx eslint . 
--ext .js,.jsx,.ts,.tsx --format json --output-file eslint-security.json || true # Generate SBOM npx @cyclonedx/cli --type npm --output-format json --output-file sbom-npm.json - name: Python Security Scan if: hashFiles('requirements.txt', 'setup.py', 'pyproject.toml') != '' run: | # Install dependencies if [ -f requirements.txt ]; then pip install -r requirements.txt; fi if [ -f setup.py ]; then pip install -e .; fi # Dependency vulnerability scan safety check --json --output safety-results.json || true pip-audit --format=json --output=pip-audit-results.json || true # SAST with Bandit bandit -r . -f json -o bandit-results.json || true # Advanced SAST with Semgrep semgrep --config=auto --json --output=semgrep-results.json . || true - name: Container Security Scan if: hashFiles('Dockerfile', 'docker-compose.yml') != '' run: | # Build image for scanning if [ -f Dockerfile ]; then docker build -t security-scan:latest . # Trivy image scan trivy image --format sarif --output trivy-image.sarif security-scan:latest # Trivy filesystem scan trivy fs --format sarif --output trivy-fs.sarif . 
        fi

    - name: Infrastructure as Code Scan
      if: hashFiles('*.tf', '*.yaml', '*.yml') != ''
      run: |
        # Install Checkov
        pip install checkov

        # Scan Terraform
        if ls *.tf 1> /dev/null 2>&1; then
          checkov -f *.tf --framework terraform --output sarif > checkov-terraform.sarif || true
        fi

        # Scan Kubernetes manifests
        if ls *.yaml *.yml 1> /dev/null 2>&1; then
          checkov -f *.yaml -f *.yml --framework kubernetes --output sarif > checkov-k8s.sarif || true
        fi

    - name: Upload scan results to Security tab
      if: always()
      uses: github/codeql-action/upload-sarif@v2
      with:
        sarif_file: |
          trivy-image.sarif
          trivy-fs.sarif
          checkov-terraform.sarif
          checkov-k8s.sarif

    - name: Generate Security Report
      if: always()
      run: |
        python << 'EOF'
        import json
        import glob
        from datetime import datetime

        # Collect all scan results
        results = {
            'timestamp': datetime.now().isoformat(),
            'summary': {'total': 0, 'critical': 0, 'high': 0, 'medium': 0, 'low': 0},
            'tools': [],
            'vulnerabilities': []
        }

        # Process each result file
        result_files = glob.glob('*-results.json') + glob.glob('*.sarif')

        for file in result_files:
            try:
                with open(file, 'r') as f:
                    data = json.load(f)
                results['tools'].append(file)
                # Process based on tool format
                # (Implementation would parse each tool's output format)
            except (OSError, json.JSONDecodeError):
                # Skip unreadable or non-JSON result files
                continue

        # Write machine-readable results; the "Fail on Critical Vulnerabilities"
        # step reads .summary from security-report.json
        with open('security-report.json', 'w') as f:
            json.dump(results, f, indent=2)

        # Generate markdown report
        with open('security-report.md', 'w') as f:
            f.write("# Security Scan Report\n\n")
            f.write(f"**Date**: {results['timestamp']}\n\n")
            f.write("## Summary\n\n")
            f.write(f"- Total Vulnerabilities: {results['summary']['total']}\n")
            f.write(f"- Critical: {results['summary']['critical']}\n")
            f.write(f"- High: {results['summary']['high']}\n")
            f.write(f"- Medium: {results['summary']['medium']}\n")
            f.write(f"- Low: {results['summary']['low']}\n\n")
            f.write("## Tools Used\n\n")
            for tool in results['tools']:
                f.write(f"- {tool}\n")

        print("Security report generated: security-report.md")
        EOF

    - name: Comment PR with Security Results
      if: github.event_name == 'pull_request'
      uses: actions/github-script@v6
      with:
        script: | 
const fs = require('fs'); try { const report = fs.readFileSync('security-report.md', 'utf8'); await github.rest.issues.createComment({ issue_number: context.issue.number, owner: context.repo.owner, repo: context.repo.repo, body: '## 🔒 Security Scan Results\n\n' + report }); } catch (error) { console.log('Could not post security report:', error); } - name: Fail on Critical Vulnerabilities run: | # Check if any critical vulnerabilities found CRITICAL_COUNT=$(jq -r '.summary.critical // 0' security-report.json 2>/dev/null || echo "0") if [ "$CRITICAL_COUNT" -gt 0 ]; then echo "❌ Found $CRITICAL_COUNT critical vulnerabilities!" echo "Security scan failed due to critical vulnerabilities." exit 1 fi HIGH_COUNT=$(jq -r '.summary.high // 0' security-report.json 2>/dev/null || echo "0") if [ "$HIGH_COUNT" -gt 5 ]; then echo "⚠️ Found $HIGH_COUNT high-severity vulnerabilities!" echo "Consider addressing high-severity issues." # Don't fail for high-severity, just warn fi echo "✅ Security scan completed successfully!" 
``` **Automated Remediation Workflow** ```yaml # .github/workflows/auto-remediation.yml name: Automated Security Remediation on: schedule: - cron: '0 6 * * 2' # Weekly on Tuesdays workflow_dispatch: inputs: fix_type: description: 'Type of fixes to apply' required: true default: 'dependencies' type: choice options: - dependencies - secrets - config - all jobs: auto-remediation: runs-on: ubuntu-latest permissions: contents: write pull-requests: write steps: - uses: actions/checkout@v4 with: token: ${{ secrets.GITHUB_TOKEN }} - name: Set up Node.js if: hashFiles('package.json') != '' uses: actions/setup-node@v4 with: node-version: '18' cache: 'npm' - name: Auto-fix npm dependencies if: contains(github.event.inputs.fix_type, 'dependencies') || contains(github.event.inputs.fix_type, 'all') run: | if [ -f package.json ]; then npm audit fix --force npm update fi - name: Auto-fix Python dependencies if: contains(github.event.inputs.fix_type, 'dependencies') || contains(github.event.inputs.fix_type, 'all') run: | if [ -f requirements.txt ]; then pip install pip-tools pip-compile --upgrade requirements.in fi - name: Remove detected secrets if: contains(github.event.inputs.fix_type, 'secrets') || contains(github.event.inputs.fix_type, 'all') run: | # Install git-filter-repo pip install git-filter-repo # Create backup branch git checkout -b security-remediation-$(date +%Y%m%d) # Remove common secret patterns (be very careful with this) echo "Warning: This would remove secrets from git history" echo "Manual review required for production use" - name: Update security configurations if: contains(github.event.inputs.fix_type, 'config') || contains(github.event.inputs.fix_type, 'all') run: | # Add .gitignore entries for common secret files cat >> .gitignore << 'EOF' # Security - ignore potential secret files .env .env.local .env.*.local *.pem *.key *.p12 *.pfx config/secrets.yml config/database.yml EOF # Update Docker security if [ -f Dockerfile ]; then # Add security improvements 
to Dockerfile echo "RUN addgroup -g 1001 -S appgroup && adduser -S appuser -u 1001 -G appgroup" >> Dockerfile.security echo "USER appuser" >> Dockerfile.security fi - name: Create Pull Request uses: peter-evans/create-pull-request@v5 with: token: ${{ secrets.GITHUB_TOKEN }} commit-message: 'security: automated vulnerability remediation' title: '🔒 Automated Security Fixes' body: | ## Automated Security Remediation This PR contains automated fixes for security vulnerabilities: ### Changes Made - ✅ Updated vulnerable dependencies - ✅ Added security configurations - ✅ Improved .gitignore for secrets ### Manual Review Required - [ ] Verify all dependency updates are compatible - [ ] Test application functionality - [ ] Review any secret removal changes **⚠️ Important**: Always test thoroughly before merging automated security fixes. branch: security/automated-fixes delete-branch: true ``` ### 9. Security Report Generation Generate comprehensive security reports with actionable insights: **Advanced Reporting System** ```python import json import jinja2 from datetime import datetime from typing import Dict, List, Any from dataclasses import dataclass @dataclass class SecurityMetrics: total_vulnerabilities: int critical_count: int high_count: int medium_count: int low_count: int tools_used: List[str] scan_duration: float coverage_percentage: float false_positive_rate: float class SecurityReportGenerator: def __init__(self): self.template_env = jinja2.Environment( loader=jinja2.DictLoader({ 'executive_summary': self.EXECUTIVE_TEMPLATE, 'detailed_report': self.DETAILED_TEMPLATE, 'dashboard': self.DASHBOARD_TEMPLATE }) ) EXECUTIVE_TEMPLATE = """ # Executive Security Assessment Report **Assessment Date**: {{ timestamp }} **Overall Risk Level**: {{ risk_level }} **Confidence Score**: {{ confidence_score }}% ## Summary - **Total Vulnerabilities**: {{ metrics.total_vulnerabilities }} - **Critical**: {{ metrics.critical_count }} ({{ critical_percentage }}%) - **High**: {{ 
metrics.high_count }} ({{ high_percentage }}%) - **Medium**: {{ metrics.medium_count }} ({{ medium_percentage }}%) - **Low**: {{ metrics.low_count }} ({{ low_percentage }}%) ## Risk Assessment | Risk Category | Current Level | Target Level | Priority | |---------------|---------------|--------------|----------| {% for risk in risk_categories %} | {{ risk.category }} | {{ risk.current }} | {{ risk.target }} | {{ risk.priority }} | {% endfor %} ## Immediate Actions Required {% for action in immediate_actions %} {{ loop.index }}. **{{ action.title }}** ({{ action.effort }}) - Impact: {{ action.impact }} - Timeline: {{ action.timeline }} - Owner: {{ action.owner }} {% endfor %} ## Compliance Status {% for framework in compliance_frameworks %} - **{{ framework.name }}**: {{ framework.status }} ({{ framework.score }}/100) {% endfor %} ## Investment Required - **Immediate (0-30 days)**: {{ costs.immediate }} - **Short-term (1-6 months)**: {{ costs.short_term }} - **Long-term (6+ months)**: {{ costs.long_term }} """ DETAILED_TEMPLATE = """ # Detailed Security Findings Report ## Vulnerability Details {% for vuln in vulnerabilities %} ### {{ loop.index }}. 
{{ vuln.title }} **Severity**: {{ vuln.severity }} | **Confidence**: {{ vuln.confidence }} | **Tool**: {{ vuln.tool }} **Location**: `{{ vuln.file_path }}:{{ vuln.line_number }}` **Description**: {{ vuln.description }} **Impact**: {{ vuln.impact }} **Remediation**: ```{{ vuln.language }} {{ vuln.remediation_code }} ``` **References**: {% for ref in vuln.references %} - [{{ ref.title }}]({{ ref.url }}) {% endfor %} --- {% endfor %} ## Tool Effectiveness Analysis {% for tool in tool_analysis %} ### {{ tool.name }} - **Vulnerabilities Found**: {{ tool.found_count }} - **False Positives**: {{ tool.false_positives }}% - **Execution Time**: {{ tool.execution_time }}s - **Coverage**: {{ tool.coverage }}% - **Recommendation**: {{ tool.recommendation }} {% endfor %} """ def generate_comprehensive_report(self, scan_results: Dict[str, Any]) -> Dict[str, str]: """Generate all report formats""" # Process scan results metrics = self._calculate_metrics(scan_results) risk_assessment = self._assess_risk(scan_results, metrics) compliance_status = self._check_compliance(scan_results) # Generate different report formats reports = { 'executive_summary': self._generate_executive_summary( metrics, risk_assessment, compliance_status ), 'detailed_report': self._generate_detailed_report(scan_results), 'json_report': json.dumps({ 'metadata': { 'timestamp': datetime.now().isoformat(), 'version': '2.0', 'format': 'sarif-2.1.0' }, 'metrics': metrics.__dict__, 'vulnerabilities': scan_results.get('vulnerabilities', []), 'risk_assessment': risk_assessment, 'compliance': compliance_status }, indent=2), 'sarif_report': self._generate_sarif_report(scan_results) } return reports def _calculate_metrics(self, scan_results: Dict[str, Any]) -> SecurityMetrics: """Calculate security metrics from scan results""" vulnerabilities = scan_results.get('vulnerabilities', []) severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0} for vuln in vulnerabilities: severity = vuln.get('severity', 
'UNKNOWN').upper() if severity in severity_counts: severity_counts[severity] += 1 return SecurityMetrics( total_vulnerabilities=len(vulnerabilities), critical_count=severity_counts['CRITICAL'], high_count=severity_counts['HIGH'], medium_count=severity_counts['MEDIUM'], low_count=severity_counts['LOW'], tools_used=scan_results.get('tools_used', []), scan_duration=scan_results.get('scan_duration', 0), coverage_percentage=scan_results.get('coverage', 0), false_positive_rate=scan_results.get('false_positive_rate', 0) ) def _assess_risk(self, scan_results: Dict[str, Any], metrics: SecurityMetrics) -> Dict[str, Any]: """Perform comprehensive risk assessment""" # Calculate risk score (0-100) risk_score = min(100, ( metrics.critical_count * 25 + metrics.high_count * 15 + metrics.medium_count * 5 + metrics.low_count * 1 )) # Determine risk level if risk_score >= 80: risk_level = 'CRITICAL' elif risk_score >= 60: risk_level = 'HIGH' elif risk_score >= 30: risk_level = 'MEDIUM' else: risk_level = 'LOW' # Business impact assessment business_impact = { 'data_breach_probability': min(95, risk_score + metrics.critical_count * 10), 'service_disruption_risk': min(90, risk_score * 0.8), 'compliance_violation_risk': min(100, risk_score + (metrics.critical_count * 5)), 'reputation_damage_potential': min(85, risk_score * 0.9) } return { 'score': risk_score, 'level': risk_level, 'business_impact': business_impact, 'trending': self._calculate_risk_trend(scan_results), 'peer_comparison': self._compare_with_industry_standards(risk_score) } def _generate_sarif_report(self, scan_results: Dict[str, Any]) -> str: """Generate SARIF 2.1.0 compliant report""" sarif_report = { "version": "2.1.0", "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json", "runs": [] } # Group findings by tool tools_data = {} for vuln in scan_results.get('vulnerabilities', []): tool = vuln.get('tool', 'unknown') if tool not in tools_data: tools_data[tool] = [] 
tools_data[tool].append(vuln) # Create run for each tool for tool_name, vulnerabilities in tools_data.items(): run = { "tool": { "driver": { "name": tool_name, "version": "1.0.0" } }, "results": [] } for vuln in vulnerabilities: result = { "ruleId": vuln.get('type', 'unknown'), "message": { "text": vuln.get('description', vuln.get('title', 'Security issue detected')) }, "level": self._map_severity_to_sarif_level(vuln.get('severity', 'medium')), "locations": [{ "physicalLocation": { "artifactLocation": { "uri": vuln.get('file_path', 'unknown') }, "region": { "startLine": vuln.get('line_number', 1) } } }] } if vuln.get('cwe'): result["properties"] = { "cwe": vuln.get('cwe'), "confidence": vuln.get('confidence', 'medium') } run["results"].append(result) sarif_report["runs"].append(run) return json.dumps(sarif_report, indent=2) def _map_severity_to_sarif_level(self, severity: str) -> str: """Map severity to SARIF level""" mapping = { 'CRITICAL': 'error', 'HIGH': 'error', 'MEDIUM': 'warning', 'LOW': 'note' } return mapping.get(severity.upper(), 'warning') # Usage example report_generator = SecurityReportGenerator() # Sample scan results sample_results = { 'vulnerabilities': [ { 'tool': 'bandit', 'severity': 'HIGH', 'title': 'SQL Injection vulnerability', 'description': 'Parameterized query missing', 'file_path': 'api/users.py', 'line_number': 45, 'cwe': 'CWE-89' } ], 'tools_used': ['bandit', 'safety', 'trivy'], 'scan_duration': 120.5, 'coverage': 85.2 } reports = report_generator.generate_comprehensive_report(sample_results) ``` **Executive Summary** ```markdown ## Security Assessment Report **Date**: 2025-07-19 **Severity**: CRITICAL **Confidence**: 94% ### Summary - Total Vulnerabilities: 47 - Critical: 8 (17%) - High: 15 (32%) - Medium: 18 (38%) - Low: 6 (13%) ### Critical Findings 1. **SQL Injection** in user search endpoint (api/search.js:45) 2. **Hardcoded database credentials** in config/database.js:12 3.
**Outdated dependencies** with known RCE vulnerabilities 4. **Missing authentication** on admin endpoints ### Business Impact | Risk Category | Probability | Impact | Priority | |---------------|-------------|--------|----------| | Data Breach | 85% | Critical | P0 | | Service Disruption | 60% | High | P1 | | Compliance Violation | 90% | Critical | P0 | | Reputation Damage | 70% | High | P1 | ### Immediate Actions Required (Next 24 Hours) 1. **Patch SQL injection vulnerability** (2 hours) - [@dev-team] 2. **Remove and rotate all hardcoded credentials** (1 hour) - [@security-team] 3. **Block admin endpoints** until auth is implemented (30 minutes) - [@ops-team] ### Short-term Actions (Next 30 Days) 1. **Update critical dependencies** (4 hours) 2. **Implement authentication middleware** (6 hours) 3. **Deploy security headers** (2 hours) 4. **Security training for development team** (8 hours) ### Investment Required - **Immediate fixes**: $5,000 (40 hours @ $125/hr) - **Security improvements**: $15,000 (120 hours) - **Training and processes**: $10,000 - **Total**: $30,000 ### Compliance Status - **OWASP Top 10**: 3/10 major issues - **SOC 2**: Non-compliant (authentication controls) - **PCI DSS**: Non-compliant (data protection) - **GDPR**: At risk (data breach potential) ``` **Detailed Findings with Remediation Code** ```json { "scan_metadata": { "timestamp": "2025-07-19T10:30:00Z", "version": "2.1", "tools_used": ["bandit", "safety", "trivy", "semgrep", "eslint-security"], "scan_duration_seconds": 127, "coverage_percentage": 94.2, "false_positive_rate": 3.1 }, "vulnerabilities": [ { "id": "VULN-001", "type": "SQL Injection", "severity": "CRITICAL", "cvss_score": 9.8, "cwe": "CWE-89", "owasp_category": "A03:2021-Injection", "tool": "semgrep", "confidence": "high", "location": { "file": "api/search.js", "line": 45, "column": 12, "code_snippet": "db.query(`SELECT * FROM users WHERE name LIKE '%${req.query.search}%'`)", "function": "searchUsers" }, "impact": { 
"description": "Complete database compromise, data exfiltration, potential RCE", "business_impact": "Critical - customer data exposure, regulatory violations", "affected_users": "All users with search functionality access" }, "remediation": { "effort_hours": 2, "priority": "P0", "description": "Replace string concatenation with parameterized queries", "fixed_code": "db.query('SELECT * FROM users WHERE name LIKE ?', [`%${req.query.search}%`])", "testing_required": "Unit tests for search functionality", "deployment_notes": "No breaking changes, safe to deploy immediately" }, "references": [ { "title": "OWASP SQL Injection Prevention", "url": "https://owasp.org/www-community/attacks/SQL_Injection" }, { "title": "Node.js Parameterized Queries", "url": "https://nodejs.org/en/docs/guides/security/" } ], "exploitability": { "ease_of_exploitation": "Very Easy", "attack_vector": "Remote", "authentication_required": false, "user_interaction": false } }, { "id": "VULN-002", "type": "Hardcoded Secrets", "severity": "CRITICAL", "cvss_score": 9.1, "cwe": "CWE-798", "tool": "trufflehog", "confidence": "verified", "location": { "file": "config/database.js", "line": 12, "code_snippet": "const password = 'MyS3cr3tP@ssw0rd123!'" }, "impact": { "description": "Database credentials exposure, unauthorized access", "business_impact": "Critical - full database access, data breach potential" }, "remediation": { "effort_hours": 1, "priority": "P0", "immediate_actions": [ "Rotate database password immediately", "Remove hardcoded credential from code", "Implement environment variable loading" ], "fixed_code": "const password = process.env.DATABASE_PASSWORD || throwError('Missing DATABASE_PASSWORD')", "additional_steps": [ "Add .env to .gitignore", "Update deployment scripts to use secrets management", "Scan git history for credential exposure" ] } }, { "id": "VULN-003", "type": "Vulnerable Dependency", "severity": "HIGH", "cvss_score": 8.5, "cve": "CVE-2024-1234", "tool": "npm-audit", 
"location": { "file": "package.json", "dependency": "express", "version": "4.17.1", "vulnerable_path": "express > body-parser > raw-body" }, "impact": { "description": "Remote code execution via malformed request body", "affected_endpoints": ["/api/upload", "/api/webhook"] }, "remediation": { "effort_hours": 0.5, "priority": "P1", "fixed_version": "4.18.2", "update_command": "npm install express@4.18.2", "breaking_changes": false, "testing_required": "Regression testing for API endpoints" } } ], "summary": { "total_vulnerabilities": 47, "by_severity": { "critical": 8, "high": 15, "medium": 18, "low": 6 }, "by_category": { "injection": 12, "broken_auth": 8, "sensitive_data": 6, "xml_entities": 2, "broken_access_control": 5, "security_misconfig": 9, "xss": 3, "insecure_deserialization": 1, "vulnerable_components": 15, "insufficient_logging": 4 }, "remediation_timeline": { "immediate_p0": 9, "urgent_p1": 18, "medium_p2": 15, "low_p3": 5 }, "total_effort_hours": 47.5, "estimated_cost": 5938, "risk_score": 89 }, "compliance_assessment": { "owasp_top_10_2021": { "a01_broken_access_control": "FAIL", "a02_cryptographic_failures": "PASS", "a03_injection": "FAIL", "a04_insecure_design": "WARNING", "a05_security_misconfiguration": "FAIL", "a06_vulnerable_components": "FAIL", "a07_identification_failures": "FAIL", "a08_software_integrity_failures": "PASS", "a09_logging_failures": "WARNING", "a10_ssrf": "PASS" }, "frameworks": { "nist_cybersecurity": 67, "iso_27001": 71, "pci_dss": 45, "sox_compliance": 78 } } } ``` ### 10. Cross-Command Integration ### Complete Security-First Development Workflow **Secure API Development Pipeline** ```bash # 1. Generate secure API scaffolding /api-scaffold framework: "fastapi" security_features: ["jwt_auth", "rate_limiting", "input_validation", "cors"] database: "postgresql" # 2. Run comprehensive security scan /security-scan scan_types: ["sast", "dependency", "secrets", "container", "iac"] autofix: true generate_report: true # 3. 
Generate security-aware tests /test-harness test_types: ["unit", "security", "penetration"] security_frameworks: ["bandit", "safety", "owasp-zap"] # 4. Optimize containers with security hardening /docker-optimize security_hardening: true vulnerability_scanning: true minimal_base_images: true ``` **Integrated Security Configuration** ```python # security-config.py - Shared across all commands class IntegratedSecurityConfig: def __init__(self): self.api_security = self.load_api_security_config() # From /api-scaffold self.scan_config = self.load_scan_config() # From /security-scan self.test_security = self.load_test_security_config() # From /test-harness self.container_security = self.load_container_config() # From /docker-optimize def generate_security_middleware(self): """Generate security middleware based on API scaffold config""" middleware = [] if self.api_security.get('rate_limiting'): middleware.append({ 'type': 'rate_limiting', 'config': { 'requests_per_minute': 100, 'burst_size': 10, 'key_func': 'lambda request: request.client.host' } }) if self.api_security.get('jwt_auth'): middleware.append({ 'type': 'jwt_auth', 'config': { 'secret_key': '${JWT_SECRET_KEY}', 'algorithm': 'HS256', 'token_expiry': 3600 } }) return middleware def generate_security_tests(self): """Generate security tests based on scan findings""" test_cases = [] # SQL Injection tests based on API endpoints api_endpoints = self.api_security.get('endpoints', []) for endpoint in api_endpoints: if endpoint.get('accepts_input'): test_cases.append({ 'type': 'sql_injection', 'endpoint': endpoint['path'], 'payloads': self.get_sql_injection_payloads() }) # Authentication bypass tests if self.api_security.get('jwt_auth'): test_cases.append({ 'type': 'auth_bypass', 'scenarios': [ 'invalid_token', 'expired_token', 'malformed_token', 'no_token' ] }) return test_cases def generate_container_security_policies(self): """Generate container security policies""" policies = { 'dockerfile_security': { 
'non_root_user': True, 'minimal_layers': True, 'security_updates': True, 'no_secrets_in_layers': True }, 'runtime_security': { 'read_only_filesystem': True, 'no_new_privileges': True, 'drop_capabilities': ['ALL'], 'add_capabilities': ['NET_BIND_SERVICE'] if self.api_security.get('bind_privileged_ports') else [] } } return policies ``` **API Security Integration** ```python # Generated secure API endpoint with integrated security # UserCreateSchema, validate_user_input, user_service, and security_logger are assumed to be defined elsewhere in the application import os from fastapi import FastAPI, Depends, HTTPException, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded import jwt # Security configuration from /security-scan security_config = IntegratedSecurityConfig() # JWT signing key read from the environment; raises KeyError at startup if unset JWT_SECRET_KEY = os.environ["JWT_SECRET_KEY"] # Rate limiting from security scan recommendations limiter = Limiter(key_func=get_remote_address) app = FastAPI() app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # JWT authentication from security scan requirements security = HTTPBearer() def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): """JWT verification with security scan compliance""" try: payload = jwt.decode( credentials.credentials, JWT_SECRET_KEY, algorithms=["HS256"] ) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.InvalidTokenError: raise HTTPException(status_code=401, detail="Invalid token") # Secure endpoint with integrated protections @app.post("/api/v1/users/") @limiter.limit("10/minute") # Rate limiting from security scan async def create_user( request: Request, user_data: UserCreateSchema, # Input validation from security scan current_user: dict = Depends(verify_token) # Authentication ): """ Secure user creation endpoint with integrated security controls Security features applied: - Rate
limiting (10 requests/minute) - JWT authentication required - Input validation via Pydantic - SQL injection prevention via ORM - XSS prevention via output encoding """ # Additional security validation from scan results if not validate_user_input(user_data): raise HTTPException(status_code=400, detail="Invalid input data") # Create user with security logging try: user = await user_service.create_user(user_data) security_logger.log_user_creation(current_user['sub'], user.id) return user except Exception as e: security_logger.log_error("user_creation_failed", str(e)) raise HTTPException(status_code=500, detail="User creation failed") ``` **Database Security Integration** ```python # Database security configuration from /db-migrate and /security-scan class SecureDatabaseConfig: def __init__(self): self.migration_config = self.load_migration_config() # From /db-migrate self.security_requirements = self.load_security_scan_results() def generate_secure_migrations(self): """Generate database migrations with security controls""" migrations = [] # User table with security controls migrations.append({ 'operation': 'create_table', 'table': 'users', 'columns': [ {'name': 'id', 'type': 'UUID', 'primary_key': True}, {'name': 'email', 'type': 'VARCHAR(255)', 'unique': True, 'encrypted': True}, {'name': 'password_hash', 'type': 'VARCHAR(255)', 'not_null': True}, {'name': 'created_at', 'type': 'TIMESTAMP', 'default': 'NOW()'}, {'name': 'last_login', 'type': 'TIMESTAMP'}, {'name': 'failed_login_attempts', 'type': 'INTEGER', 'default': 0}, {'name': 'locked_until', 'type': 'TIMESTAMP', 'nullable': True} ], 'security_features': { 'row_level_security': True, 'audit_logging': True, 'field_encryption': ['email'], 'password_policy': { 'min_length': 12, 'require_special_chars': True, 'require_numbers': True, 'expire_days': 90 } } }) # Security audit log table migrations.append({ 'operation': 'create_table', 'table': 'security_audit_log', 'columns': [ {'name': 'id', 'type': 'UUID', 
'primary_key': True}, {'name': 'user_id', 'type': 'UUID', 'foreign_key': 'users.id'}, {'name': 'action', 'type': 'VARCHAR(100)', 'not_null': True}, {'name': 'ip_address', 'type': 'INET', 'not_null': True}, {'name': 'user_agent', 'type': 'TEXT'}, {'name': 'timestamp', 'type': 'TIMESTAMP', 'default': 'NOW()'}, {'name': 'success', 'type': 'BOOLEAN', 'not_null': True}, {'name': 'details', 'type': 'JSONB'} ], 'indexes': [ {'name': 'idx_audit_user_timestamp', 'columns': ['user_id', 'timestamp']}, {'name': 'idx_audit_action_timestamp', 'columns': ['action', 'timestamp']} ] }) return migrations ``` **Container Security Integration** ```dockerfile # Dockerfile.secure - Generated with /docker-optimize + /security-scan # Single-stage build with security hardening FROM python:3.11-slim-bookworm AS base # Security: Create non-root user (UID/GID 1000 matches runAsUser/runAsGroup in the Kubernetes manifest) RUN groupadd -g 1000 appuser && useradd -u 1000 -g appuser --no-create-home appuser # Security: Update packages and remove package manager cache RUN apt-get update && \ apt-get upgrade -y && \ apt-get install -y --no-install-recommends \ # Only essential packages ca-certificates \ && rm -rf /var/lib/apt/lists/* # Security: Set work directory with proper permissions WORKDIR /app RUN chown appuser:appuser /app # Install Python dependencies with security checks COPY requirements.txt . RUN pip install --no-cache-dir --upgrade pip && \ pip install --no-cache-dir -r requirements.txt && \ # Security scan dependencies during build (the scanners must be installed first) pip install --no-cache-dir pip-audit safety && \ pip-audit --format=json --output=/tmp/pip-audit.json && \ safety check --json > /tmp/safety.json # Copy application code COPY --chown=appuser:appuser . . # Security: Remove stray secrets as defense in depth only; files deleted here still exist in the COPY layer above, so also exclude them via .dockerignore RUN find . -name "*.key" -delete && \ find . -name "*.pem" -delete && \ find . 
-name ".env*" -delete # Security: Switch to non-root user USER appuser # Security: Read-only filesystem, no new privileges # These will be enforced at runtime via Kubernetes security context EXPOSE 8000 # Health check for container security monitoring (standard library only; urlopen raises on 4xx/5xx so the check fails on unhealthy responses) HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)" CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] ``` **Kubernetes Security Integration** ```yaml # k8s-secure-deployment.yaml - From /k8s-manifest + /security-scan apiVersion: v1 kind: ServiceAccount metadata: name: api-service-account namespace: production automountServiceAccountToken: false --- apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: api-network-policy namespace: production spec: podSelector: matchLabels: app: api policyTypes: - Ingress - Egress ingress: - from: - namespaceSelector: matchLabels: name: ingress-nginx ports: - protocol: TCP port: 8000 egress: - to: - namespaceSelector: matchLabels: name: database ports: - protocol: TCP port: 5432 - to: [] # DNS ports: - protocol: UDP port: 53 --- apiVersion: apps/v1 kind: Deployment metadata: name: api-deployment namespace: production spec: replicas: 3 selector: matchLabels: app: api template: metadata: labels: app: api annotations: # Security scanning annotations container.apparmor.security.beta.kubernetes.io/api: runtime/default spec: serviceAccountName: api-service-account securityContext: # Pod-level security context runAsNonRoot: true runAsUser: 1000 runAsGroup: 1000 fsGroup: 1000 seccompProfile: type: RuntimeDefault containers: - name: api image: api:secure-latest ports: - containerPort: 8000 securityContext: # Container-level security context allowPrivilegeEscalation: false readOnlyRootFilesystem: true runAsNonRoot: true runAsUser: 1000 capabilities: drop: - ALL add: - NET_BIND_SERVICE resources: requests: memory: "256Mi" cpu: "250m" limits: memory: "512Mi" cpu: "500m" 
env: - name: DATABASE_URL valueFrom: secretKeyRef: name: database-credentials key: url - name: JWT_SECRET_KEY valueFrom: secretKeyRef: name: jwt-secret key: secret volumeMounts: - name: tmp-volume mountPath: /tmp - name: var-log mountPath: /var/log livenessProbe: httpGet: path: /health port: 8000 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: httpGet: path: /ready port: 8000 initialDelaySeconds: 5 periodSeconds: 5 volumes: - name: tmp-volume emptyDir: {} - name: var-log emptyDir: {} --- # Pod Security Admission (PodSecurityPolicy was removed in Kubernetes 1.25; the restricted profile enforces the equivalent constraints) apiVersion: v1 kind: Namespace metadata: name: production labels: pod-security.kubernetes.io/enforce: restricted pod-security.kubernetes.io/audit: restricted pod-security.kubernetes.io/warn: restricted ``` **CI/CD Security Integration** ```yaml # .github/workflows/security-pipeline.yml name: Integrated Security Pipeline on: push: branches: [main, develop] pull_request: branches: [main] jobs: security-scan: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 # 1. Code Security Scanning - name: Run Bandit Security Scan run: | pip install "bandit[toml,sarif]" bandit -r . -f sarif -o bandit-results.sarif - name: Run Semgrep Security Scan uses: returntocorp/semgrep-action@v1 with: config: auto generateSarif: "1" # 2. Dependency Security Scanning - name: Run Safety Check run: | pip install safety safety check --json > safety-results.json - name: Run npm audit if: hashFiles('package.json') != '' run: | npm audit --audit-level high --json > npm-audit-results.json # 3. Container Security Scanning - name: Build Container run: docker build -t app:security-test . - name: Run Trivy Container Scan uses: aquasecurity/trivy-action@master with: image-ref: 'app:security-test' format: 'sarif' output: 'trivy-results.sarif' # 4.
Infrastructure Security Scanning - name: Run Checkov IaC Scan uses: bridgecrewio/checkov-action@master with: directory: . output_format: sarif output_file_path: checkov-results.sarif # 5. Secret Scanning (TruffleHog reports findings in the job log; it is not part of the SARIF upload) - name: Run TruffleHog Secret Scan uses: trufflesecurity/trufflehog@main with: path: ./ base: main head: HEAD extra_args: --only-verified # 6. Upload Security Results (sarif_file takes one file or one directory, so collect the reports first) - name: Collect SARIF files run: | mkdir -p sarif-results cp ./*.sarif sarif-results/ - name: Upload SARIF results to GitHub uses: github/codeql-action/upload-sarif@v3 with: sarif_file: sarif-results # 7. Security Test Integration - name: Run Security Tests run: | pip install pytest pytest-cov pytest tests/security/ -v --cov=src/security # 8. Generate Security Report - name: Generate Security Dashboard run: | python scripts/generate_security_report.py \ --bandit bandit-results.sarif \ --semgrep semgrep.sarif \ --trivy trivy-results.sarif \ --safety safety-results.json \ --output security-dashboard.html - name: Upload Security Dashboard uses: actions/upload-artifact@v4 with: name: security-dashboard path: security-dashboard.html penetration-testing: runs-on: ubuntu-latest needs: security-scan if: github.ref == 'refs/heads/main' steps: - uses: actions/checkout@v4 # Start application for dynamic testing - name: Start Application run: | docker compose -f docker-compose.test.yml up -d sleep 30 # Wait for startup # OWASP ZAP Dynamic Testing - name: Run OWASP ZAP Scan uses: zaproxy/action-full-scan@v0.4.0 with: target: 'http://localhost:8000' rules_file_name: '.zap/rules.tsv' cmd_options: '-a -j -m 10 -T 60' # API Security Testing - name: Run API Security Tests run: | pip install requests pytest pytest tests/api_security/ -v ``` **Monitoring and Alerting Integration** ```python # security_monitoring.py - Integrated with all commands import logging from datetime import datetime from typing import Dict, Any import json class IntegratedSecurityMonitor: """Security monitoring that integrates with all
command outputs""" def __init__(self): self.api_endpoints = self.load_api_endpoints() # From /api-scaffold self.container_metrics = self.load_container_config() # From /docker-optimize self.k8s_security = self.load_k8s_security() # From /k8s-manifest def monitor_api_security(self): """Monitor API security events""" security_events = [] # Monitor authentication failures auth_failures = self.get_auth_failure_rate() if auth_failures > 10: # More than 10 failures per minute security_events.append({ 'type': 'AUTH_FAILURE_SPIKE', 'severity': 'HIGH', 'details': f'Authentication failure rate: {auth_failures}/min', 'recommended_action': 'Check for brute force attacks' }) # Monitor rate limiting violations rate_limit_violations = self.get_rate_limit_violations() if rate_limit_violations: security_events.append({ 'type': 'RATE_LIMIT_VIOLATION', 'severity': 'MEDIUM', 'details': f'Rate limit violations: {len(rate_limit_violations)}', 'ips': [v['ip'] for v in rate_limit_violations], 'recommended_action': 'Consider IP blocking or CAPTCHA' }) return security_events def monitor_container_security(self): """Monitor container security events""" container_events = [] # Check for privilege escalation attempts privilege_events = self.check_privilege_escalation() if privilege_events: container_events.append({ 'type': 'PRIVILEGE_ESCALATION', 'severity': 'CRITICAL', 'containers': privilege_events, 'recommended_action': 'Immediate investigation required' }) # Check for filesystem violations readonly_violations = self.check_readonly_violations() if readonly_violations: container_events.append({ 'type': 'READONLY_VIOLATION', 'severity': 'HIGH', 'violations': readonly_violations, 'recommended_action': 'Review container security policies' }) return container_events def generate_security_dashboard(self) -> Dict[str, Any]: """Generate comprehensive security dashboard""" return { 'timestamp': datetime.utcnow().isoformat(), 'api_security': self.monitor_api_security(), 'container_security': 
self.monitor_container_security(), 'scan_results': self.get_latest_scan_results(), 'test_results': self.get_security_test_results(), 'compliance_status': self.check_compliance_status(), 'recommendations': self.generate_recommendations() } ```

This integrated approach ensures that security is built into every aspect of the application lifecycle, from development through deployment and monitoring.

## Output Format

1. **Tool Selection Matrix**: Recommended tools based on technology stack
2. **Comprehensive Scan Results**: Multi-tool aggregated findings
3. **Executive Security Report**: Business-focused risk assessment
4. **Detailed Technical Findings**: Code-level vulnerabilities with fixes
5. **SARIF Compliance Report**: Industry-standard security report format
6. **Automated Remediation Scripts**: Ready-to-run fix implementations
7. **CI/CD Integration Workflows**: Complete GitHub Actions security pipeline
8. **Compliance Assessment**: OWASP, NIST, ISO 27001 compliance mapping
9. **Business Impact Analysis**: Risk quantification and cost estimates
10. **Monitoring and Alerting Setup**: Real-time security event detection

**Key Features**:
- ✅ **Multi-tool integration**: Bandit, Safety, Trivy, Semgrep, ESLint Security, Snyk
- ✅ **Automated remediation**: Smart dependency updates and configuration fixes
- ✅ **CI/CD ready**: Complete GitHub Actions workflows with SARIF uploads
- ✅ **Business context**: Risk scoring with financial impact estimates
- ✅ **Framework-specific**: Tailored security patterns for Django, Flask, React, Express
- ✅ **Compliance-focused**: Built-in OWASP Top 10, CWE, and regulatory mappings
- ✅ **Actionable insights**: Specific remediation code and deployment guidance

Focus on actionable remediation that can be implemented immediately while maintaining application functionality.
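
The risk quantification behind the executive report and the business impact analysis can be sketched as a small standalone function. This is a minimal sketch, assuming the same severity weights (25/15/5/1, capped at 100) and level thresholds (80/60/30) that `_assess_risk` uses in the report generator earlier in this document. The names `SEVERITY_WEIGHTS`, `RISK_THRESHOLDS`, `risk_score`, and `risk_level` are illustrative and not part of any scanner's API.

```python
from collections import Counter
from typing import Dict, Iterable

# Assumption: weights and thresholds copied from the _assess_risk method above.
SEVERITY_WEIGHTS: Dict[str, int] = {"CRITICAL": 25, "HIGH": 15, "MEDIUM": 5, "LOW": 1}
RISK_THRESHOLDS = [(80, "CRITICAL"), (60, "HIGH"), (30, "MEDIUM")]


def risk_score(severities: Iterable[str]) -> int:
    """Return a 0-100 score from finding severities; unknown severities add nothing."""
    counts = Counter(s.upper() for s in severities)
    raw = sum(SEVERITY_WEIGHTS.get(sev, 0) * n for sev, n in counts.items())
    return min(100, raw)


def risk_level(score: int) -> str:
    """Map a numeric score to the level names used in the reports."""
    for threshold, level in RISK_THRESHOLDS:
        if score >= threshold:
            return level
    return "LOW"


def summarize(severities: Iterable[str]) -> Dict[str, object]:
    """Bundle score and level for one scan's findings."""
    score = risk_score(severities)
    return {"score": score, "level": risk_level(score)}


if __name__ == "__main__":
    # One finding of each severity: 25 + 15 + 5 + 1 = 46
    print(summarize(["CRITICAL", "HIGH", "MEDIUM", "LOW"]))
```

Under these weights, four critical findings already saturate the score at 100, so the score alone cannot separate a scan with 4 criticals from one with 8. Reporting the raw severity counts alongside the score keeps that difference visible.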