#!/usr/bin/env python3
"""
Agent performance benchmark -- generates scorecard v2.0.

Measures Hermes/Agent performance metrics:
  1. Environment baseline (CPU / memory / disk)
  2. Python import time
  3. Single conversation round-trip latency (P50/P95)
  4. Sustained conversation stress test (10 rounds + decay rate)
  5. Simple command success rate
  6. Disk IO performance

Output: JSON report + scorecard (quantified; full score is 120 and is
almost never reachable).

Usage:
  python3 bench_agent.py           # full output
  python3 bench_agent.py --brief   # compact output (2 lines)
  python3 bench_agent.py --json    # silent mode, only writes the JSON file
"""

import subprocess
import time
import json
import os
import sys
import statistics
import re
from pathlib import Path

PATH = Path(__file__).parent

# Matches a transfer *rate* such as "1.4 GB/s", "210 MB/s" or "512 kB/s".
# The "/s" suffix is required so that the size part of dd's summary line
# ("(210 MB, 200 MiB) copied") is never mistaken for the speed.
_RATE_RE = re.compile(r"(\d+(?:[.,]\d+)?)\s*([kKMGT]?)B/s")

# Multipliers converting a rate with the given unit prefix into MB/s.
_RATE_TO_MB = {"": 1e-6, "k": 1e-3, "K": 1e-3, "M": 1.0, "G": 1000.0, "T": 1e6}


