# -*- coding: utf-8 -*-
#
# APIKnum++ - Multi-provider API key scanner for Burp Suite (Jython)
#
# Features:
#   - Passive & active scanning for many API key formats.
#   - For each detected key, runs a validation HTTP request roughly
#     equivalent to the "curl" examples in keyhacks.
#   - Caches results per (provider, key) so that:
#       * Passive scan only validates each key once.
#       * Active scan reuses cached results instead of re-validating.
#   - Raises:
#       * High/Medium issues for confirmed valid keys.
#       * Information issues for detected keys where validation fails,
#         is invalid, or needs additional context.
#   - Highlights exact locations of keys in request/response using markers.
#   - Logs detections and validation results to the Extender output.
#
# NOTE (Google Maps JavaScript API):
#   This extension does NOT load the Maps JS API or render HTML.
#   It only calls a Google HTTP API (Geocoding) from the scanner environment.
#   That can tell you:
#     - The key is active and usable from the scanner IP.
#     - Or it is restricted / invalid (based on error messages).
#   It CANNOT fully validate browser-side referrer restrictions.
#   The issue text for Google keys explains how to manually verify JS API
#   restrictions in a browser.
from burp import IBurpExtender, IScannerCheck, IScanIssue
from java.io import BufferedReader, InputStreamReader
from java.net import URL, HttpURLConnection
from java.util import ArrayList

import threading
import re
import base64
from array import array

DEBUG = False
HTTP_TIMEOUT_MS = 8000  # 8s network timeout per validation

# ------------- Provider definitions -------------
#
# Each entry describes one credential type:
#   id               - short identifier used in logs and as part of the cache key
#   name             - human-readable name used in issue titles
#   regexes          - patterns whose full match is treated as the credential
#   validator        - name of the BurpExtender method that validates a match
#   context_keywords - optional; a match only counts when one of these words
#                      appears within 80 characters of it (case-insensitive)
#   needs_context    - optional; when True an "unknown" validation result is
#                      reported as "requires manual verification"

PROVIDER_DEFS = [
    {
        "id": "slack_webhook",
        "name": "Slack Incoming Webhook",
        "regexes": [
            r"https://hooks\.slack\.com/services/[A-Z0-9]+/[A-Z0-9]+/[A-Za-z0-9]+"
        ],
        "validator": "validate_slack_webhook",
    },
    {
        "id": "slack_api_token",
        "name": "Slack API token",
        "regexes": [
            r"xox[abprsxo]-[0-9A-Za-z\-]+",
        ],
        "validator": "validate_slack_api_token",
    },
    {
        "id": "github_token",
        "name": "GitHub Personal Access Token",
        "regexes": [
            r"ghp_[0-9A-Za-z]{36}",
            r"gho_[0-9A-Za-z]{36}",
            r"ghu_[0-9A-Za-z]{36}",
            r"ghs_[0-9A-Za-z]{36}",
            r"ghr_[0-9A-Za-z]{36}",
        ],
        "validator": "validate_github_token",
    },
    {
        "id": "sendgrid_token",
        "name": "SendGrid API Token",
        "regexes": [
            r"SG\.[0-9A-Za-z_\-]{20,}\.[0-9A-Za-z_\-]{20,}",
        ],
        "validator": "validate_sendgrid_token",
    },
    {
        "id": "square_access_token",
        "name": "Square Access Token",
        "regexes": [
            r"EAAA[a-zA-Z0-9]{60}",
        ],
        "validator": "validate_square_access_token",
    },
    {
        "id": "hubspot_hapikey",
        "name": "HubSpot hapikey",
        "regexes": [
            r"\b[0-9a-fA-F]{32}\b",
        ],
        "context_keywords": ["hapikey", "hubapi.com", "hubspot"],
        "validator": "validate_hubspot_hapikey",
        "needs_context": False,
    },
    {
        "id": "infura_api_key",
        "name": "Infura API key",
        "regexes": [
            r"https://mainnet\.infura\.io/v3/[0-9a-fA-F]{32}",
        ],
        "validator": "validate_infura_v3_key_from_url",
    },
    {
        "id": "dropbox_api_token",
        "name": "Dropbox API Token",
        "regexes": [
            r"sl\.[A-Za-z0-9\-_]{30,}",
        ],
        "validator": "validate_dropbox_token",
    },
    {
        "id": "pendo_integration_key",
        "name": "Pendo Integration Key",
        "regexes": [
            r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}",
        ],
        "context_keywords": ["x-pendo-integration-key", "pendo"],
        "validator": "validate_pendo_integration_key",
        "needs_context": False,
    },
    {
        "id": "twilio_sid_authtoken",
        "name": "Twilio Account SID (requires Auth Token context)",
        "regexes": [
            r"AC[0-9a-fA-F]{32}",
        ],
        "validator": "validate_twilio_sid_and_token",
        "needs_context": True,
    },
    {
        "id": "stripe_secret_key",
        "name": "Stripe Secret Key",
        "regexes": [
            r"sk_live_[0-9a-zA-Z]{20,40}",
            r"sk_test_[0-9a-zA-Z]{20,40}",
        ],
        "validator": "validate_stripe_secret_key",
    },
    {
        "id": "google_api_key",
        "name": "Google API Key (Maps/Geocoding etc.)",
        "regexes": [
            r"\bAIza[0-9A-Za-z\-_]{35}\b",
        ],
        "validator": "validate_google_api_key",
        "needs_context": False,
    },
    {
        "id": "mailgun_private_key",
        "name": "Mailgun Private API Key",
        "regexes": [
            r"\bkey-[0-9A-Za-z]{32}\b",
        ],
        "validator": "validate_mailgun_private_key",
    },
    {
        "id": "mailchimp_api_key",
        "name": "Mailchimp API Key",
        "regexes": [
            r"\b[0-9a-f]{32}-us(?:0?[1-9]|1[0-3])\b",
        ],
        "validator": "validate_mailchimp_api_key",
    },
    {
        "id": "mapbox_access_token",
        "name": "Mapbox Access Token (pk./sk.)",
        "regexes": [
            r"\bpk\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\b",
            r"\bsk\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\b",
        ],
        "validator": "validate_mapbox_access_token",
    },
    {
        "id": "gitlab_pat",
        "name": "GitLab Personal Access Token",
        "regexes": [
            r"\bglpat-[0-9A-Za-z\-_]{20,}\b",
        ],
        "validator": "validate_gitlab_pat",
    },
    {
        "id": "telegram_bot_token",
        "name": "Telegram Bot Token",
        "regexes": [
            r"\b[0-9]{6,12}:[A-Za-z0-9_-]{30,50}\b",
        ],
        "validator": "validate_telegram_bot_token",
    },
    {
        "id": "discord_bot_token",
        "name": "Discord Bot Token",
        "regexes": [
            r"\b[MN][A-Za-z0-9\-_]{23,28}\.[A-Za-z0-9\-_]{6}\.[A-Za-z0-9\-_]{27,}\b",
        ],
        "validator": "validate_discord_bot_token",
    },
    {
        "id": "npm_token",
        "name": "NPM Access Token",
        "regexes": [
            r"\bnpm_[A-Za-z0-9]{32,48}\b",
        ],
        "validator": "validate_npm_token",
    },
    # --- new providers from keyhacks ---
    {
        "id": "google_recaptcha_secret",
        "name": "Google reCAPTCHA secret key",
        "regexes": [
            r"\b6[0-9A-Za-z_-]{39}\b",
        ],
        "validator": "validate_google_recaptcha_secret",
        "needs_context": True,
    },
    {
        "id": "zapier_webhook",
        "name": "Zapier Webhook URL",
        "regexes": [
            r"https://hooks\.zapier\.com/hooks/catch/[0-9]+/[0-9A-Za-z]+/?",
        ],
        "validator": "validate_zapier_webhook",
    },
    {
        "id": "pagerduty_api_token",
        "name": "PagerDuty API token",
        "regexes": [
            r"Token token=[0-9A-Za-z]{16,40}",
        ],
        "validator": "validate_pagerduty_api_token",
    },
    {
        "id": "wpengine_api_key",
        "name": "WPEngine API Key",
        "regexes": [
            r"wpe_apikey=[0-9A-Za-z]{16,64}",
        ],
        "validator": "validate_wpengine_api_key",
        "needs_context": True,
    },
    {
        "id": "datadog_api_key",
        "name": "DataDog API key",
        "regexes": [
            r"https://api\.datadoghq\.com/api/v1/dashboard\?api_key=[0-9a-fA-F]{32}&application_key=",
        ],
        "validator": "validate_datadog_api_key",
        "needs_context": True,
    },
    {
        "id": "wakatime_api_key",
        "name": "WakaTime API Key",
        "regexes": [
            r"https://wakatime\.com/api/v1/users/current\?api_key=[0-9a-fA-F]{32}",
        ],
        "validator": "validate_wakatime_api_key",
    },
    {
        "id": "newrelic_rest_api_key",
        "name": "New Relic REST API Key",
        "regexes": [
            r"X-Api-Key:[ \t]*[0-9A-Za-z]{20,40}",
        ],
        "validator": "validate_newrelic_rest_api_key",
    },
]


