#!/usr/bin/env python3
"""
APIShield — API Security Testing Framework
Tests REST APIs against OWASP API Security Top 10 (2023).
Author: Ali AlEnezi (@SiteQ8)
License: MIT
Version: 1.0.0
"""
import argparse
import json
import sys
import os
import re
import time
import hashlib
import datetime
import copy
import concurrent.futures
from pathlib import Path
from urllib.parse import urlparse, urljoin, parse_qs
try:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
except ImportError:
print("[ERROR] 'requests' library required: pip install requests")
sys.exit(1)
# Tool version; interpolated into BANNER below.
VERSION = "1.0.0"
# Start-up banner (ASCII art). It is a raw string so the backslashes in the
# art are kept literally; the {ver} placeholder is filled from VERSION.
BANNER = r"""
___ ____ __________ __ _ __ __
/ | / __ \/ _/ ___// /_ (_)__ / /___/ /
/ /| | / /_/ // / \__ \/ __ \/ / _ \/ / __ /
/ ___ |/ ____// / ___/ / / / / / __/ / /_/ /
/_/ |_/_/ /___//____/_/ /_/_/\___/_/\__,_/
API Security Testing Framework v{ver}
OWASP API Security Top 10 (2023)
github.com/SiteQ8/APIShield
""".format(ver=VERSION)
# ─── Configuration ────────────────────────────────────────────────────
# NOTE(review): presumably the directory reports are written to — no use of
# it is visible in this part of the file; confirm against the report writer.
OUTPUT_DIR = Path("./apishield_reports")
class C:
    """ANSI escape sequences used to colour terminal output."""

    # Bright foreground colours (SGR 90-97).
    R = "\033[91m"  # red
    G = "\033[92m"  # green
    Y = "\033[93m"  # yellow
    B = "\033[94m"  # blue
    C = "\033[96m"  # cyan
    W = "\033[97m"  # white
    D = "\033[90m"  # grey ("bright black")

    # Text attributes.
    N = "\033[0m"  # reset all attributes
    BOLD = "\033[1m"
def log(msg, level="info"):
    """Print *msg* to stdout behind a coloured ``[TAG]`` prefix.

    Args:
        msg: Text (or any formattable object) to print.
        level: One of ``info``, ``pass``, ``fail``, ``warn``, ``data`` or
            ``vuln``. Any other value is rendered as a white ``INFO`` tag.
    """
    palette = {
        "info": (C.C, "INFO"),
        "pass": (C.G, "PASS"),
        "fail": (C.R, "FAIL"),
        "warn": (C.Y, "WARN"),
        "data": (C.W, "DATA"),
        "vuln": (C.R, "VULN"),
    }
    try:
        colour, label = palette[level]
    except KeyError:
        # Unknown level: fall back to a neutral tag rather than failing.
        colour, label = C.W, "INFO"
    prefix = f"{colour}[{label}]{C.N}"
    print(f" {prefix} {msg}")
class Session:
    """HTTP session with retry logic.

    Wraps a ``requests.Session`` bound to a single base URL. Requests are
    retried on 502/503/504, and any exception raised while sending is
    swallowed so that every verb helper returns either a response object or
    ``None``.
    """

    def __init__(self, base_url, headers=None, timeout=10, verify=True):
        """Create the underlying session.

        Args:
            base_url: Root URL of the target; a trailing ``/`` is stripped.
            headers: Optional mapping of headers sent with every request.
            timeout: Default per-request timeout passed to ``requests``.
            verify: Default TLS verification setting passed to ``requests``.
        """
        self.base = base_url.rstrip('/')
        self.timeout = timeout
        self.verify = verify
        self.session = requests.Session()
        retry = Retry(total=2, backoff_factor=0.5, status_forcelist=[502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retry))
        self.session.mount('https://', HTTPAdapter(max_retries=retry))
        # requests pre-populates a User-Agent header, so setdefault() would
        # never apply ours. Assign it first, then let caller-supplied headers
        # override it if they carry their own User-Agent.
        self.session.headers["User-Agent"] = "APIShield/1.0"
        if headers:
            self.session.headers.update(headers)

    def request(self, method, path, **kwargs):
        """Send *method* to *path* (relative to the base URL).

        Extra keyword arguments are forwarded to
        ``requests.Session.request``; ``timeout`` and ``verify`` default to
        the values given to the constructor.

        Returns:
            The response object, or ``None`` if sending raised an exception.
        """
        url = urljoin(self.base + '/', path.lstrip('/'))
        kwargs.setdefault('timeout', self.timeout)
        kwargs.setdefault('verify', self.verify)
        try:
            return self.session.request(method, url, **kwargs)
        except Exception:
            # Deliberate best-effort: callers treat None as "no response".
            return None

    def get(self, path, **kw):
        """Shortcut for ``request('GET', path, **kw)``."""
        return self.request('GET', path, **kw)

    def post(self, path, **kw):
        """Shortcut for ``request('POST', path, **kw)``."""
        return self.request('POST', path, **kw)

    def put(self, path, **kw):
        """Shortcut for ``request('PUT', path, **kw)``."""
        return self.request('PUT', path, **kw)

    def delete(self, path, **kw):
        """Shortcut for ``request('DELETE', path, **kw)``."""
        return self.request('DELETE', path, **kw)

    def patch(self, path, **kw):
        """Shortcut for ``request('PATCH', path, **kw)``."""
        return self.request('PATCH', path, **kw)

    def options(self, path, **kw):
        """Shortcut for ``request('OPTIONS', path, **kw)``."""
        return self.request('OPTIONS', path, **kw)
