#!/usr/bin/env python3
"""Interactively send an ICMP echo request and an HTTP GET at the same time.

After prompting for the ICMP target, the ICMP payload and the HTTP URL, the
script waits for Enter; each press sends one ICMP packet carrying the payload
and one HTTP GET concurrently, then prints the timing and result of each.

NOTE(review): scapy's ``send`` and packet capture normally require elevated
(raw socket) privileges -- confirm for the target platform.
"""
# The star import is kept first so the explicit module imports below win if
# scapy happens to export a name such as ``time``.
from scapy.all import *
import httpx
import asyncio
import time

icmp_server = "103.50.33.127"  # A server in Argentina
icmp_payload = b"This is Sensitive Information."
http_server = "http://example.com"

# Shared HTTP client, reused for every request; closed when main() exits.
client = httpx.AsyncClient()


async def send_icmp() -> None:
    """Send one ICMP packet to ``icmp_server`` and print the first reply.

    The packet carries ``icmp_payload`` as a Raw layer. A sniffer captures at
    most one ICMP packet coming from the target within one second. The printed
    time spans from just before the send until the sniffer thread finishes, so
    it is an approximate round-trip time that includes capture overhead.
    """
    icmp_request = IP(dst=icmp_server) / ICMP() / Raw(load=icmp_payload)

    sniffer = AsyncSniffer(
        filter=f"icmp and src host {icmp_server}",
        timeout=1,
        count=1,
    )
    # NOTE(review): start() launches the capture in a background thread; a
    # very fast reply could presumably arrive before capture is active.
    sniffer.start()

    start_time = time.perf_counter()
    send(icmp_request, verbose=False)
    # join() blocks until a packet is captured or the timeout expires, so it
    # runs in a worker thread to keep the event loop (and the HTTP request)
    # moving.
    await asyncio.to_thread(sniffer.join)
    end_time = time.perf_counter()

    reply = sniffer.results[0] if sniffer.results else None
    if not reply:
        print("No ICMP reply received.")
        return

    rtt_ms = int((end_time - start_time) * 1000)
    # Computed outside the f-string: reusing double quotes inside a
    # double-quoted f-string is a SyntaxError before Python 3.12.
    payload_text = (
        reply[Raw].load.decode(errors="ignore") if reply.haslayer(Raw) else None
    )
    print(f"[{rtt_ms}ms] ICMP reply Payload: [{payload_text}]")


async def send_http() -> None:
    """GET ``http_server`` with a one-second timeout and print the outcome.

    Any exception is reported on stdout instead of being raised, so a failed
    HTTP request never aborts the ICMP request running alongside it.
    """
    try:
        response = await client.get(http_server, timeout=1)
        elapsed_ms = int(response.elapsed.total_seconds() * 1000)
        print(
            f"[{elapsed_ms}ms] HTTP Status: "
            f"[{response.status_code} {response.reason_phrase}]"
        )
    except Exception as e:
        print(f"HTTP GET Exception: {e}")


async def main() -> None:
    """Prompt for the targets, then send both requests on every Enter press.

    Empty answers keep the module-level defaults. The loop only ends through
    an exception (Ctrl-C or end of input), after which the shared HTTP client
    is closed.
    """
    global icmp_server, icmp_payload, http_server

    icmp_server = (
        input(
            "Enter the ICMP(ping) server address "
            f"[Press Enter to use default: {icmp_server}]: "
        )
        or icmp_server
    )

    typed_payload = input(
        "Enter the ICMP payload data "
        f"[Press Enter to use default: {icmp_payload.decode()}]: "
    )
    if typed_payload:
        # Keep the global as bytes regardless of where the value came from.
        icmp_payload = typed_payload.encode()

    http_server = (
        input(
            "Enter the HTTP server URL "
            f"[Press Enter to use default: {http_server}]: "
        )
        or http_server
    )

    print("\nPress Enter to send an ICMP request and an HTTP GET request")
    cnt = 1
    try:
        while True:
            input()
            print(f"[{cnt}] Send ICMP request and HTTP GET.")
            # gather() schedules both coroutines and waits for both to finish.
            await asyncio.gather(send_icmp(), send_http())
            cnt += 1
    finally:
        # Release pooled connections instead of leaking them at exit.
        await client.aclose()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n[!] Exit")
    except EOFError:
        print("\n[!] Exit")