class BurpExtender(IBurpExtender, IScannerCheck):
    """Burp scanner check that detects API credentials and validates them.

    Detection is regex-based (see PROVIDER_DEFS). Each detected credential is
    passed to the provider's ``validate_*`` method, which returns a
    ``(status, detail)`` pair; results are cached per ``(provider_id, key)``.
    """

    def registerExtenderCallbacks(self, callbacks):
        """Compile the provider table and register this object as a scanner check."""
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()
        callbacks.setExtensionName("APIKnum++: Multi-API Key Scanner")

        # Build provider objects with compiled regexes
        self._providers = []
        for pdef in PROVIDER_DEFS:
            compiled = []
            for pat in pdef["regexes"]:
                try:
                    compiled.append(re.compile(pat))
                except Exception as e:
                    callbacks.printError(
                        "Error compiling regex for provider %s: %s" % (pdef["id"], str(e))
                    )
            self._providers.append({
                "id": pdef["id"],
                "name": pdef["name"],
                "validator": pdef["validator"],
                "regexes": compiled,
                "needs_context": pdef.get("needs_context", False),
                # Must be carried over: the matcher reads it from the provider
                # object to filter generic patterns (32-hex strings, UUIDs).
                # Stored lower-cased because it is compared to lower-cased text.
                "context_keywords": [
                    kw.lower() for kw in pdef.get("context_keywords", [])
                ],
            })

        # cache: (provider_id, key) -> {status, detail}
        self._tested = {}
        self._lock = threading.RLock()

        callbacks.registerScannerCheck(self)
        callbacks.printOutput("[+] APIKnum++ loaded with %d providers" % len(self._providers))

    # ------------- IScannerCheck methods -------------

    def doPassiveScan(self, baseRequestResponse):
        """Detect keys and validate each new one on a background daemon thread.

        Always returns None; issues are added later via ``addScanIssue`` from
        the validation thread.
        """
        try:
            text, is_authenticated = self._extract_text_and_auth_context(baseRequestResponse)
            if not text:
                return None

            matches = self._find_provider_matches_in_text(text)
            if not matches:
                return None

            url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()

            for provider, key in matches:
                prov_id = provider["id"]
                snippet = self._snippet(key)

                # Check-and-mark under the lock so only one thread is started per key.
                with self._lock:
                    state = self._tested.get((prov_id, key))
                    if state is None:
                        self._tested[(prov_id, key)] = {"status": "pending"}

                if state is None:
                    self._callbacks.printOutput(
                        "[APIKnum++] Passive: detected %s key (%s) at %s - scheduling validation"
                        % (prov_id, snippet, url)
                    )
                    t = threading.Thread(
                        target=self._run_provider_validation,
                        args=(provider, key, baseRequestResponse, is_authenticated),
                    )
                    t.setDaemon(True)
                    t.start()
                else:
                    self._callbacks.printOutput(
                        "[APIKnum++] Passive: %s key (%s) at %s already seen (status=%s), skipping re-validation"
                        % (prov_id, snippet, url, state.get("status"))
                    )
            return None
        except Exception as e:
            self._callbacks.printError("Error in doPassiveScan: %s" % e)
            return None

    def doActiveScan(self, baseRequestResponse, insertionPoint):
        """Detect keys and return issues, validating synchronously on a cache miss.

        Returns a ``java.util.ArrayList`` of issues, or None when there are none
        or an error occurred. ``insertionPoint`` is not used.
        """
        try:
            text, is_authenticated = self._extract_text_and_auth_context(baseRequestResponse)
            if not text:
                return None

            matches = self._find_provider_matches_in_text(text)
            if not matches:
                return None

            j_issues = ArrayList()
            url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()

            for provider, key in matches:
                prov_id = provider["id"]
                vfunc = getattr(self, provider["validator"], None)
                if vfunc is None:
                    continue

                snippet = self._snippet(key)

                with self._lock:
                    cache = self._tested.get((prov_id, key))

                # A "pending" entry means a passive validation is still running;
                # validate here rather than wait for it.
                if cache and cache.get("status") not in (None, "pending"):
                    status = cache["status"]
                    info = cache.get("detail", "")
                    self._callbacks.printOutput(
                        "[APIKnum++] Active: detected %s key (%s) at %s - reusing cached status=%s"
                        % (prov_id, snippet, url, status)
                    )
                else:
                    self._callbacks.printOutput(
                        "[APIKnum++] Active: detected %s key (%s) at %s - validating"
                        % (prov_id, snippet, url)
                    )
                    status, info = vfunc(key)
                    self._callbacks.printOutput(
                        "[APIKnum++] Active validation result %s key (%s): %s"
                        % (prov_id, snippet, status)
                    )
                    with self._lock:
                        self._tested[(prov_id, key)] = {
                            "status": status,
                            "detail": info,
                        }

                status_flag = self._status_to_flag(status, provider)
                issue = self._build_issue(provider, key, info, baseRequestResponse,
                                          is_authenticated, status_flag)
                if issue is not None:
                    j_issues.add(issue)

            if j_issues.isEmpty():
                return None
            return j_issues
        except Exception as e:
            self._callbacks.printError("Error in doActiveScan: %s" % e)
            return None

    def consolidateDuplicateIssues(self, existingIssue, newIssue):
        """Return -1 (keep only the existing issue) for true duplicates, else 0.

        URLs are compared as strings: ``java.net.URL.equals`` may perform
        blocking host-name resolution. The detail text, which embeds the key,
        is compared too, so two different keys of the same provider found at
        the same URL are both reported.
        """
        same_name = existingIssue.getIssueName() == newIssue.getIssueName()
        same_url = str(existingIssue.getUrl()) == str(newIssue.getUrl())
        same_detail = existingIssue.getIssueDetail() == newIssue.getIssueDetail()
        if same_name and same_url and same_detail:
            return -1
        return 0

    # ------------- Common scanning helpers -------------

    @staticmethod
    def _snippet(key):
        """Return an abbreviated form of ``key`` (first 3 + last 3 chars) for logs."""
        return key[:3] + "..." + key[-3:]

    def _extract_text_and_auth_context(self, baseRequestResponse):
        """Return ``(text, is_authenticated)`` for a request/response pair.

        ``text`` is the request, followed by the response when one exists.
        ``is_authenticated`` is True when the request has a Cookie or
        Authorization header.
        """
        req_str = self._helpers.bytesToString(baseRequestResponse.getRequest())
        resp_bytes = baseRequestResponse.getResponse()
        if resp_bytes is None:
            text = req_str
        else:
            text = req_str + "\n" + self._helpers.bytesToString(resp_bytes)

        headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()
        is_authenticated = False
        for h in headers:
            hl = h.lower()
            if hl.startswith("cookie:") or hl.startswith("authorization:"):
                is_authenticated = True
                break
        return text, is_authenticated

    def _find_provider_matches_in_text(self, text):
        """Return a de-duplicated list of ``(provider, key)`` pairs found in ``text``.

        For providers with context keywords, a regex match is kept only when a
        keyword occurs within 80 characters on either side of it.
        """
        matches = []
        seen = set()
        tlower = text.lower()
        text_len = len(text)
        for provider in self._providers:
            ctx_keywords = provider.get("context_keywords")
            for cregex in provider["regexes"]:
                for m in cregex.finditer(text):
                    if ctx_keywords:
                        window = tlower[max(0, m.start() - 80):min(text_len, m.end() + 80)]
                        if not any(kw in window for kw in ctx_keywords):
                            continue
                    key = m.group(0)
                    ident = (provider["id"], key)
                    if ident in seen:
                        # Same key repeated in the message: report/validate once.
                        continue
                    seen.add(ident)
                    matches.append((provider, key))
        return matches

    def _run_provider_validation(self, provider, key, baseRequestResponse, is_authenticated):
        """Thread target: validate ``key``, cache the result and add a scan issue.

        Any exception is logged and recorded in the cache as status "error" so
        the entry does not stay "pending".
        """
        prov_id = provider["id"]
        try:
            vfunc = getattr(self, provider["validator"], None)
            if vfunc is None:
                self._callbacks.printError(
                    "[APIKnum++] No validator for provider %s" % prov_id
                )
                with self._lock:
                    self._tested[(prov_id, key)] = {
                        "status": "error",
                        "detail": "validator-missing",
                    }
                return

            snippet = self._snippet(key)
            self._callbacks.printOutput(
                "[APIKnum++] Passive: validating %s key (%s)" % (prov_id, snippet)
            )
            status, info = vfunc(key)
            self._callbacks.printOutput(
                "[APIKnum++] Passive validation result %s key (%s): %s"
                % (prov_id, snippet, status)
            )

            with self._lock:
                self._tested[(prov_id, key)] = {
                    "status": status,
                    "detail": info,
                }

            status_flag = self._status_to_flag(status, provider)
            issue = self._build_issue(provider, key, info, baseRequestResponse,
                                      is_authenticated, status_flag)
            if issue is not None:
                self._callbacks.addScanIssue(issue)
        except Exception as e:
            self._callbacks.printError(
                "[APIKnum++] Error validating key for provider %s: %s" % (prov_id, e)
            )
            with self._lock:
                self._tested[(prov_id, key)] = {
                    "status": "error",
                    "detail": str(e),
                }

    def _status_to_flag(self, status, provider):
        """Map a validator status to the flag consumed by ``_build_issue``.

        Normalized statuses:
          - invalid: not a real key / token rejected
          - restricted: key exists but is blocked by ACL/scope/origin/IP/referrer restrictions
          - unrestricted: key works from scanner environment (highest risk)
          - needs_context: cannot validate without extra identifiers/secrets
          - unknown/error: validator could not decide
        """
        if status == "valid":
            return "unrestricted"
        if status in ("unrestricted", "restricted", "invalid", "error"):
            return status
        # "unknown" must be examined here, before the generic fallback, so that
        # providers flagged needs_context are surfaced for manual verification.
        if status == "unknown" and provider.get("needs_context", False):
            return "needs_context"
        return "unknown"

    def _build_issue(self, provider, key, info, baseRequestResponse, is_authenticated, status_flag):
        """Build an APIKeyLeakIssue for ``key``, or return None when nothing is reported.

        Issues are produced for the flags "unrestricted", "restricted",
        "needs_context" and "error"; any other flag yields None. Occurrences of
        the key in the request and response are highlighted with markers.
        """
        url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
        prov_name = provider["name"]

        if status_flag == "unrestricted":
            severity = "Medium"
            if is_authenticated:
                context = "Valid key/token confirmed from the scanner environment. It appears in an authenticated request/response."
            else:
                context = "Valid key/token confirmed from the scanner environment and appears in content that looks unauthenticated."
            name = "UNRESTRICTED %s (APIKnum++)" % prov_name
            confidence = "Firm"
        elif status_flag == "restricted":
            severity = "Information"
            context = (
                "Valid key/token confirmed, but provider-side restrictions (IP/origin/referrer, scopes, SSO, or ACLs) "
                "appear to block use from the scanner environment. This is still a secret exposure."
            )
            name = "RESTRICTED %s (APIKnum++)" % prov_name
            confidence = "Firm"
        elif status_flag == "needs_context":
            severity = "Information"
            context = (
                "A provider-specific identifier/secret is required to validate this credential (for example paired secrets, "
                "project/tenant configuration, or provider ACLs). Treat as potential exposure and validate manually."
            )
            name = "%s requires manual verification (APIKnum++)" % prov_name
            confidence = "Tentative"
        elif status_flag == "error":
            severity = "Information"
            context = (
                "The key was detected, but the validator encountered an error (network/provider "
                "issue). Treat this as a potential key exposure and validate manually."
            )
            name = "%s detected - validation error (APIKnum++)" % prov_name
            confidence = "Tentative"
        else:
            # "unknown" (and "invalid") produce no issue
            return None

        # <br> is used for line breaks in the rendered issue detail.
        detail = (
            "APIKnum++ identified an API credential.<br>\n"
            "Provider: %s<br>\n"
            "Identified key: %s<br>\n"
            "Validation / notes: %s<br>\n"
            "Context: %s\n"
        ) % (prov_name, key, info, context)

        if provider["id"] == "google_api_key":
            detail += (
                "\n<br><br>Additional note for Google Maps JavaScript API: "
                "This extension only tests the key via an HTTP API from the scanner IP. "
                "To verify browser-side referrer restrictions, create a minimal HTML page that loads "
                "https://maps.googleapis.com/maps/api/js?key=YOUR_KEY&callback=initMap "
                "from the target origin and observe whether the map loads or errors such as "
                "'RefererNotAllowedMapError' are shown in the browser console."
            )

        remediation = (
            "Rotate the leaked key and ensure it is not exposed in client-side "
            "or otherwise untrusted locations. Review provider-side access controls "
            "(IP/referrer restrictions, scopes, environment segregation) and tighten "
            "them as appropriate."
        )

        # Build markers for key occurrences in request/response
        req_bytes = baseRequestResponse.getRequest()
        resp_bytes = baseRequestResponse.getResponse()
        key_bytes = key.encode("utf-8")

        req_markers = ArrayList()
        resp_markers = ArrayList()
        if req_bytes is not None:
            self._add_markers_for_bytes(req_bytes, key_bytes, req_markers)
        if resp_bytes is not None:
            self._add_markers_for_bytes(resp_bytes, key_bytes, resp_markers)

        marked_rr = self._callbacks.applyMarkers(
            baseRequestResponse,
            req_markers if not req_markers.isEmpty() else None,
            resp_markers if not resp_markers.isEmpty() else None,
        )

        return APIKeyLeakIssue(
            baseRequestResponse.getHttpService(),
            url,
            [marked_rr],
            name,
            detail,
            remediation,
            severity,
            confidence,
        )

    def _add_markers_for_bytes(self, msg_bytes, key_bytes, marker_list, max_markers=10):
        """Append ``[start, end]`` int-array markers for each occurrence of the key.

        At most ``max_markers`` non-overlapping occurrences of ``key_bytes`` in
        ``msg_bytes`` are added to ``marker_list``.
        """
        key_len = len(key_bytes)
        if key_len == 0:
            # An empty pattern would only yield zero-width markers.
            return
        total_len = len(msg_bytes)
        offset = 0
        count = 0
        while count < max_markers and offset < total_len:
            idx = self._helpers.indexOf(msg_bytes, key_bytes, True, offset, total_len)
            if idx == -1:
                break
            marker_list.add(array('i', [idx, idx + key_len]))
            count += 1
            offset = idx + key_len

    # ------------- HTTP helper -------------

    def _http_request(self, method, url_str, headers=None, body=None):
        """Perform one HTTP request and return ``(status_code, body_text)``.

        ``headers`` is an optional dict of request headers; ``body`` is an
        optional string sent UTF-8 encoded. The response body is read line by
        line and re-joined with "\\n". Returns ``(None, None)`` when the request
        fails with an exception.
        """
        try:
            conn = URL(url_str).openConnection()
            conn.setConnectTimeout(HTTP_TIMEOUT_MS)
            conn.setReadTimeout(HTTP_TIMEOUT_MS)
            if isinstance(conn, HttpURLConnection):
                conn.setRequestMethod(method)

            if headers:
                for k, v in headers.items():
                    conn.setRequestProperty(k, v)

            if body is not None:
                conn.setDoOutput(True)
                out = conn.getOutputStream()
                try:
                    out.write(body.encode("utf-8"))
                    out.flush()
                finally:
                    out.close()

            status = conn.getResponseCode()
            # For HTTP error statuses the body is only available on the error
            # stream; getInputStream() raises for them.
            if status >= 400:
                stream = conn.getErrorStream()
            else:
                stream = conn.getInputStream()
            if stream is None:
                return status, ""

            try:
                reader = BufferedReader(InputStreamReader(stream, "UTF-8"))
                buf = []
                line = reader.readLine()
                while line is not None:
                    buf.append(line)
                    line = reader.readLine()
            finally:
                stream.close()
            return status, "\n".join(buf)
        except Exception as e:
            if DEBUG:
                self._callbacks.printError("[HTTP helper] %s" % e)
            return None, None

    @staticmethod
    def _basic_auth_value(credentials):
        """Return the ``Basic <base64>`` Authorization value for ``user:password``."""
        encoded = base64.b64encode(credentials.encode("utf-8"))
        if not isinstance(encoded, str):
            # bytes on interpreters where bytes and str differ
            encoded = encoded.decode("ascii")
        return "Basic " + encoded

    # ------------- Validators -------------
    #
    # Every validator takes the matched text and returns (status, detail),
    # where status is one of: "valid", "unrestricted", "restricted",
    # "invalid", "unknown".

    def validate_slack_webhook(self, url):
        """POST an empty-text message to the webhook and inspect the error text."""
        headers = {"Content-Type": "application/json"}
        body = '{"text":""}'
        status, resp_body = self._http_request("POST", url, headers, body)
        if status is None:
            return "unknown", "No response from Slack webhook"
        if "missing_text_or_fallback_or_attachments" in resp_body:
            return "unrestricted", "Slack webhook responded with missing_text_or_fallback_or_attachments (valid endpoint)"
        if status == 404 or "invalid" in resp_body.lower():
            return "invalid", "Slack webhook returned error (%d)" % status
        return "unknown", "Slack webhook response (%d) did not match expected pattern" % status

    def validate_slack_api_token(self, token):
        """Call Slack ``auth.test`` with the token as a Bearer credential."""
        headers = {
            "Accept": "application/json; charset=utf-8",
            "Authorization": "Bearer " + token,
        }
        status, body = self._http_request("POST", "https://slack.com/api/auth.test", headers, "")
        if status is None:
            return "unknown", "No response from Slack API"
        bl = body.lower()
        # Whitespace-insensitive check for "ok": true
        if '"ok":true' in bl.replace(" ", ""):
            return "unrestricted", "Slack auth.test returned ok=true (token accepted)"
        if "invalid_auth" in bl or "not_authed" in bl:
            return "invalid", "Slack auth.test reported invalid_auth/not_authed"
        return "unknown", "Slack auth.test HTTP %d" % status

    def validate_github_token(self, token):
        """Call GitHub ``/user`` with the token."""
        headers = {
            "Authorization": "token " + token,
            "User-Agent": "APIKnumPlusPlus",
        }
        status, body = self._http_request("GET", "https://api.github.com/user", headers, None)
        if status is None:
            return "unknown", "No response from GitHub"
        if status == 200 and '"login"' in body:
            return "unrestricted", "GitHub /user returned 200 with login field (token accepted)"
        if status == 401:
            return "invalid", "GitHub returned 401 (invalid/expired token)"
        if status in (403, 404):
            return "restricted", "GitHub returned %d (token valid but insufficient permissions / SSO / scope)" % status
        return "unknown", "GitHub /user HTTP %d" % status

    def validate_sendgrid_token(self, token):
        """Call SendGrid ``/v3/scopes`` with the token."""
        headers = {
            "Authorization": "Bearer " + token,
            "Content-Type": "application/json",
        }
        status, body = self._http_request("GET", "https://api.sendgrid.com/v3/scopes", headers, None)
        if status is None:
            return "unknown", "No response from SendGrid"
        if status == 200 and "scopes" in body:
            return "unrestricted", "SendGrid /v3/scopes returned scopes (token accepted)"
        if status in (401, 403):
            return "invalid", "SendGrid returned %d (unauthorized)" % status
        return "unknown", "SendGrid /v3/scopes HTTP %d" % status

    def validate_square_access_token(self, token):
        """Call Square ``/v2/locations`` with the token."""
        headers = {"Authorization": "Bearer " + token}
        status, body = self._http_request("GET", "https://connect.squareup.com/v2/locations", headers, None)
        if status is None:
            return "unknown", "No response from Square"
        if status == 200 and '"locations"' in body:
            return "unrestricted", "Square /v2/locations returned locations (token accepted)"
        if status == 401 or "UNAUTHORIZED" in body:
            return "invalid", "Square reported unauthorized"
        return "unknown", "Square /v2/locations HTTP %d" % status

    def validate_hubspot_hapikey(self, key):
        """Call the HubSpot owners API with the key as ``hapikey``."""
        url = "https://api.hubapi.com/owners/v2/owners?hapikey=%s" % key
        status, body = self._http_request("GET", url, None, None)
        if status is None:
            return "unknown", "No response from HubSpot"
        bl = body.lower()
        if status == 200 and "[" in body and "ownerid" in bl:
            return "unrestricted", "HubSpot owners API returned data (hapikey accepted)"
        if status in (401, 403):
            return "invalid", "HubSpot returned %d (unauthorized)" % status
        return "unknown", "HubSpot /owners HTTP %d" % status

    def validate_infura_v3_key_from_url(self, url_with_key):
        """POST a JSON-RPC ``eth_accounts`` call to the matched Infura URL."""
        headers = {"Content-Type": "application/json"}
        body = '{"jsonrpc":"2.0","method":"eth_accounts","params":[],"id":1}'
        status, body_resp = self._http_request("POST", url_with_key, headers, body)
        if status is None:
            return "unknown", "No response from Infura"
        if status == 200 and '"result"' in body_resp:
            return "unrestricted", "Infura returned JSON-RPC result (key accepted)"
        if status in (401, 403):
            return "invalid", "Infura returned %d (unauthorized)" % status
        return "unknown", "Infura HTTP %d" % status

    def validate_dropbox_token(self, token):
        """Call Dropbox ``users/get_current_account`` with the token."""
        headers = {"Authorization": "Bearer " + token}
        status, body = self._http_request(
            "POST",
            "https://api.dropboxapi.com/2/users/get_current_account",
            headers,
            ""
        )
        if status is None:
            return "unknown", "No response from Dropbox"
        if status == 200 and '"account_id"' in body:
            return "unrestricted", "Dropbox current_account returned account_id (token accepted)"
        if status in (401, 403):
            return "invalid", "Dropbox returned %d (unauthorized)" % status
        return "unknown", "Dropbox HTTP %d" % status

    def validate_pendo_integration_key(self, key):
        """Call Pendo ``/api/v1/feature`` with the integration key header."""
        headers = {
            "content-type": "application/json",
            "x-pendo-integration-key": key,
        }
        status, body = self._http_request("GET", "https://app.pendo.io/api/v1/feature", headers, None)
        if status is None:
            return "unknown", "No response from Pendo"
        if status == 200 and ("[" in body or "{" in body):
            return "unrestricted", "Pendo /api/v1/feature returned data (key accepted)"
        if status in (401, 403):
            return "invalid", "Pendo returned %d (unauthorized)" % status
        return "unknown", "Pendo HTTP %d" % status

    def validate_twilio_sid_and_token(self, sid):
        """Report the SID without any network call; the Auth Token is not available."""
        return "unknown", (
            "Twilio Account SID detected. Full validation requires pairing with the "
            "corresponding Auth Token and knowledge of account scopes / ACLs. "
            "Use Twilio/keyhacks checks and tenant context to confirm impact."
        )

    def validate_stripe_secret_key(self, sk):
        """Call Stripe ``/v1/charges`` using the key as the Basic-auth user name."""
        headers = {
            "Authorization": self._basic_auth_value("%s:" % sk),
        }
        status, body = self._http_request("GET", "https://api.stripe.com/v1/charges?limit=1", headers, None)
        if status is None:
            return "unknown", "No response from Stripe"
        if status == 200 and '"data"' in body:
            return "unrestricted", "Stripe /v1/charges returned data (secret key accepted)"
        if status in (401, 403):
            return "invalid", "Stripe returned %d (unauthorized)" % status
        return "unknown", "Stripe HTTP %d" % status

    def validate_google_api_key(self, key):
        """Call the Google Geocoding API with the key and classify the response."""
        url = "https://maps.googleapis.com/maps/api/geocode/json?address=Berlin&key=%s" % key
        status, body = self._http_request("GET", url, None, None)
        if status is None:
            return "unknown", "No response from Google Geocoding API"

        bl = body.lower()

        if status == 200 and '"error_message"' not in bl:
            return (
                "valid",
                "Google Geocoding API call succeeded from the scanner IP. "
                "The key is active and accepts requests from this environment. "
                "Referrer/IP/application restrictions must still be checked in the GCP console."
            )

        if "api keys with referer restrictions" in bl or \
           "this ip, site or mobile application is not authorized" in bl or \
           "referernotallowedmaperror" in bl:
            return (
                "restricted",
                "Google API responded with a restriction-related error. "
                "The key likely exists but is restricted by IP/referrer/origin. "
                "Manually confirm restrictions and scopes in GCP."
            )

        if "invalid api key" in bl:
            return "invalid", "Google reported 'invalid API key'."

        return (
            "unknown",
            "Google Geocoding API returned HTTP %d with error_message or error codes; manual review needed." % status
        )

    def validate_mailgun_private_key(self, key):
        """Call Mailgun ``/v3/domains`` with Basic auth ``api:<key>``."""
        headers = {
            "Authorization": self._basic_auth_value("api:%s" % key),
        }
        status, body = self._http_request("GET", "https://api.mailgun.net/v3/domains", headers, None)
        if status is None:
            return "unknown", "No response from Mailgun"
        if status == 200 and "domains" in body:
            return (
                "valid",
                "Mailgun /v3/domains returned data (key accepted). "
                "Review Mailgun key type and IP restrictions."
            )
        if status in (401, 403):
            return "invalid", "Mailgun returned %d (unauthorized)" % status
        return "unknown", "Mailgun /v3/domains HTTP %d" % status

    def validate_mailchimp_api_key(self, key):
        """Call the Mailchimp root API on the data center named in the key suffix."""
        try:
            dc = key.split("-")[1]
        except Exception:
            return "unknown", "Cannot parse Mailchimp data center from key"

        url = "https://%s.api.mailchimp.com/3.0/" % dc
        headers = {
            "Authorization": "apikey " + key,
        }
        status, body = self._http_request("GET", url, headers, None)
        if status is None:
            return "unknown", "No response from Mailchimp"
        if status == 200 and '"account_id"' in body:
            return (
                "valid",
                "Mailchimp root API returned data (API key accepted). "
                "Ensure it is not used client-side and has least privilege."
            )
        if status in (401, 403):
            return "invalid", "Mailchimp returned %d (unauthorized)" % status
        return "unknown", "Mailchimp /3.0 HTTP %d" % status

    def validate_mapbox_access_token(self, token):
        """Call the Mapbox Tokens API with the token."""
        url = "https://api.mapbox.com/tokens/v2?access_token=%s" % token
        status, body = self._http_request("GET", url, None, None)
        if status is None:
            return "unknown", "No response from Mapbox"
        if status == 200 and ('"tokens"' in body or '"note"' in body):
            return (
                "valid",
                "Mapbox Tokens API accepted the token from the scanner IP. "
                "Review token scopes and allowed URLs."
            )
        if status == 401:
            return "invalid", "Mapbox returned 401 (invalid/expired token)"
        if status == 403:
            return "restricted", "Mapbox returned 403 (token lacks scopes/ACLs or is otherwise restricted)"
        return "unknown", "Mapbox tokens API HTTP %d" % status

    def validate_gitlab_pat(self, token):
        """Call gitlab.com ``/api/v4/user`` with the token."""
        headers = {
            "Private-Token": token,
        }
        status, body = self._http_request("GET", "https://gitlab.com/api/v4/user", headers, None)
        if status is None:
            return "unknown", "No response from gitlab.com"
        if status == 200 and '"username"' in body:
            return "unrestricted", "GitLab /user returned 200 with username (token accepted)"
        if status in (401, 403):
            return "invalid", "GitLab returned %d (unauthorized)" % status
        return "unknown", "GitLab /user HTTP %d" % status

    def validate_telegram_bot_token(self, token):
        """Call Telegram ``getMe`` for the bot token."""
        url = "https://api.telegram.org/bot%s/getMe" % token
        status, body = self._http_request("GET", url, None, None)
        if status is None:
            return "unknown", "No response from Telegram"
        bl = body.lower()
        if status == 200 and '"ok":true' in bl.replace(" ", ""):
            return "unrestricted", "Telegram getMe succeeded (bot token accepted)"
        if "unauthorized" in bl or status == 401:
            return "invalid", "Telegram reported unauthorized"
        return "unknown", "Telegram getMe HTTP %d" % status

    def validate_discord_bot_token(self, token):
        """Call Discord ``/users/@me`` with the bot token."""
        headers = {
            "Authorization": "Bot " + token,
        }
        status, body = self._http_request("GET", "https://discord.com/api/v10/users/@me", headers, None)
        if status is None:
            return "unknown", "No response from Discord"
        if status == 200 and '"id"' in body and '"username"' in body:
            return "unrestricted", "Discord /users/@me returned user info (bot token accepted)"
        if status in (401, 403):
            return "invalid", "Discord returned %d (unauthorized)" % status
        return "unknown", "Discord /users/@me HTTP %d" % status

    def validate_npm_token(self, token):
        """Call the NPM registry ``/-/whoami`` with the token."""
        headers = {
            "Authorization": "Bearer " + token,
        }
        status, body = self._http_request("GET", "https://registry.npmjs.org/-/whoami", headers, None)
        if status is None:
            return "unknown", "No response from NPM registry"
        if status == 200 and '"username"' in body:
            return "unrestricted", "NPM /-/whoami returned username (token accepted)"
        if status in (401, 403):
            return "invalid", "NPM returned %d (unauthorized)" % status
        return "unknown", "NPM /-/whoami HTTP %d" % status

    def validate_google_recaptcha_secret(self, key):
        """
        We cannot fully validate without a real user response.
        Test: POST secret=<key>&response=dummy.
        If invalid-input-secret appears, the secret is invalid.
        Otherwise we treat as unknown but existing.
        """
        body = "secret=%s&response=dummy" % key
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
        }
        status, resp = self._http_request(
            "POST",
            "https://www.google.com/recaptcha/api/siteverify",
            headers,
            body,
        )
        if status is None:
            return "unknown", "No response from Google reCAPTCHA siteverify"
        if "invalid-input-secret" in resp:
            return "invalid", "reCAPTCHA siteverify reported invalid-input-secret"
        return (
            "unknown",
            "reCAPTCHA siteverify responded, but without a real challenge response "
            "the extension cannot confirm validity; use a real response parameter as per keyhacks."
        )

    def validate_zapier_webhook(self, url):
        """POST a small JSON document to the webhook URL; any 2xx counts as accepted."""
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        status, body = self._http_request(
            "POST", url, headers, '{"name":"apiknum"}'
        )
        if status is None:
            return "unknown", "No response from Zapier webhook URL"
        if 200 <= status < 300:
            return "unrestricted", "Zapier webhook URL responded with HTTP %d to POST JSON" % status
        if status == 404:
            return "invalid", "Zapier webhook URL returned 404"
        return "unknown", "Zapier webhook URL returned HTTP %d" % status

    def validate_pagerduty_api_token(self, header_value):
        """Call PagerDuty ``/schedules`` with the token taken from the matched header value."""
        # header_value looks like "Token token=XYZ..."
        try:
            token = header_value.split("=", 1)[1].strip()
        except Exception:
            return "unknown", "Could not extract PagerDuty token from header"

        headers = {
            "Accept": "application/vnd.pagerduty+json;version=2",
            "Authorization": "Token token=%s" % token,
        }
        status, body = self._http_request(
            "GET",
            "https://api.pagerduty.com/schedules",
            headers,
            None,
        )
        if status is None:
            return "unknown", "No response from PagerDuty"
        if status == 200 and '"schedules"' in body:
            return "unrestricted", "PagerDuty schedules endpoint returned data (token accepted)"
        if status in (401, 403):
            return "invalid", "PagerDuty returned %d (unauthorized)" % status
        return "unknown", "PagerDuty schedules HTTP %d" % status

    def validate_wpengine_api_key(self, key_with_param):
        """
        Needs account_name context to exercise properly.
        We only flag that the WPEngine key was found.
        """
        return "unknown", (
            "WPEngine wpe_apikey parameter detected. Full validation needs account_name "
            "and the /1.2/?method=site call as per keyhacks."
        )

    def validate_datadog_api_key(self, full_url):
        """
        This pattern includes both api_key and application_key in the URL.
        Proper validation requires both and may leak dashboards; this is context-heavy.
        """
        return "unknown", (
            "DataDog api_key parameter detected in Datadog API URL. "
            "Use the dashboard or metrics endpoints with both api_key and application_key "
            "from a controlled environment to confirm scope/impact."
        )

    def validate_wakatime_api_key(self, url_with_key):
        """GET the matched WakaTime URL, which already carries the api_key."""
        status, body = self._http_request("GET", url_with_key, None, None)
        if status is None:
            return "unknown", "No response from WakaTime"
        if status == 200 and '"data"' in body:
            return "unrestricted", "WakaTime /users/current API returned data (api_key accepted)"
        if status in (401, 403):
            return "invalid", "WakaTime returned %d (unauthorized)" % status
        return "unknown", "WakaTime /users/current HTTP %d" % status

    def validate_newrelic_rest_api_key(self, header_line):
        """Call New Relic ``/v2/applications.json`` with the key taken from the matched header."""
        # header_line looks like "X-Api-Key: ABCDEF..."
        try:
            token = header_line.split(":", 1)[1].strip()
        except Exception:
            return "unknown", "Could not extract New Relic key from header"

        headers = {
            "X-Api-Key": token,
        }
        status, body = self._http_request(
            "GET",
            "https://api.newrelic.com/v2/applications.json",
            headers,
            None,
        )
        if status is None:
            return "unknown", "No response from New Relic"
        if status == 200 and '"applications"' in body:
            return "unrestricted", "New Relic applications API returned data (key accepted)"
        if status in (401, 403):
            return "invalid", "New Relic returned %d (unauthorized)" % status
        return "unknown", "New Relic /v2/applications HTTP %d" % status