# ═══════════════════════════════════════════════════════════════════════
# API1:2023 — Broken Object Level Authorization (BOLA)
# ═══════════════════════════════════════════════════════════════════════
class TestBOLA:
    """API1 — Tests for IDOR / broken object-level authorization."""

    ID = "API1:2023"
    NAME = "Broken Object Level Authorization (BOLA)"
    SEVERITY = "CRITICAL"

    @staticmethod
    def run(session, endpoints):
        """Re-request endpoint paths with their object ID swapped out.

        Args:
            session: Object exposing ``get(path)``; it returns a response
                with a ``status_code`` attribute, or ``None`` when the
                request could not be completed.
            endpoints: Iterable of dicts; only the ``'path'`` key is read.

        Returns:
            dict with ``test``, ``name``, ``severity`` and ``findings``. One
            finding is recorded for every substituted path that answers 200.
        """
        results = {"test": TestBOLA.ID, "name": TestBOLA.NAME,
                   "severity": TestBOLA.SEVERITY, "findings": []}
        log(f"{TestBOLA.ID} — {TestBOLA.NAME}", "info")
        # Each pattern captures one whole path segment that looks like an ID.
        id_patterns = [
            (r'/(\d+)(?:/|$)', 'numeric'),
            (r'/([a-f0-9]{24})(?:/|$)', 'mongodb_objectid'),
            (r'/([a-f0-9\-]{36})(?:/|$)', 'uuid'),
        ]
        for ep in endpoints:
            path = ep.get('path', '')
            for pattern, id_type in id_patterns:
                match = re.search(pattern, path)
                if not match:
                    continue
                original_id = match.group(1)
                if id_type == 'numeric':
                    # Try neighbouring and distant numeric IDs.
                    test_ids = [str(int(original_id) + i) for i in (1, -1, 100, 999)]
                elif id_type == 'uuid':
                    test_ids = ['00000000-0000-0000-0000-000000000001']
                else:
                    test_ids = ['000000000000000000000001']
                for test_id in test_ids:
                    if test_id == original_id:
                        # Same object as the original request, so a 200
                        # here says nothing about authorization.
                        continue
                    test_path = path[:match.start(1)] + test_id + path[match.end(1):]
                    resp = session.get(test_path)
                    # Compare against None explicitly: a requests Response is
                    # falsy for 4xx/5xx, so `if resp` would hide the 403 case.
                    if resp is None:
                        continue
                    if resp.status_code == 200:
                        results["findings"].append({
                            "endpoint": test_path,
                            "original_id": original_id,
                            "test_id": test_id,
                            "status": resp.status_code,
                            "risk": "Object accessible with different ID — potential BOLA",
                            "remediation": "Implement object-level authorization checks on every request"
                        })
                        log(f"BOLA: {test_path} returned 200 with different ID", "vuln")
                    elif resp.status_code == 403:
                        log(f"BOLA: {test_path} properly returned 403", "pass")
        if not results["findings"]:
            log("No BOLA vulnerabilities detected in tested endpoints", "pass")
        return results