def run_cmd(cmd, timeout=30):
    """Run *cmd* through the shell and return a small result dict.

    The dict always has the keys ``elapsed`` (seconds), ``returncode``,
    ``stdout`` (first 500 chars) and ``stderr`` (first 200 chars).
    This function never raises: a timeout yields ``returncode == -1`` with
    ``stderr == "TIMEOUT"``, any other failure yields ``returncode == -1``
    with the exception text in ``stderr``.
    """
    start = time.perf_counter()
    try:
        proc = subprocess.run(cmd, shell=True, capture_output=True,
                              text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return {"elapsed": timeout, "returncode": -1,
                "stdout": "", "stderr": "TIMEOUT"}
    except Exception as exc:  # best-effort boundary: report, don't crash
        return {"elapsed": 0, "returncode": -1,
                "stdout": "", "stderr": str(exc)}
    elapsed = time.perf_counter() - start
    return {"elapsed": round(elapsed, 3), "returncode": proc.returncode,
            "stdout": proc.stdout[:500], "stderr": proc.stderr[:200]}


def _to_int(text, default=0):
    """Parse *text* as an int, returning *default* when it is not numeric."""
    try:
        return int(text)
    except (TypeError, ValueError):
        return default


def bench_resource_baseline():
    """Collect CPU, memory, disk and system-prompt-size information.

    Every value comes from a shell command (``nproc``, ``lscpu``, ``free``,
    ``df``, ``hermes prompt-size``). When a command is missing or prints
    something unexpected the corresponding field falls back to ``0``,
    ``""`` or ``"?"`` instead of raising.
    """
    cpu_cores = _to_int(run_cmd("nproc")["stdout"].strip())

    r = run_cmd("lscpu | grep 'Model name' | head -1 | cut -d: -f2 | xargs")
    cpu_model = r["stdout"].strip()

    # Columns printed by awk: total, used, free, available (in MB).
    r = run_cmd(r"free -m | awk 'NR==2{print $2,$3,$4,$7}'")
    mem_parts = r["stdout"].strip().split()
    mem_total = _to_int(mem_parts[0]) if len(mem_parts) > 0 else 0
    mem_avail = _to_int(mem_parts[3]) if len(mem_parts) > 3 else 0

    r = run_cmd(r"df -h / | tail -1 | awk '{print $2,$5}'")
    disk = r["stdout"].strip().split()

    r = run_cmd("hermes prompt-size 2>&1 | grep 'System prompt total' | head -1")
    sys_prompt = r["stdout"].strip()

    return {"cpu_cores": cpu_cores,
            "cpu_model": cpu_model,
            "mem_total_mb": mem_total,
            "mem_avail_mb": mem_avail,
            "disk_total": disk[0] if disk else "?",
            "disk_used_pct": disk[1] if len(disk) > 1 else "?",
            "sys_prompt_size": sys_prompt}


def _timed_hermes_chat(prompt, shell_timeout=25, hard_timeout=25):
    """Pipe *prompt* into ``hermes chat`` and time the call.

    Returns ``(elapsed_seconds, completed_process_or_None)``. The process
    is ``None`` when the Python-side *hard_timeout* fired; the elapsed time
    is still returned so callers can record the slow run instead of
    aborting the whole benchmark.
    """
    cmd = f'echo "{prompt}" | timeout {shell_timeout} hermes chat 2>/dev/null'
    start = time.perf_counter()
    try:
        proc = subprocess.run(cmd, shell=True, capture_output=True,
                              text=True, timeout=hard_timeout)
    except subprocess.TimeoutExpired:
        proc = None
    return time.perf_counter() - start, proc


def bench_hermes_roundtrip(samples=5, quiet=False):
    """Measure single-message round-trip latency over *samples* runs.

    Returns a dict with ``samples``, ``p50``, ``p95``, ``min``, ``max``,
    ``avg`` and ``all`` (seconds, rounded to 2 decimals), or
    ``{"error": "no data"}`` when no run was made. Progress is written to
    stdout unless *quiet* is true.
    """
    latencies = []
    for i in range(samples):
        elapsed, _ = _timed_hermes_chat("ok")
        latencies.append(elapsed)
        if not quiet:
            sys.stdout.write(f" Run {i+1}/{samples}: {elapsed:.1f}s\n")
            sys.stdout.flush()

    if not latencies:
        return {"error": "no data"}

    ordered = sorted(latencies)
    # Nearest-rank style index; clamped so it can never run past the end.
    p95_idx = min(int(len(ordered) * 0.95), len(ordered) - 1)
    return {"samples": len(latencies),
            "p50": round(statistics.median(latencies), 2),
            "p95": round(ordered[p95_idx], 2),
            "min": round(min(latencies), 2),
            "max": round(max(latencies), 2),
            "avg": round(statistics.mean(latencies), 2),
            "all": [round(x, 2) for x in latencies]}


def bench_stress_conversation(rounds=10, quiet=False):
    """Run *rounds* consecutive chats and report latency decay.

    ``decay_pct`` is the relative change of the mean latency of the second
    half of the runs versus the first half, in percent (positive means the
    agent got slower). With a single round both halves are that one run
    and the decay is 0. Returns ``{"error": "no data"}`` when no run was
    made.
    """
    latencies = []
    for i in range(rounds):
        elapsed, _ = _timed_hermes_chat(f"task {i+1}")
        latencies.append(elapsed)
        if not quiet:
            sys.stdout.write(f" Conversation {i+1}/{rounds}: {elapsed:.1f}s\n")
            sys.stdout.flush()

    if not latencies:
        return {"error": "no data"}

    half = len(latencies) // 2
    if half == 0:
        # Only one sample: an empty first half would make mean() raise.
        first_half = second_half = statistics.mean(latencies)
    else:
        first_half = statistics.mean(latencies[:half])
        second_half = statistics.mean(latencies[half:])
    if first_half > 0:
        decay = round((second_half - first_half) / first_half * 100, 1)
    else:
        decay = 0

    return {"rounds": len(latencies),
            "p50": round(statistics.median(latencies), 2),
            "avg_first_half": round(first_half, 2),
            "avg_second_half": round(second_half, 2),
            "decay_pct": decay,
            "all": [round(x, 2) for x in latencies]}


def bench_success_rate(samples=10, quiet=False):
    """Send a trivial prompt *samples* times and count successful replies.

    A run succeeds when the command exits with status 0 and prints
    non-blank output; a Python-side timeout counts as a failure.
    *quiet* is accepted for signature parity with the other benchmarks;
    this function prints nothing.
    """
    success, fail = 0, 0
    for _ in range(samples):
        _, proc = _timed_hermes_chat("ok", shell_timeout=20, hard_timeout=25)
        if proc is not None and proc.returncode == 0 and proc.stdout.strip():
            success += 1
        else:
            fail += 1
    rate = round(success / samples * 100, 1) if samples > 0 else 0
    return {"samples": samples, "success": success, "fail": fail,
            "success_rate_pct": rate}


def bench_disk_io():
    """Write then read a 200 MiB scratch file with ``dd``.

    Returns ``{"write": <dd summary line>, "read": <dd summary line>}``;
    a value is ``""`` when the corresponding pipeline failed. The scratch
    file ``/tmp/bench_io`` is removed afterwards.
    """
    r = run_cmd("dd if=/dev/zero of=/tmp/bench_io bs=1M count=200 2>&1 | tail -1")
    write_speed = r["stdout"].strip() if r["returncode"] == 0 else ""
    r = run_cmd("dd if=/tmp/bench_io of=/dev/null bs=1M count=200 2>&1 | tail -1")
    read_speed = r["stdout"].strip() if r["returncode"] == 0 else ""
    run_cmd("rm -f /tmp/bench_io")
    return {"write": write_speed, "read": read_speed}


def extract_disk_mb_s(disk_str):
    """Extract the transfer rate in MB/s from a disk speed string.

    Accepts a bare rate (``'1.4 GB/s'``, ``'210 MB/s'``) as well as a full
    ``dd`` summary line such as
    ``'209715200 bytes (210 MB, 200 MiB) copied, 0.1 s, 1.4 GB/s'``.
    Only a number followed by a ``.../s`` unit is treated as a rate, and
    the last such occurrence wins, so the size in parentheses is ignored.
    Returns ``0`` when no rate can be found or *disk_str* is not a string.
    """
    try:
        matches = _RATE_RE.findall(disk_str)
    except TypeError:
        return 0
    if not matches:
        return 0
    number, prefix = matches[-1]
    try:
        # Some locales print a decimal comma ("1,4 GB/s").
        value = float(number.replace(",", "."))
    except ValueError:
        return 0
    return value * _RATE_TO_MB[prefix]


def compute_scores(results):
    """Turn raw benchmark *results* into per-dimension and total scores.

    Quantified scoring on a 0-120 scale (the full 120 is almost never
    achievable). Uses a linear penalty:
    ``score = max(0, ceiling - abs(actual - ideal) * slope)``.

    Returns ``(scores, total, total_grade)`` where *scores* maps each of
    ``speed``, ``stability``, ``stress``, ``resource`` and ``toolchain`` to
    a dict with ``score``, ``grade`` and ``weight``. Missing sections in
    *results* fall back to pessimistic defaults instead of raising.
    """
    def scale(raw, ideal, ceiling=120, slope=1.0):
        return round(max(0, min(ceiling, ceiling - abs(raw - ideal) * slope)), 1)

    def grade(score, thresholds):
        for thresh, label in thresholds:
            if score >= thresh:
                return label
        return thresholds[-1][1]

    rt = results.get("roundtrip", {})
    sr = results.get("success_rate", {})
    stress = results.get("stress", {})
    bl = results.get("baseline", {})

    # 1. Response speed (30%) -- ideal P50 = 0.1s
    p50 = rt.get("p50", 999)
    speed = scale(p50, ideal=0.1, ceiling=120, slope=25)
    speed_g = grade(speed, [(120, "S++🏆"), (105, "S+🥇"), (90, "S🏆"),
                            (75, "A✅"), (55, "B⚠️"), (35, "C🔻"), (0, "D❌")])

    # 2. Stability (25%) -- ideal success rate = 100%
    rate = sr.get("success_rate_pct", sr.get("rate_pct", 0))
    stable = round(max(0, 120 - (100 - rate) * 3.0), 1)
    stable_g = grade(stable, [(120, "S++🏆"), (110, "S+🥇"), (95, "S🏆"),
                              (80, "A✅"), (60, "B⚠️"), (35, "C🔻"), (0, "D❌")])

    # 3. Stress behaviour (20%) -- ideal decay = 0%
    # A slowdown (decay >= 0) is penalised harder than a speed-up.
    decay = stress.get("decay_pct", 999)
    if decay >= 0:
        stress_c = scale(decay, ideal=0, ceiling=120, slope=4.8)
    else:
        stress_c = max(0, round(120 - abs(decay) * 3.0, 1))
    stress_g = grade(stress_c, [(120, "S++🏆"), (105, "S+🥇"), (90, "S🏆"),
                                (70, "A✅"), (50, "B⚠️"), (30, "C🔻"), (0, "D❌")])

    # 4. Resource efficiency (15%) -- min(cpu_score, mem_score)
    # NOTE: CPU load is not measured; a fixed value of 35 is scored, so
    # cpu_score is always 60 and caps this dimension.
    cpu_score = scale(35, ideal=5, ceiling=120, slope=2.0)
    mt = bl.get("mem_total_mb", 1870)
    ma = bl.get("mem_avail_mb", 0)
    mem_ratio = ma / mt if mt > 0 else 0.5
    mem_score = scale(mem_ratio * 100, ideal=80, ceiling=120, slope=1.5)
    resource = round(min(cpu_score, mem_score), 1)
    res_g = grade(resource, [(120, "S++🏆"), (105, "S+🥇"), (90, "S🏆"),
                             (70, "A✅"), (50, "B⚠️"), (30, "C🔻"), (0, "D❌")])

    # 5. Toolchain (10%) -- min(py_score, disk_score)
    py_t = results.get("python_import", {}).get("elapsed", 0.765)
    py_score = scale(py_t, ideal=0.1, ceiling=120, slope=60)
    disk_mb = extract_disk_mb_s(results.get("disk_io", {}).get("write", ""))
    disk_score = scale(disk_mb, ideal=2000, ceiling=120, slope=0.04)
    tool_c = round(min(py_score, disk_score), 1)
    tool_g = grade(tool_c, [(120, "S++🏆"), (105, "S+🥇"), (90, "S🏆"),
                            (70, "A✅"), (50, "B⚠️"), (30, "C🔻"), (0, "D❌")])

    scores = {
        "speed": {"score": speed, "grade": speed_g, "weight": 0.3},
        "stability": {"score": stable, "grade": stable_g, "weight": 0.25},
        "stress": {"score": stress_c, "grade": stress_g, "weight": 0.2},
        "resource": {"score": resource, "grade": res_g, "weight": 0.15},
        "toolchain": {"score": tool_c, "grade": tool_g, "weight": 0.1},
    }

    total = round(
        speed * 0.30 + stable * 0.25 + stress_c * 0.20
        + resource * 0.15 + tool_c * 0.10,
        1,
    )
    total = min(120, total)
    total_g = grade(total, [(120, "S++🏆"), (110, "S+🥇"), (95, "S🏆"),
                            (80, "A✅"), (65, "B⚠️"), (45, "C🔻"), (0, "D❌")])

    return scores, total, total_g


def print_scorecard_compact(scores, total, total_g):
    """Print the two-line scorecard used by ``--brief`` mode."""
    dims = [
        ("🚀速度", scores["speed"]["score"], scores["speed"]["grade"]),
        ("✅稳定", scores["stability"]["score"], scores["stability"]["grade"]),
        ("📊压力", scores["stress"]["score"], scores["stress"]["grade"]),
        ("💾资源", scores["resource"]["score"], scores["resource"]["grade"]),
        ("🔧工具", scores["toolchain"]["score"], scores["toolchain"]["grade"]),
    ]
    parts = [f"{n} {s}{g}" for n, s, g in dims]
    print(f"🏆 总分 {total} → {total_g}")
    print(" " + " | ".join(parts))


def main():
    """Run all benchmarks, print the scorecard and save the JSON report.

    Flags read from ``sys.argv``: ``--brief`` prints only the compact
    scorecard, ``--json`` prints nothing. The report is always written to
    ``/tmp/agent_bench_report.json``. Returns the results dict.
    """
    brief = "--brief" in sys.argv
    json_only = "--json" in sys.argv
    quiet = brief or json_only
    verbose = not quiet

    results = {}

    if verbose:
        print("[1/6] 📐 环境基线...")
    results["baseline"] = bench_resource_baseline()
    if verbose:
        print(f" CPU: {results['baseline']['cpu_model']} ({results['baseline']['cpu_cores']}核)")
        print(f" 内存: {results['baseline']['mem_total_mb']}MB (可用 {results['baseline']['mem_avail_mb']}MB)")

    if verbose:
        print("[2/6] 🚀 单次对话往返 (5次)...")
    results["roundtrip"] = bench_hermes_roundtrip(samples=5, quiet=quiet)
    if verbose:
        print(f" P50: {results['roundtrip'].get('p50', 'N/A')}s")
        print(f" P95: {results['roundtrip'].get('p95', 'N/A')}s")

    if verbose:
        print("[3/6] 📊 连续对话压力 (10轮)...")
    results["stress"] = bench_stress_conversation(rounds=10, quiet=quiet)
    if verbose:
        print(f" P50: {results['stress'].get('p50', 'N/A')}s")
        print(f" 衰减: {results['stress'].get('decay_pct', 'N/A')}%")

    if verbose:
        print("[4/6] ✅ 成功率...")
    results["success_rate"] = bench_success_rate(samples=10, quiet=quiet)
    if verbose:
        print(f" 成功率: {results['success_rate'].get('success_rate_pct', 'N/A')}%")

    if verbose:
        print("[5/6] 💾 磁盘IO...")
    results["disk_io"] = bench_disk_io()
    if verbose:
        print(f" 写入: {results['disk_io'].get('write', 'N/A')[:60]}")

    if verbose:
        print("[6/6] 🐍 Python导入...")
    results["python_import"] = run_cmd(
        "python3 -c \"import openai; import httpx; import pydantic; print('ok')\""
    )
    if verbose:
        print(f" 耗时: {results['python_import']['elapsed']}s")

    scores, total, total_g = compute_scores(results)
    results["scores"] = scores
    results["total_score"] = total
    results["total_grade"] = total_g

    if brief:
        print_scorecard_compact(scores, total, total_g)
    elif not json_only:
        print("\n" + "=" * 50)
        print("📊 评分卡 (量化版 v2.0)")
        print("=" * 50)
        for n, k in [("🚀 响应速度", "speed"), ("✅ 稳定性", "stability"),
                     ("📊 压力表现", "stress"), ("💾 资源效率", "resource"),
                     ("🔧 工具链", "toolchain")]:
            s = scores[k]
            print(f" {n}: {s['score']}分 {s['grade']}")
        print(f"\n 🏆 总分: {total} / 120 → {total_g}\n")

    output_path = "/tmp/agent_bench_report.json"
    # The report holds emoji/CJK text and is dumped with ensure_ascii=False,
    # so the encoding must be explicit to work under non-UTF-8 locales.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    if verbose:
        print(f"✅ 报告已保存到 {output_path}")

    return results


if __name__ == "__main__":
    main()