class APIKeyLeakIssue(IScanIssue):
    """Immutable IScanIssue implementation holding the values given at construction."""

    def __init__(self, httpService, url, httpMessages, name, detail, remediation, severity, confidence):
        self._httpService = httpService
        self._url = url
        self._httpMessages = httpMessages
        self._name = name
        self._detail = detail
        self._remediation = remediation
        self._severity = severity
        self._confidence = confidence

    def getUrl(self):
        return self._url

    def getIssueName(self):
        return self._name

    def getIssueType(self):
        return 0  # custom

    def getSeverity(self):
        return self._severity

    def getConfidence(self):
        return self._confidence

    def getIssueBackground(self):
        return (
            "Various third-party API providers issue long-lived API keys and tokens "
            "for programmatic access. If these keys are exposed in client-side code or "
            "responses, an attacker may leverage them to perform actions against the "
            "associated accounts, often with significant impact (data exfiltration, "
            "billing abuse, or service disruption)."
        )

    def getRemediationBackground(self):
        return (
            "API keys and tokens should be treated as secrets. They must not be embedded "
            "in client-side code or exposed to untrusted parties. Restrict keys by IP, "
            "referrer, or environment wherever the provider allows it, and ensure keys "
            "have only the minimal necessary privileges."
        )

    def getIssueDetail(self):
        return self._detail

    def getRemediationDetail(self):
        return self._remediation

    def getHttpMessages(self):
        return self._httpMessages

    def getHttpService(self):
        return self._httpService