# ═══════════════════════════════════════════════════════════════════════
# API2:2023 — Broken Authentication
# ═══════════════════════════════════════════════════════════════════════
class TestBrokenAuth:
    """API2 — Tests authentication mechanism weaknesses."""

    ID = "API2:2023"
    NAME = "Broken Authentication"
    SEVERITY = "CRITICAL"

    @staticmethod
    def run(session, endpoints):
        """Try weak credentials and unauthenticated access.

        Two checks are performed:

        1. A fixed list of common auth paths is probed with GET. Every path
           that responds (other than 404/405) is then sent a handful of weak
           username/password pairs as JSON.
        2. Endpoints whose path contains a sensitive-looking segment are
           requested with the ``Authorization`` and ``X-API-Key`` session
           headers temporarily removed.

        Args:
            session: Wrapper exposing ``get``/``post`` and a ``session``
                attribute whose ``headers`` mapping is mutated and restored.
            endpoints: Iterable of dicts; only the ``'path'`` key is read.

        Returns:
            dict with ``test``, ``name``, ``severity`` and ``findings``.
        """
        results = {"test": TestBrokenAuth.ID, "name": TestBrokenAuth.NAME,
                   "severity": TestBrokenAuth.SEVERITY, "findings": []}
        log(f"{TestBrokenAuth.ID} — {TestBrokenAuth.NAME}", "info")
        auth_endpoints = ['/login', '/auth', '/token', '/oauth/token', '/api/login',
                          '/api/auth', '/signin', '/api/signin', '/auth/login', '/v1/auth']
        weak_creds = [
            {"username": "admin", "password": "admin"},
            {"username": "admin", "password": "password"},
            {"username": "admin", "password": "123456"},
            {"username": "test", "password": "test"},
            {"username": "api", "password": "api"},
        ]
        token_keys = ('token', 'access_token', 'jwt', 'session', 'key')
        for ep in auth_endpoints:
            # Test without auth
            resp = session.get(ep)
            # `is None` rather than truthiness: a requests Response is falsy
            # for 4xx/5xx, which would hide endpoints answering 401/403/400.
            # NOTE(review): 405 is still skipped as in the original, although
            # it usually means a POST-only route exists — confirm intent.
            if resp is None or resp.status_code in (404, 405):
                continue
            log(f"Auth endpoint found: {ep} ({resp.status_code})", "data")
            # Test weak credentials
            for cred in weak_creds:
                resp = session.post(ep, json=cred)
                if resp is None or resp.status_code != 200:
                    continue
                try:
                    body = resp.json()
                except ValueError:
                    # Body is not JSON, so no token can be inspected.
                    continue
                if isinstance(body, dict) and any(k in body for k in token_keys):
                    results["findings"].append({
                        "endpoint": ep,
                        "credentials": f"{cred['username']}:{cred['password']}",
                        "risk": "Weak credentials accepted — token returned",
                        "remediation": "Enforce strong password policies, implement rate limiting"
                    })
                    log(f"Weak creds accepted at {ep}: {cred['username']}:{cred['password']}", "vuln")
        # Check for missing auth on protected endpoints
        protected_markers = ('/admin', '/users', '/settings', '/config', '/private')
        for ep in endpoints:
            path = ep.get('path', '')
            if not any(s in path.lower() for s in protected_markers):
                continue
            headers_backup = dict(session.session.headers)
            session.session.headers.pop('Authorization', None)
            session.session.headers.pop('X-API-Key', None)
            try:
                resp = session.get(path)
            finally:
                # Always restore credentials so later tests stay authenticated.
                session.session.headers.update(headers_backup)
            if resp is not None and resp.status_code == 200:
                results["findings"].append({
                    "endpoint": path,
                    "risk": "Protected endpoint accessible without authentication",
                    "remediation": "Require authentication for all sensitive endpoints"
                })
                log(f"No auth required: {path}", "vuln")
        if not results["findings"]:
            log("No broken authentication issues detected", "pass")
        return results
# ═══════════════════════════════════════════════════════════════════════
# API3:2023 — Broken Object Property Level Authorization
# ═══════════════════════════════════════════════════════════════════════
class TestBOPLA:
    """API3 — Tests for mass assignment and excessive data exposure."""

    ID = "API3:2023"
    NAME = "Broken Object Property Level Authorization"
    SEVERITY = "HIGH"

    @staticmethod
    def _walk(obj):
        """Yield ``(lower-cased key, value)`` for every dict entry at any depth."""
        stack = [obj]
        while stack:
            node = stack.pop()
            if isinstance(node, dict):
                for key, value in node.items():
                    yield str(key).lower(), value
                    stack.append(value)
            elif isinstance(node, list):
                stack.extend(node)

    @staticmethod
    def run(session, endpoints):
        """Look for sensitive property names and mass-assignment acceptance.

        For every endpoint the path is fetched with GET and the property
        names of a JSON response are compared against a list of sensitive
        field names. Matching is done on key names only, so ordinary values
        such as "administrator" or "hashtag" no longer trigger a finding.

        Endpoints declared as POST/PUT/PATCH are additionally sent payloads
        carrying privileged properties. A finding is recorded when the
        response reflects one of the injected properties with the injected
        value.

        Args:
            session: Object exposing ``get(path)`` and
                ``request(method, path, json=...)``.
            endpoints: Iterable of dicts; ``'path'`` and ``'method'`` are read.

        Returns:
            dict with ``test``, ``name``, ``severity`` and ``findings``.
        """
        results = {"test": TestBOPLA.ID, "name": TestBOPLA.NAME,
                   "severity": TestBOPLA.SEVERITY, "findings": []}
        log(f"{TestBOPLA.ID} — {TestBOPLA.NAME}", "info")
        sensitive_fields = ['password', 'secret', 'token', 'api_key', 'apikey', 'credit_card',
                            'ssn', 'private_key', 'admin', 'role', 'is_admin', 'permissions',
                            'salary', 'bank_account', 'internal_id', 'debug', 'hash', 'salt']
        test_payloads = [
            {"role": "admin", "is_admin": True},
            {"permissions": ["*"], "admin": True},
        ]
        for ep in endpoints:
            path = ep.get('path', '')
            resp = session.get(path)
            if resp is not None and resp.status_code == 200:
                try:
                    body = resp.json()
                except ValueError:
                    body = None  # not JSON, so there are no property names
                keys = {key for key, _ in TestBOPLA._walk(body)}
                # A field counts as exposed when it occurs inside a key name,
                # e.g. "password" in "user_password".
                exposed = [f for f in sensitive_fields if any(f in key for key in keys)]
                if exposed:
                    results["findings"].append({
                        "endpoint": path,
                        "exposed_fields": exposed,
                        "risk": "Sensitive data fields in API response",
                        "remediation": "Filter response fields, implement view-level permissions"
                    })
                    log(f"Sensitive fields exposed at {path}: {', '.join(exposed)}", "vuln")
            # Mass assignment test — try adding admin/role fields
            method = (ep.get('method') or '').upper()
            if method not in ('POST', 'PUT', 'PATCH'):
                continue
            for payload in test_payloads:
                resp = session.request(method, path, json=payload)
                if resp is None or resp.status_code not in (200, 201):
                    continue
                try:
                    body = resp.json()
                except ValueError:
                    continue
                # Only a reflected injected value shows the property was
                # accepted; a pre-existing "role": "user" must not count.
                reflected = any(key in payload and payload[key] == value
                                for key, value in TestBOPLA._walk(body))
                if reflected:
                    results["findings"].append({
                        "endpoint": path,
                        "payload": payload,
                        "risk": "Mass assignment — role/admin fields accepted",
                        "remediation": "Whitelist allowed input fields, reject unknown properties"
                    })
                    log(f"Mass assignment possible at {path}", "vuln")
        if not results["findings"]:
            log("No property-level authorization issues detected", "pass")
        return results
# ═══════════════════════════════════════════════════════════════════════
# API4:2023 — Unrestricted Resource Consumption
# ═══════════════════════════════════════════════════════════════════════
class TestResourceConsumption:
    """API4 — Tests rate limiting and resource exhaustion."""

    ID = "API4:2023"
    NAME = "Unrestricted Resource Consumption"
    SEVERITY = "HIGH"

    @staticmethod
    def run(session, endpoints):
        """Check rate limiting, body-size limits and pagination caps.

        At most the first five endpoints are exercised (``'/'`` when none are
        given). For each one: a burst of GET requests looks for 429/503
        answers, one more GET looks for rate-limit headers, a 100 KB JSON
        body is POSTed, and a GET with very large paging parameters checks
        whether more than 100 records come back.

        Args:
            session: Object exposing ``get`` and ``post``; each returns a
                response or ``None``.
            endpoints: List of dicts; only the ``'path'`` key is read.

        Returns:
            dict with ``test``, ``name``, ``severity`` and ``findings``.
        """
        results = {"test": TestResourceConsumption.ID, "name": TestResourceConsumption.NAME,
                   "severity": TestResourceConsumption.SEVERITY, "findings": []}
        log(f"{TestResourceConsumption.ID} — {TestResourceConsumption.NAME}", "info")
        burst = 20
        rl_headers = ['X-RateLimit-Limit', 'X-RateLimit-Remaining', 'RateLimit-Limit',
                      'RateLimit-Remaining', 'X-Rate-Limit', 'Retry-After']
        test_endpoints = endpoints[:5] if endpoints else [{'path': '/'}]
        for ep in test_endpoints:
            path = ep.get('path', '/')
            # Rate limiting test — send a burst of rapid requests
            responses = []
            for _ in range(burst):
                resp = session.get(path)
                # `is not None`, not truthiness: a requests Response is falsy
                # for 4xx/5xx, so 429/503 would otherwise never be recorded.
                if resp is not None:
                    responses.append(resp.status_code)
                time.sleep(0.05)
            rate_limited = any(s in (429, 503) for s in responses)
            rate_headers = False
            # Check for rate limit headers
            resp = session.get(path)
            if resp is not None:
                present = {k.lower() for k in resp.headers}
                rate_headers = any(h.lower() in present for h in rl_headers)
            if not rate_limited and not rate_headers:
                results["findings"].append({
                    "endpoint": path,
                    "requests_sent": burst,
                    "rate_limited": False,
                    "rate_headers": False,
                    "risk": "No rate limiting detected — susceptible to DoS/brute force",
                    "remediation": "Implement rate limiting, add X-RateLimit headers, use API gateway"
                })
                log(f"No rate limiting on {path} ({burst} requests, no 429)", "vuln")
            else:
                log(f"Rate limiting present on {path}", "pass")
            # Large payload test
            large_payload = {"data": "A" * 100000}
            resp = session.post(path, json=large_payload)
            if resp is not None and resp.status_code in (200, 201):
                results["findings"].append({
                    "endpoint": path,
                    "risk": "Large payload accepted (100KB) — no size limit",
                    "remediation": "Set maximum request body size, validate input length"
                })
                log(f"Large payload accepted at {path}", "warn")
            # Pagination abuse
            resp = session.get(path, params={"limit": 999999, "page_size": 999999, "per_page": 999999})
            if resp is not None and resp.status_code == 200:
                try:
                    body = resp.json()
                except ValueError:
                    body = None  # non-JSON answer: nothing to count
                if isinstance(body, list) and len(body) > 100:
                    results["findings"].append({
                        "endpoint": path,
                        "risk": "Pagination bypass — excessive records returned",
                        "remediation": "Enforce maximum page size server-side"
                    })
                    log(f"Pagination bypass at {path}: {len(body)} records", "vuln")
        if not results["findings"]:
            log("Rate limiting and resource controls appear adequate", "pass")
        return results
# ═══════════════════════════════════════════════════════════════════════
# API5:2023 — Broken Function Level Authorization
# ═══════════════════════════════════════════════════════════════════════
class TestBFLA:
    """API5 — Tests for horizontal/vertical privilege escalation."""

    ID = "API5:2023"
    NAME = "Broken Function Level Authorization"
    SEVERITY = "CRITICAL"

    @staticmethod
    def run(session, endpoints):
        """Probe administrative paths and unexpected HTTP methods.

        A fixed list of admin-style paths is requested with GET, POST, PUT
        and DELETE. Probing of a path stops at the first 200 (recorded as a
        finding) or the first 403 (logged as properly restricted). The first
        five supplied endpoints are then requested with methods other than
        their declared one.

        Args:
            session: Object exposing ``request(method, path)``.
            endpoints: List of dicts; ``'path'`` and ``'method'`` are read.

        Returns:
            dict with ``test``, ``name``, ``severity`` and ``findings``.
        """
        results = {"test": TestBFLA.ID, "name": TestBFLA.NAME,
                   "severity": TestBFLA.SEVERITY, "findings": []}
        log(f"{TestBFLA.ID} — {TestBFLA.NAME}", "info")
        admin_paths = ['/admin', '/api/admin', '/api/v1/admin', '/admin/users', '/admin/config',
                       '/api/admin/settings', '/management', '/internal', '/debug', '/actuator',
                       '/actuator/health', '/actuator/env', '/swagger-ui.html', '/api-docs',
                       '/graphql', '/.env', '/config', '/api/config', '/metrics', '/health',
                       '/status', '/info', '/api/internal', '/v1/admin', '/v2/admin']
        for path in admin_paths:
            for method in ['GET', 'POST', 'PUT', 'DELETE']:
                resp = session.request(method, path)
                # Explicit None check: a requests Response is falsy for
                # 4xx/5xx, which made the 403 branch below unreachable.
                if resp is None:
                    continue
                if resp.status_code == 200:
                    results["findings"].append({
                        "endpoint": path,
                        "method": method,
                        "status": 200,
                        "risk": f"Administrative endpoint accessible via {method}",
                        "remediation": "Restrict admin functions to authorized roles only"
                    })
                    log(f"Admin endpoint accessible: {method} {path}", "vuln")
                    break
                if resp.status_code == 403:
                    log(f"Properly restricted: {method} {path}", "pass")
                    # As written by the author, the remaining methods are
                    # not tried for this path once one is refused.
                    break
        # HTTP method tampering
        for ep in endpoints[:5]:
            path = ep.get('path', '')
            # Normalise case so an endpoint declared as "get" is not probed
            # with GET and then reported as unexpected.
            original_method = (ep.get('method') or 'GET').upper()
            test_methods = ['DELETE', 'PUT', 'PATCH'] if original_method == 'GET' else ['GET']
            for method in test_methods:
                resp = session.request(method, path)
                if resp is not None and resp.status_code in (200, 201, 204):
                    results["findings"].append({
                        "endpoint": path,
                        "original_method": original_method,
                        "test_method": method,
                        "risk": f"Unexpected {method} method allowed on {original_method} endpoint",
                        "remediation": "Restrict HTTP methods per endpoint"
                    })
                    log(f"Method tampering: {method} {path} returned {resp.status_code}", "warn")
        if not results["findings"]:
            log("No function-level authorization issues detected", "pass")
        return results
# ═══════════════════════════════════════════════════════════════════════
# API6:2023 — Unrestricted Access to Sensitive Business Flows
# ═══════════════════════════════════════════════════════════════════════
class TestSensitiveFlows:
    """API6 — Tests for abuse of business logic flows."""

    ID = "API6:2023"
    NAME = "Unrestricted Access to Sensitive Business Flows"
    SEVERITY = "MEDIUM"

    @staticmethod
    def run(session, endpoints):
        """Flag sensitive-looking endpoints lacking anti-automation signals.

        An endpoint is examined when its path (compared case-insensitively)
        contains one of a fixed set of business-flow keywords. It is reported
        when a successful GET response neither mentions a CAPTCHA product in
        its body nor carries a CSRF token header.

        Args:
            session: Object exposing ``get(path)``.
            endpoints: Iterable of dicts; only the ``'path'`` key is read.

        Returns:
            dict with ``test``, ``name``, ``severity`` and ``findings``.
        """
        results = {"test": TestSensitiveFlows.ID, "name": TestSensitiveFlows.NAME,
                   "severity": TestSensitiveFlows.SEVERITY, "findings": []}
        log(f"{TestSensitiveFlows.ID} — {TestSensitiveFlows.NAME}", "info")
        sensitive_patterns = ['purchase', 'checkout', 'transfer', 'payment', 'order',
                              'register', 'signup', 'reset', 'forgot', 'verify', 'otp',
                              'coupon', 'discount', 'redeem', 'vote', 'comment', 'review']
        captcha_markers = ('captcha', 'recaptcha', 'hcaptcha', 'turnstile')
        csrf_headers = ('x-csrf-token', 'x-xsrf-token')
        for ep in endpoints:
            # Lower-case only for keyword matching. The request and the
            # report keep the original path because URL paths are
            # case-sensitive.
            path = ep.get('path', '')
            lowered = path.lower()
            matched = [p for p in sensitive_patterns if p in lowered]
            if not matched:
                continue
            # Check if CAPTCHA or anti-automation is required
            resp = session.get(path)
            # Error responses were never analysed before (a requests Response
            # is falsy for 4xx/5xx); keep that behaviour, but explicitly.
            if resp is None or resp.status_code >= 400:
                continue
            body_str = resp.text.lower()
            has_captcha = any(c in body_str for c in captcha_markers)
            present = {k.lower() for k in resp.headers}
            has_csrf = any(h in present for h in csrf_headers)
            if not has_captcha and not has_csrf:
                results["findings"].append({
                    "endpoint": path,
                    "matched_patterns": matched,
                    "captcha": has_captcha,
                    "csrf": has_csrf,
                    "risk": "Sensitive flow without anti-automation controls",
                    "remediation": "Add CAPTCHA, rate limiting, or step-up auth for sensitive operations"
                })
                log(f"Sensitive flow unprotected: {path} (no CAPTCHA/CSRF)", "warn")
        if not results["findings"]:
            log("No unprotected sensitive business flows detected", "pass")
        return results
# ═══════════════════════════════════════════════════════════════════════
# API7:2023 — Server Side Request Forgery (SSRF)
# ═══════════════════════════════════════════════════════════════════════
class TestSSRF:
    """API7 — Tests for SSRF vulnerabilities."""

    ID = "API7:2023"
    NAME = "Server Side Request Forgery (SSRF)"
    SEVERITY = "HIGH"

    @staticmethod
    def run(session, endpoints):
        """Inject internal URLs into URL-like query parameters.

        Every endpoint is requested once per (parameter, target) pair. A
        finding is recorded when a 200 response contains, within its first
        500 characters, an indicator string that was not already part of the
        injected URL.

        Args:
            session: Object exposing ``get(path, params=...)``.
            endpoints: Iterable of dicts; only the ``'path'`` key is read.

        Returns:
            dict with ``test``, ``name``, ``severity`` and ``findings``.
        """
        results = {"test": TestSSRF.ID, "name": TestSSRF.NAME,
                   "severity": TestSSRF.SEVERITY, "findings": []}
        log(f"{TestSSRF.ID} — {TestSSRF.NAME}", "info")
        ssrf_params = ['url', 'uri', 'link', 'href', 'src', 'source', 'redirect', 'redirect_url',
                       'callback', 'webhook', 'proxy', 'target', 'dest', 'destination', 'fetch',
                       'load', 'image', 'img', 'file', 'path', 'page', 'feed', 'host', 'site']
        internal_targets = [
            'http://127.0.0.1', 'http://localhost', 'http://0.0.0.0',
            'http://169.254.169.254/latest/meta-data/',  # AWS IMDS
            'http://metadata.google.internal/',  # GCP
            'http://169.254.169.254/metadata/instance',  # Azure
            'http://[::1]',
        ]
        # Indicators are lower-case because they are matched against a
        # lower-cased body ('computeMetadata' could never match before).
        indicators = ['ami-id', 'instance-id', 'local-ipv4', 'metadata',
                      'computemetadata', 'root', 'localhost', '127.0.0.1']
        for ep in endpoints:
            path = ep.get('path', '')
            for param in ssrf_params:
                # NOTE(review): only the first three targets are used, so the
                # cloud-metadata URLs above are never sent — presumably to
                # limit request volume; confirm.
                for target in internal_targets[:3]:
                    resp = session.get(path, params={param: target})
                    if resp is None or resp.status_code != 200:
                        continue
                    body = resp.text[:500].lower()
                    target_lc = target.lower()
                    # Check for indicators of internal access. Ignore any
                    # indicator that is part of the payload itself, otherwise
                    # a server that merely echoes the parameter is flagged.
                    if any(ind in body for ind in indicators if ind not in target_lc):
                        results["findings"].append({
                            "endpoint": path,
                            "parameter": param,
                            "payload": target,
                            "risk": "Potential SSRF — internal resource accessible via URL parameter",
                            "remediation": "Validate and whitelist URLs, block internal IP ranges, disable redirects"
                        })
                        log(f"SSRF: {path}?{param}={target} returned internal data", "vuln")
        if not results["findings"]:
            log("No SSRF vulnerabilities detected", "pass")
        return results
# ═══════════════════════════════════════════════════════════════════════
# API8:2023 — Security Misconfiguration
# ═══════════════════════════════════════════════════════════════════════
class TestMisconfig:
    """API8 — Tests for security misconfiguration."""

    ID = "API8:2023"
    NAME = "Security Misconfiguration"
    SEVERITY = "MEDIUM"

    @staticmethod
    def run(session, endpoints):
        """Inspect the root endpoint for common misconfigurations.

        Checks performed: permissive CORS, missing security headers,
        ``Server`` / ``X-Powered-By`` disclosure, verbose error bodies on
        malformed paths, and dangerous methods advertised in ``Allow``.

        Args:
            session: Object exposing ``get`` and ``options``; each returns a
                response or ``None``.
            endpoints: Unused here; accepted so the signature matches the
                other test classes.

        Returns:
            dict with ``test``, ``name``, ``severity`` and ``findings``.
            ``findings`` is empty when no response at all was received
            for ``/``.
        """
        results = {"test": TestMisconfig.ID, "name": TestMisconfig.NAME,
                   "severity": TestMisconfig.SEVERITY, "findings": []}
        log(f"{TestMisconfig.ID} — {TestMisconfig.NAME}", "info")
        base_resp = session.get('/')
        # Only a missing response means "unreachable". A requests Response is
        # falsy for 4xx/5xx, and an API root answering 401/404 still carries
        # headers worth inspecting.
        if base_resp is None:
            log("Could not reach target", "fail")
            return results
        # CORS misconfiguration
        resp = session.get('/', headers={'Origin': 'https://evil.com'})
        if resp is not None:
            acao = resp.headers.get('Access-Control-Allow-Origin', '')
            if acao in ('*', 'https://evil.com'):
                results["findings"].append({
                    "check": "CORS",
                    "value": acao,
                    "risk": "Overly permissive CORS — reflects arbitrary origin or uses wildcard",
                    "remediation": "Whitelist specific trusted origins, avoid wildcard with credentials"
                })
                log(f"CORS misconfiguration: Access-Control-Allow-Origin: {acao}", "vuln")
            else:
                log("CORS properly configured", "pass")
        # Security headers
        security_headers = {
            'Strict-Transport-Security': 'Missing HSTS header',
            'X-Content-Type-Options': 'Missing X-Content-Type-Options',
            'X-Frame-Options': 'Missing clickjacking protection',
            'Content-Security-Policy': 'Missing CSP header',
            'X-XSS-Protection': 'Missing XSS protection header',
            'Cache-Control': 'Missing cache control for API responses',
        }
        present = {h.lower() for h in base_resp.headers}
        for header, risk in security_headers.items():
            if header.lower() not in present:
                results["findings"].append({
                    "check": "Security Header",
                    "header": header,
                    "risk": risk,
                    "remediation": f"Add {header} header to all API responses"
                })
                log(f"Missing header: {header}", "warn")
        # Server information disclosure
        server = base_resp.headers.get('Server', '')
        powered = base_resp.headers.get('X-Powered-By', '')
        if server:
            results["findings"].append({
                "check": "Information Disclosure",
                "header": "Server",
                "value": server,
                "risk": "Server version disclosed in headers",
                "remediation": "Remove or obfuscate Server header"
            })
            log(f"Server disclosed: {server}", "warn")
        if powered:
            results["findings"].append({
                "check": "Information Disclosure",
                "header": "X-Powered-By",
                "value": powered,
                "risk": "Technology stack disclosed via X-Powered-By",
                "remediation": "Remove X-Powered-By header"
            })
            log(f"X-Powered-By disclosed: {powered}", "warn")
        # Verbose errors
        error_markers = ('stack trace', 'traceback', 'exception',
                         'at line', 'sql', 'syntax error', 'debug')
        error_paths = ['/api/undefined', '/api/../../etc/passwd', '/api/' + 'A' * 5000]
        for ep in error_paths:
            resp = session.get(ep)
            # Error responses are exactly what this check needs, so test for
            # None instead of truthiness (falsy for every status >= 400).
            if resp is None or resp.status_code < 400:
                continue
            body = resp.text.lower()
            if any(ind in body for ind in error_markers):
                results["findings"].append({
                    "check": "Verbose Errors",
                    "endpoint": ep[:80],
                    "risk": "Detailed error messages expose internal information",
                    "remediation": "Return generic error messages, log details server-side"
                })
                log(f"Verbose error response at {ep[:60]}", "vuln")
        # HTTP methods
        resp = session.options('/')
        if resp is not None and 'Allow' in resp.headers:
            # Compare whole method tokens rather than substrings of the header.
            allowed = {m.strip().upper() for m in resp.headers['Allow'].split(',')}
            dangerous = [m for m in ['TRACE', 'TRACK', 'DEBUG'] if m in allowed]
            if dangerous:
                results["findings"].append({
                    "check": "Dangerous Methods",
                    "methods": dangerous,
                    "risk": f"Dangerous HTTP methods enabled: {', '.join(dangerous)}",
                    "remediation": "Disable TRACE, TRACK, and DEBUG methods"
                })
                log(f"Dangerous methods: {', '.join(dangerous)}", "vuln")
        if not results["findings"]:
            log("No security misconfigurations detected", "pass")
        return results
# ═══════════════════════════════════════════════════════════════════════
# API9:2023 — Improper Inventory Management
# ═══════════════════════════════════════════════════════════════════════
class TestInventory:
    """API9 — Tests for exposed old/debug/undocumented endpoints."""

    ID = "API9:2023"
    NAME = "Improper Inventory Management"
    SEVERITY = "MEDIUM"

    # Ordered (category, substrings) rules; the first rule with a substring
    # occurring in the path wins.
    _CATEGORY_RULES = (
        ('documentation', ('swagger', 'openapi', 'docs', 'redoc', 'graphql')),
        ('debug', ('debug', 'actuator', 'pprof', 'phpinfo')),
        ('versioning', ('/v1', '/v2', '/v3', '/beta', '/staging', '/test', '/dev')),
        ('sensitive', ('.git', '.env', 'admin', 'wp-admin')),
    )

    @staticmethod
    def _categorize(path):
        """Return the category of *path*, or ``'info'`` when no rule matches."""
        for label, needles in TestInventory._CATEGORY_RULES:
            for needle in needles:
                if needle in path:
                    return label
        return 'info'

    @staticmethod
    def run(session, endpoints):
        """Request a fixed list of well-known paths and report those answering 200.

        The paths are fetched through a pool of ten worker threads and
        handled as each request completes, so findings are listed in
        completion order.

        Args:
            session: Object exposing ``get(path)``.
            endpoints: Unused here; accepted so the signature matches the
                other test classes.

        Returns:
            dict with ``test``, ``name``, ``severity`` and ``findings``.
        """
        results = {
            "test": TestInventory.ID,
            "name": TestInventory.NAME,
            "severity": TestInventory.SEVERITY,
            "findings": [],
        }
        log(f"{TestInventory.ID} — {TestInventory.NAME}", "info")
        discovery_paths = [
            '/swagger.json', '/swagger.yaml', '/openapi.json', '/openapi.yaml',
            '/api-docs', '/api/docs', '/docs', '/swagger-ui.html', '/swagger-ui/',
            '/redoc', '/api/swagger', '/v1/swagger.json', '/v2/swagger.json',
            '/.well-known/openapi.yaml', '/graphql', '/graphiql', '/playground',
            '/api/v1', '/api/v2', '/api/v3', '/api/beta', '/api/staging',
            '/api/test', '/api/dev', '/api/debug', '/api/internal',
            '/healthz', '/readyz', '/livez', '/health', '/status', '/info',
            '/metrics', '/prometheus', '/actuator', '/actuator/health',
            '/actuator/env', '/actuator/beans', '/actuator/mappings',
            '/debug/pprof', '/debug/vars', '/_debug', '/trace',
            '/.git/config', '/.env', '/robots.txt', '/sitemap.xml',
            '/wp-admin', '/admin', '/phpinfo.php', '/server-status',
        ]
        exposed_count = 0
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
            pending = {}
            for candidate in discovery_paths:
                pending[pool.submit(session.get, candidate)] = candidate
            for done in concurrent.futures.as_completed(pending):
                path = pending[done]
                try:
                    resp = done.result()
                    if not (resp and resp.status_code == 200):
                        continue
                    exposed_count += 1
                    category = TestInventory._categorize(path)
                    if category in ['debug', 'sensitive']:
                        risk_level = 'high'
                    else:
                        risk_level = 'medium'
                    results["findings"].append({
                        "endpoint": path,
                        "category": category,
                        "risk_level": risk_level,
                        "risk": f"Exposed {category} endpoint accessible",
                        "remediation": "Restrict access to documentation/debug endpoints in production"
                    })
                    level = "vuln" if risk_level == 'high' else "warn"
                    log(f"Exposed: {path} [{category}]", level)
                except Exception:
                    # A failing probe must not abort the remaining ones.
                    pass
        if results["findings"]:
            log(f"Found {exposed_count} exposed endpoints", "data")
        else:
            log("No exposed inventory/debug endpoints detected", "pass")
        return results
# ═══════════════════════════════════════════════════════════════════════
# API10:2023 — Unsafe Consumption of APIs
# ═══════════════════════════════════════════════════════════════════════
class TestUnsafeConsumption:
"""API10 — Tests for injection via API inputs."""
ID = "API10:2023"
NAME = "Unsafe Consumption of APIs"
SEVERITY = "HIGH"
@staticmethod
def run(session, endpoints):
results = {"test": TestUnsafeConsumption.ID, "name": TestUnsafeConsumption.NAME,
"severity": TestUnsafeConsumption.SEVERITY, "findings": []}
log(f"{TestUnsafeConsumption.ID} — {TestUnsafeConsumption.NAME}", "info")
injection_payloads = {
"sql": ["' OR '1'='1", "1; DROP TABLE users--", "' UNION SELECT NULL--", "1' AND '1'='1"],
"nosql": ['{"$gt":""}', '{"$ne":""}', '{"$regex":".*"}'],
"xss": ['', '">
'],
"ssti": ['{{7*7}}', '${7*7}', '<%= 7*7 %>'],
"command": ['; ls', '| id', '`id`', '$(id)'],
"path": ['../../../etc/passwd', '..\\..\\..\\windows\\system32\\config\\sam'],
}
for ep in endpoints[:5]:
path = ep.get('path', '')
# Test via query parameters
for category, payloads in injection_payloads.items():
for payload in payloads[:2]:
resp = session.get(path, params={"id": payload, "search": payload, "q": payload})
if resp:
body = resp.text.lower()
indicators = {
"sql": ['sql', 'syntax', 'mysql', 'postgresql', 'sqlite', 'oracle', 'mssql'],
"xss": ['