const fs = require("node:fs/promises"); const fssync = require("node:fs"); const path = require("node:path"); const readline = require("node:readline"); const crypto = require("node:crypto"); const { ensureDir } = require("./fs"); const { readSqliteJsonRows } = require("./sqlite-reader"); const DEFAULT_SOURCE = "codex"; const DEFAULT_MODEL = "unknown"; const BUCKET_SEPARATOR = "|"; const CLAUDE_MEM_OBSERVER_PATH_SEGMENT = "--claude-mem-observer-sessions"; const CLAUDE_MEM_OBSERVER_PROJECT_REF = "https://local.tokentracker/claude-mem/observer-sessions"; async function listRolloutFiles(sessionsDir) { const out = []; const years = await safeReadDir(sessionsDir); for (const y of years) { if (!/^[0-9]{4}$/.test(y.name) || !y.isDirectory()) continue; const yearDir = path.join(sessionsDir, y.name); const months = await safeReadDir(yearDir); for (const m of months) { if (!/^[0-9]{2}$/.test(m.name) || !m.isDirectory()) continue; const monthDir = path.join(yearDir, m.name); const days = await safeReadDir(monthDir); for (const d of days) { if (!/^[0-9]{2}$/.test(d.name) || !d.isDirectory()) continue; const dayDir = path.join(monthDir, d.name); const files = await safeReadDir(dayDir); for (const f of files) { if (!f.isFile()) continue; if (!f.name.startsWith("rollout-") || !f.name.endsWith(".jsonl")) continue; out.push(path.join(dayDir, f.name)); } } } } out.sort((a, b) => a.localeCompare(b)); return out; } async function listClaudeProjectFiles(projectsDir) { const out = []; await walkClaudeProjects(projectsDir, out); out.sort((a, b) => a.localeCompare(b)); return out; } async function listGeminiSessionFiles(tmpDir) { const out = []; const roots = await safeReadDir(tmpDir); for (const root of roots) { if (!root.isDirectory()) continue; const chatsDir = path.join(tmpDir, root.name, "chats"); const chats = await safeReadDir(chatsDir); for (const entry of chats) { if (!entry.isFile()) continue; if (!entry.name.startsWith("session-") || !entry.name.endsWith(".json")) continue; out.push(path.join(chatsDir, entry.name)); } } out.sort((a, b) => a.localeCompare(b)); return out; } async function listOpencodeMessageFiles(storageDir) { const out = []; const messageDir = path.join(storageDir, "message"); await walkOpencodeMessages(messageDir, out); out.sort((a, b) => a.localeCompare(b)); return out; } async function parseRolloutIncremental({ rolloutFiles, cursors, queuePath, projectQueuePath, onProgress, source, publicRepoResolver, }) { await ensureDir(path.dirname(queuePath)); let filesProcessed = 0; let eventsAggregated = 0; const cb = typeof onProgress === "function" ? onProgress : null; const totalFiles = Array.isArray(rolloutFiles) ? rolloutFiles.length : 0; const hourlyState = normalizeHourlyState(cursors?.hourly); const projectEnabled = typeof projectQueuePath === "string" && projectQueuePath.length > 0; const projectState = projectEnabled ? normalizeProjectState(cursors?.projectHourly) : null; const projectTouchedBuckets = projectEnabled ? new Set() : null; const projectMetaCache = projectEnabled ? new Map() : null; const publicRepoCache = projectEnabled ? new Map() : null; const touchedBuckets = new Set(); const defaultSource = normalizeSourceInput(source) || DEFAULT_SOURCE; if (!cursors.files || typeof cursors.files !== "object") { cursors.files = {}; } for (let idx = 0; idx < rolloutFiles.length; idx++) { const entry = rolloutFiles[idx]; const filePath = typeof entry === "string" ? entry : entry?.path; if (!filePath) continue; const fileSource = typeof entry === "string" ? defaultSource : normalizeSourceInput(entry?.source) || defaultSource; const st = await fs.stat(filePath).catch(() => null); if (!st || !st.isFile()) continue; const key = filePath; const prev = cursors.files[key] || null; const inode = st.ino || 0; const startOffset = prev && prev.inode === inode ? prev.offset || 0 : 0; const lastTotal = prev && prev.inode === inode ? prev.lastTotal || null : null; const lastModel = prev && prev.inode === inode ? prev.lastModel || null : null; const projectContext = projectEnabled ? await resolveProjectContextForFile({ filePath, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }) : null; const projectRef = projectContext?.projectRef || null; const projectKey = projectContext?.projectKey || null; const result = await parseRolloutFile({ filePath, startOffset, lastTotal, lastModel, hourlyState, touchedBuckets, source: fileSource, projectState, projectTouchedBuckets, projectRef, projectKey, projectMetaCache, publicRepoCache, publicRepoResolver, }); cursors.files[key] = { inode, offset: result.endOffset, lastTotal: result.lastTotal, lastModel: result.lastModel, updatedAt: new Date().toISOString(), }; filesProcessed += 1; eventsAggregated += result.eventsAggregated; if (cb) { cb({ index: idx + 1, total: totalFiles, filePath, filesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const projectBucketsQueued = projectEnabled ? await enqueueTouchedProjectBuckets({ projectQueuePath, projectState, projectTouchedBuckets }) : 0; hourlyState.updatedAt = new Date().toISOString(); cursors.hourly = hourlyState; if (projectState) { projectState.updatedAt = new Date().toISOString(); cursors.projectHourly = projectState; } return { filesProcessed, eventsAggregated, bucketsQueued, projectBucketsQueued }; } async function parseClaudeIncremental({ projectFiles, cursors, queuePath, projectQueuePath, onProgress, source, publicRepoResolver, }) { await ensureDir(path.dirname(queuePath)); let filesProcessed = 0; let eventsAggregated = 0; const cb = typeof onProgress === "function" ? onProgress : null; const files = Array.isArray(projectFiles) ? projectFiles : []; const totalFiles = files.length; const hourlyState = normalizeHourlyState(cursors?.hourly); const projectEnabled = typeof projectQueuePath === "string" && projectQueuePath.length > 0; const projectState = projectEnabled ? normalizeProjectState(cursors?.projectHourly) : null; const projectTouchedBuckets = projectEnabled ? new Set() : null; const projectMetaCache = projectEnabled ? new Map() : null; const publicRepoCache = projectEnabled ? new Map() : null; const touchedBuckets = new Set(); // Persist seenMessageHashes across syncs to prevent cross-file duplicates // (e.g. subagent file created after main session was already parsed). const prevHashes = Array.isArray(cursors.claudeHashes) ? cursors.claudeHashes : []; const seenMessageHashes = new Set(prevHashes); const defaultSource = normalizeSourceInput(source) || "claude"; if (!cursors.files || typeof cursors.files !== "object") { cursors.files = {}; } for (let idx = 0; idx < files.length; idx++) { const entry = files[idx]; const filePath = typeof entry === "string" ? entry : entry?.path; if (!filePath) continue; const fileSource = typeof entry === "string" ? defaultSource : normalizeSourceInput(entry?.source) || defaultSource; const st = await fs.stat(filePath).catch(() => null); if (!st || !st.isFile()) continue; const key = filePath; const prev = cursors.files[key] || null; const inode = st.ino || 0; const startOffset = prev && prev.inode === inode ? prev.offset || 0 : 0; const projectContext = projectEnabled ? await resolveProjectContextForFile({ filePath, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }) : null; const projectRef = projectContext?.projectRef || null; const projectKey = projectContext?.projectKey || null; const result = await parseClaudeFile({ filePath, startOffset, hourlyState, touchedBuckets, source: fileSource, projectState, projectTouchedBuckets, projectRef, projectKey, seenMessageHashes, }); cursors.files[key] = { inode, offset: result.endOffset, updatedAt: new Date().toISOString(), }; filesProcessed += 1; eventsAggregated += result.eventsAggregated; if (cb) { cb({ index: idx + 1, total: totalFiles, filePath, filesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const projectBucketsQueued = projectEnabled ? await enqueueTouchedProjectBuckets({ projectQueuePath, projectState, projectTouchedBuckets }) : 0; hourlyState.updatedAt = new Date().toISOString(); cursors.hourly = hourlyState; if (projectState) { projectState.updatedAt = new Date().toISOString(); cursors.projectHourly = projectState; } // Persist message hashes for cross-sync dedup; cap at 100k entries to bound size. const allHashes = Array.from(seenMessageHashes); cursors.claudeHashes = allHashes.length > 100_000 ? allHashes.slice(allHashes.length - 100_000) : allHashes; return { filesProcessed, eventsAggregated, bucketsQueued, projectBucketsQueued }; } async function parseGeminiIncremental({ sessionFiles, cursors, queuePath, projectQueuePath, onProgress, source, publicRepoResolver, }) { await ensureDir(path.dirname(queuePath)); let filesProcessed = 0; let eventsAggregated = 0; const cb = typeof onProgress === "function" ? onProgress : null; const files = Array.isArray(sessionFiles) ? sessionFiles : []; const totalFiles = files.length; const hourlyState = normalizeHourlyState(cursors?.hourly); const projectEnabled = typeof projectQueuePath === "string" && projectQueuePath.length > 0; const projectState = projectEnabled ? normalizeProjectState(cursors?.projectHourly) : null; const projectTouchedBuckets = projectEnabled ? new Set() : null; const projectMetaCache = projectEnabled ? new Map() : null; const publicRepoCache = projectEnabled ? new Map() : null; const touchedBuckets = new Set(); const defaultSource = normalizeSourceInput(source) || "gemini"; if (!cursors.files || typeof cursors.files !== "object") { cursors.files = {}; } for (let idx = 0; idx < files.length; idx++) { const entry = files[idx]; const filePath = typeof entry === "string" ? entry : entry?.path; if (!filePath) continue; const fileSource = typeof entry === "string" ? defaultSource : normalizeSourceInput(entry?.source) || defaultSource; const st = await fs.stat(filePath).catch(() => null); if (!st || !st.isFile()) continue; const key = filePath; const prev = cursors.files[key] || null; const inode = st.ino || 0; let startIndex = prev && prev.inode === inode ? Number(prev.lastIndex || -1) : -1; let lastTotals = prev && prev.inode === inode ? prev.lastTotals || null : null; let lastModel = prev && prev.inode === inode ? prev.lastModel || null : null; const projectContext = projectEnabled ? await resolveProjectContextForFile({ filePath, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }) : null; const projectRef = projectContext?.projectRef || null; const projectKey = projectContext?.projectKey || null; const result = await parseGeminiFile({ filePath, startIndex, lastTotals, lastModel, hourlyState, touchedBuckets, source: fileSource, projectState, projectTouchedBuckets, projectRef, projectKey, }); cursors.files[key] = { inode, lastIndex: result.lastIndex, lastTotals: result.lastTotals, lastModel: result.lastModel, updatedAt: new Date().toISOString(), }; filesProcessed += 1; eventsAggregated += result.eventsAggregated; if (cb) { cb({ index: idx + 1, total: totalFiles, filePath, filesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const projectBucketsQueued = projectEnabled ? await enqueueTouchedProjectBuckets({ projectQueuePath, projectState, projectTouchedBuckets }) : 0; hourlyState.updatedAt = new Date().toISOString(); cursors.hourly = hourlyState; if (projectState) { projectState.updatedAt = new Date().toISOString(); cursors.projectHourly = projectState; } return { filesProcessed, eventsAggregated, bucketsQueued, projectBucketsQueued }; } async function parseOpencodeIncremental({ messageFiles, cursors, queuePath, projectQueuePath, onProgress, source, publicRepoResolver, }) { await ensureDir(path.dirname(queuePath)); let filesProcessed = 0; let eventsAggregated = 0; const cb = typeof onProgress === "function" ? onProgress : null; const files = Array.isArray(messageFiles) ? messageFiles : []; const totalFiles = files.length; const hourlyState = normalizeHourlyState(cursors?.hourly); const projectEnabled = typeof projectQueuePath === "string" && projectQueuePath.length > 0; const projectState = projectEnabled ? normalizeProjectState(cursors?.projectHourly) : null; const projectTouchedBuckets = projectEnabled ? new Set() : null; const projectMetaCache = projectEnabled ? new Map() : null; const publicRepoCache = projectEnabled ? new Map() : null; const opencodeState = normalizeOpencodeState(cursors?.opencode); const messageIndex = opencodeState.messages; const touchedBuckets = new Set(); const defaultSource = normalizeSourceInput(source) || "opencode"; if (!cursors.files || typeof cursors.files !== "object") { cursors.files = {}; } for (let idx = 0; idx < files.length; idx++) { const entry = files[idx]; const filePath = typeof entry === "string" ? entry : entry?.path; if (!filePath) continue; const fileSource = typeof entry === "string" ? defaultSource : normalizeSourceInput(entry?.source) || defaultSource; const st = await fs.stat(filePath).catch(() => null); if (!st || !st.isFile()) continue; const key = filePath; const prev = cursors.files[key] || null; const inode = st.ino || 0; const size = Number.isFinite(st.size) ? st.size : 0; const mtimeMs = Number.isFinite(st.mtimeMs) ? st.mtimeMs : 0; const unchanged = prev && prev.inode === inode && prev.size === size && prev.mtimeMs === mtimeMs; if (unchanged) { filesProcessed += 1; if (cb) { cb({ index: idx + 1, total: totalFiles, filePath, filesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } continue; } const fallbackTotals = prev && typeof prev.lastTotals === "object" ? prev.lastTotals : null; const fallbackMessageKey = prev && typeof prev.messageKey === "string" && prev.messageKey.trim() ? prev.messageKey.trim() : null; const projectContext = projectEnabled ? await resolveProjectContextForFile({ filePath, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }) : null; const projectRef = projectContext?.projectRef || null; const projectKey = projectContext?.projectKey || null; const result = await parseOpencodeMessageFile({ filePath, messageIndex, fallbackTotals, fallbackMessageKey, hourlyState, touchedBuckets, source: fileSource, projectState, projectTouchedBuckets, projectRef, projectKey, }); cursors.files[key] = { inode, size, mtimeMs, lastTotals: result.lastTotals, messageKey: result.messageKey || null, updatedAt: new Date().toISOString(), }; filesProcessed += 1; eventsAggregated += result.eventsAggregated; if (result.messageKey && result.shouldUpdate) { messageIndex[result.messageKey] = { lastTotals: result.lastTotals, updatedAt: new Date().toISOString(), }; } if (cb) { cb({ index: idx + 1, total: totalFiles, filePath, filesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const projectBucketsQueued = projectEnabled ? await enqueueTouchedProjectBuckets({ projectQueuePath, projectState, projectTouchedBuckets }) : 0; hourlyState.updatedAt = new Date().toISOString(); cursors.hourly = hourlyState; opencodeState.updatedAt = new Date().toISOString(); cursors.opencode = opencodeState; if (projectState) { projectState.updatedAt = new Date().toISOString(); cursors.projectHourly = projectState; } return { filesProcessed, eventsAggregated, bucketsQueued, projectBucketsQueued }; } async function parseOpenclawIncremental({ sessionFiles, cursors, queuePath, projectQueuePath, onProgress, source, }) { await ensureDir(path.dirname(queuePath)); let filesProcessed = 0; let eventsAggregated = 0; const cb = typeof onProgress === "function" ? onProgress : null; const files = Array.isArray(sessionFiles) ? sessionFiles : []; const totalFiles = files.length; const hourlyState = normalizeHourlyState(cursors?.hourly); const projectEnabled = typeof projectQueuePath === "string" && projectQueuePath.length > 0; const projectState = projectEnabled ? normalizeProjectState(cursors?.projectHourly) : null; const projectTouchedBuckets = projectEnabled ? new Set() : null; const touchedBuckets = new Set(); const defaultSource = normalizeSourceInput(source) || "openclaw"; if (!cursors.files || typeof cursors.files !== "object") { cursors.files = {}; } for (let idx = 0; idx < files.length; idx++) { const entry = files[idx]; const filePath = typeof entry === "string" ? entry : entry?.path; if (!filePath) continue; const fileSource = typeof entry === "string" ? defaultSource : normalizeSourceInput(entry?.source) || defaultSource; const st = await fs.stat(filePath).catch(() => null); if (!st || !st.isFile()) continue; const key = filePath; const prev = cursors.files[key] || null; const inode = st.ino || 0; const startOffset = prev && prev.inode === inode ? prev.offset || 0 : 0; const result = await parseOpenclawSessionFile({ filePath, startOffset, hourlyState, touchedBuckets, source: fileSource, projectState, projectTouchedBuckets, }); cursors.files[key] = { inode, offset: result.endOffset, updatedAt: new Date().toISOString(), }; filesProcessed += 1; eventsAggregated += result.eventsAggregated; if (cb) { cb({ index: idx + 1, total: totalFiles, filePath, filesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const projectBucketsQueued = projectEnabled ? await enqueueTouchedProjectBuckets({ projectQueuePath, projectState, projectTouchedBuckets }) : 0; hourlyState.updatedAt = new Date().toISOString(); cursors.hourly = hourlyState; if (projectState) { projectState.updatedAt = new Date().toISOString(); cursors.projectHourly = projectState; } return { filesProcessed, eventsAggregated, bucketsQueued, projectBucketsQueued }; } async function parseOpenclawSessionFile({ filePath, startOffset, hourlyState, touchedBuckets, source, projectState, projectTouchedBuckets, }) { const st = await fs.stat(filePath); const endOffset = st.size; if (startOffset >= endOffset) return { endOffset, eventsAggregated: 0 }; const stream = fssync.createReadStream(filePath, { encoding: "utf8", start: startOffset }); const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); let eventsAggregated = 0; for await (const line of rl) { if (!line) continue; // Fast-path filter: OpenClaw assistant messages include message.usage.totalTokens. if (!line.includes('"usage"') || !line.includes("totalTokens")) continue; let obj; try { obj = JSON.parse(line); } catch (_e) { continue; } if (obj?.type !== "message") continue; const msg = obj?.message; if (!msg || typeof msg !== "object") continue; const usage = msg.usage; if (!usage || typeof usage !== "object") continue; const tokenTimestamp = typeof obj?.timestamp === "string" ? obj.timestamp : null; if (!tokenTimestamp) continue; const model = normalizeModelInput(msg.model) || DEFAULT_MODEL; // Per CLAUDE.md: cached_input_tokens = cache reads, // cache_creation_input_tokens = cache writes. Also re-derive total_tokens // as input + output + cache_creation + cache_read so cost math works // even when the source's own totalTokens is stale or rounded. const inputTok = Number(usage.input || 0); const cacheReadTok = Number(usage.cacheRead || 0); const cacheWriteTok = Number(usage.cacheWrite || 0); const outputTok = Number(usage.output || 0); const delta = { input_tokens: inputTok, cached_input_tokens: cacheReadTok, cache_creation_input_tokens: cacheWriteTok, output_tokens: outputTok, reasoning_output_tokens: 0, total_tokens: inputTok + outputTok + cacheReadTok + cacheWriteTok, conversation_count: 1, }; if (isAllZeroUsage(delta)) continue; const bucketStart = toUtcHalfHourStart(tokenTimestamp); if (!bucketStart) continue; const bucket = getHourlyBucket(hourlyState, source, model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey(source, model, bucketStart)); // Project-level OpenClaw attribution is not supported yet (no stable cwd info). // If OpenClaw later records cwd per event, we can mirror rollout's project logic. eventsAggregated += 1; } rl.close(); stream.close?.(); return { endOffset, eventsAggregated }; } async function parseRolloutFile({ filePath, startOffset, lastTotal, lastModel, hourlyState, touchedBuckets, source, projectState, projectTouchedBuckets, projectRef, projectKey, projectMetaCache, publicRepoCache, publicRepoResolver, }) { const st = await fs.stat(filePath); const endOffset = st.size; if (startOffset >= endOffset) { return { endOffset, lastTotal, lastModel, eventsAggregated: 0 }; } const stream = fssync.createReadStream(filePath, { encoding: "utf8", start: startOffset }); const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); let model = typeof lastModel === "string" ? lastModel : null; let totals = lastTotal && typeof lastTotal === "object" ? lastTotal : null; let currentCwd = null; let currentProjectRef = projectRef || null; let currentProjectKey = projectKey || null; let eventsAggregated = 0; for await (const line of rl) { if (!line) continue; const maybeTokenCount = line.includes('"token_count"'); const maybeTurnContext = !maybeTokenCount && (line.includes('"turn_context"') || line.includes('"session_meta"')) && (line.includes('"model"') || line.includes('"cwd"')); if (!maybeTokenCount && !maybeTurnContext) continue; let obj; try { obj = JSON.parse(line); } catch (_e) { continue; } if ( (obj?.type === "turn_context" || obj?.type === "session_meta") && obj?.payload && typeof obj.payload === "object" ) { if (typeof obj.payload.model === "string") { model = obj.payload.model; } if (projectState && typeof obj.payload.cwd === "string") { const nextCwd = obj.payload.cwd.trim(); if (nextCwd && nextCwd !== currentCwd) { const context = await resolveProjectContextForPath({ startDir: nextCwd, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }); currentCwd = nextCwd; currentProjectRef = context?.projectRef || null; currentProjectKey = context?.projectKey || null; } } continue; } const token = extractTokenCount(obj); if (!token) continue; const info = token.info; if (!info || typeof info !== "object") continue; const tokenTimestamp = typeof token.timestamp === "string" ? token.timestamp : null; if (!tokenTimestamp) continue; const lastUsage = info.last_token_usage; const totalUsage = info.total_token_usage; const delta = pickDelta(lastUsage, totalUsage, totals); if (!delta) continue; delta.conversation_count = 1; if (totalUsage && typeof totalUsage === "object") { totals = totalUsage; } const bucketStart = toUtcHalfHourStart(tokenTimestamp); if (!bucketStart) continue; const bucket = getHourlyBucket(hourlyState, source, model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey(source, model, bucketStart)); if (currentProjectKey && projectState && projectTouchedBuckets) { const projectBucket = getProjectBucket( projectState, currentProjectKey, source, bucketStart, currentProjectRef, ); addTotals(projectBucket.totals, delta); projectTouchedBuckets.add(projectBucketKey(currentProjectKey, source, bucketStart)); } eventsAggregated += 1; } return { endOffset, lastTotal: totals, lastModel: model, eventsAggregated }; } async function parseClaudeFile({ filePath, startOffset, hourlyState, touchedBuckets, source, projectState, projectTouchedBuckets, projectRef, projectKey, seenMessageHashes, }) { const st = await fs.stat(filePath).catch(() => null); if (!st || !st.isFile()) return { endOffset: startOffset, eventsAggregated: 0 }; const endOffset = st.size; if (startOffset >= endOffset) return { endOffset, eventsAggregated: 0 }; const stream = fssync.createReadStream(filePath, { encoding: "utf8", start: startOffset }); const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); let eventsAggregated = 0; const isMainSession = !filePath.includes("/subagents/"); for await (const line of rl) { if (!line) continue; // Count user-typed messages as conversations (main sessions only). // Exclude tool_result messages — those are auto-generated by tool calls, // not manually typed by the user. Only count messages with a "text" block. if (isMainSession && line.includes('"type":"user"')) { let userObj; try { userObj = JSON.parse(line); } catch (_e) { /* skip */ } if (userObj?.type === "user") { const content = userObj?.message?.content; const hasText = typeof content === "string" || (Array.isArray(content) && content.some((b) => b?.type === "text")); if (hasText) { const userTs = typeof userObj?.timestamp === "string" ? userObj.timestamp : null; const userBucketStart = userTs ? toUtcHalfHourStart(userTs) : null; if (userBucketStart) { const userModel = DEFAULT_MODEL; const userBucket = getHourlyBucket(hourlyState, source, userModel, userBucketStart); userBucket.totals.conversation_count += 1; touchedBuckets.add(bucketKey(source, userModel, userBucketStart)); } } } } if (!line.includes('"usage"')) continue; let obj; try { obj = JSON.parse(line); } catch (_e) { continue; } const usage = obj?.message?.usage || obj?.usage; if (!usage || typeof usage !== "object") continue; const dedupHash = seenMessageHashes ? claudeMessageDedupKey(obj) : null; if (dedupHash && seenMessageHashes.has(dedupHash)) continue; const model = normalizeModelInput(obj?.message?.model || obj?.model) || DEFAULT_MODEL; const tokenTimestamp = typeof obj?.timestamp === "string" ? obj.timestamp : null; if (!tokenTimestamp) continue; const delta = normalizeClaudeUsage(usage); if (!delta || isAllZeroUsage(delta)) continue; if (dedupHash) seenMessageHashes.add(dedupHash); delta.conversation_count = 0; const bucketStart = toUtcHalfHourStart(tokenTimestamp); if (!bucketStart) continue; const bucket = getHourlyBucket(hourlyState, source, model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey(source, model, bucketStart)); if (projectKey && projectState && projectTouchedBuckets) { const projectBucket = getProjectBucket( projectState, projectKey, source, bucketStart, projectRef, ); addTotals(projectBucket.totals, delta); projectTouchedBuckets.add(projectBucketKey(projectKey, source, bucketStart)); } eventsAggregated += 1; } rl.close(); stream.close?.(); return { endOffset, eventsAggregated }; } async function parseGeminiFile({ filePath, startIndex, lastTotals, lastModel, hourlyState, touchedBuckets, source, projectState, projectTouchedBuckets, projectRef, projectKey, }) { const raw = await fs.readFile(filePath, "utf8").catch(() => ""); if (!raw.trim()) return { lastIndex: startIndex, lastTotals, lastModel, eventsAggregated: 0 }; let session; try { session = JSON.parse(raw); } catch (_e) { return { lastIndex: startIndex, lastTotals, lastModel, eventsAggregated: 0 }; } const messages = Array.isArray(session?.messages) ? session.messages : []; if (startIndex >= messages.length) { startIndex = -1; lastTotals = null; lastModel = null; } let eventsAggregated = 0; let model = typeof lastModel === "string" ? lastModel : null; let totals = lastTotals && typeof lastTotals === "object" ? lastTotals : null; const begin = Number.isFinite(startIndex) ? startIndex + 1 : 0; for (let idx = begin; idx < messages.length; idx++) { const msg = messages[idx]; if (!msg || typeof msg !== "object") continue; const normalizedModel = normalizeModelInput(msg.model); if (normalizedModel) model = normalizedModel; const timestamp = typeof msg.timestamp === "string" ? msg.timestamp : null; const currentTotals = normalizeGeminiTokens(msg.tokens); if (!timestamp || !currentTotals) { totals = currentTotals || totals; continue; } const delta = diffGeminiTotals(currentTotals, totals); if (!delta || isAllZeroUsage(delta)) { totals = currentTotals; continue; } delta.conversation_count = 1; const bucketStart = toUtcHalfHourStart(timestamp); if (!bucketStart) { totals = currentTotals; continue; } const bucket = getHourlyBucket(hourlyState, source, model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey(source, model, bucketStart)); if (projectKey && projectState && projectTouchedBuckets) { const projectBucket = getProjectBucket( projectState, projectKey, source, bucketStart, projectRef, ); addTotals(projectBucket.totals, delta); projectTouchedBuckets.add(projectBucketKey(projectKey, source, bucketStart)); } eventsAggregated += 1; totals = currentTotals; } return { lastIndex: messages.length - 1, lastTotals: totals, lastModel: model, eventsAggregated, }; } async function parseOpencodeMessageFile({ filePath, messageIndex, fallbackTotals, fallbackMessageKey, hourlyState, touchedBuckets, source, projectState, projectTouchedBuckets, projectRef, projectKey, }) { const fallbackKey = typeof fallbackMessageKey === "string" && fallbackMessageKey.trim() ? fallbackMessageKey.trim() : null; const legacyTotals = fallbackTotals && typeof fallbackTotals === "object" ? fallbackTotals : null; const fallbackEntry = messageIndex && fallbackKey ? messageIndex[fallbackKey] : null; const fallbackLastTotals = fallbackEntry && typeof fallbackEntry.lastTotals === "object" ? fallbackEntry.lastTotals : legacyTotals; const raw = await fs.readFile(filePath, "utf8").catch(() => ""); if (!raw.trim()) { return { messageKey: fallbackKey, lastTotals: fallbackLastTotals, eventsAggregated: 0, shouldUpdate: false, }; } let msg; try { msg = JSON.parse(raw); } catch (_e) { return { messageKey: fallbackKey, lastTotals: fallbackLastTotals, eventsAggregated: 0, shouldUpdate: false, }; } const messageKey = deriveOpencodeMessageKey(msg, filePath); const prev = messageIndex && messageKey ? messageIndex[messageKey] : null; const indexTotals = prev && typeof prev.lastTotals === "object" ? prev.lastTotals : null; const fallbackMatch = !fallbackKey || fallbackKey === messageKey; const lastTotals = indexTotals || (fallbackMatch ? fallbackLastTotals : null); const currentTotals = normalizeOpencodeTokens(msg?.tokens); if (!currentTotals) { return { messageKey, lastTotals, eventsAggregated: 0, shouldUpdate: false }; } const delta = diffGeminiTotals(currentTotals, lastTotals); if (!delta || isAllZeroUsage(delta)) { return { messageKey, lastTotals: currentTotals, eventsAggregated: 0, shouldUpdate: true }; } delta.conversation_count = 1; const timestampMs = coerceEpochMs(msg?.time?.completed) || coerceEpochMs(msg?.time?.created); if (!timestampMs) { return { messageKey, lastTotals, eventsAggregated: 0, shouldUpdate: Boolean(lastTotals), }; } const tsIso = new Date(timestampMs).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) { return { messageKey, lastTotals, eventsAggregated: 0, shouldUpdate: Boolean(lastTotals), }; } const model = normalizeModelInput(msg?.modelID || msg?.model || msg?.modelId) || DEFAULT_MODEL; const bucket = getHourlyBucket(hourlyState, source, model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey(source, model, bucketStart)); if (projectKey && projectState && projectTouchedBuckets) { const projectBucket = getProjectBucket( projectState, projectKey, source, bucketStart, projectRef, ); addTotals(projectBucket.totals, delta); projectTouchedBuckets.add(projectBucketKey(projectKey, source, bucketStart)); } return { messageKey, lastTotals: currentTotals, eventsAggregated: 1, shouldUpdate: true }; } async function enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }) { if (!touchedBuckets || touchedBuckets.size === 0) return 0; const touchedGroups = new Set(); for (const bucketStart of touchedBuckets) { const parsed = parseBucketKey(bucketStart); const hourStart = parsed.hourStart; if (!hourStart) continue; touchedGroups.add(groupBucketKey(parsed.source, hourStart)); } if (touchedGroups.size === 0) return 0; const groupQueued = hourlyState.groupQueued && typeof hourlyState.groupQueued === "object" ? hourlyState.groupQueued : {}; let codexTouched = false; const legacyGroups = new Set(); for (const groupKey of touchedGroups) { if (Object.prototype.hasOwnProperty.call(groupQueued, groupKey)) { legacyGroups.add(groupKey); } if (!codexTouched && groupKey.startsWith(`${DEFAULT_SOURCE}${BUCKET_SEPARATOR}`)) { codexTouched = true; } } const groupedBuckets = new Map(); for (const [key, bucket] of Object.entries(hourlyState.buckets || {})) { if (!bucket || !bucket.totals) continue; const parsed = parseBucketKey(key); const hourStart = parsed.hourStart; if (!hourStart) continue; const groupKey = groupBucketKey(parsed.source, hourStart); if (!touchedGroups.has(groupKey) || legacyGroups.has(groupKey)) continue; const source = normalizeSourceInput(parsed.source) || DEFAULT_SOURCE; const model = normalizeModelInput(parsed.model) || DEFAULT_MODEL; let group = groupedBuckets.get(groupKey); if (!group) { group = { source, hourStart, buckets: new Map() }; groupedBuckets.set(groupKey, group); } if (bucket.queuedKey != null && typeof bucket.queuedKey !== "string") { bucket.queuedKey = null; } group.buckets.set(model, bucket); } if (codexTouched) { const recomputeGroups = new Set(); for (const [key, bucket] of Object.entries(hourlyState.buckets || {})) { if (!bucket || !bucket.totals) continue; const parsed = parseBucketKey(key); const hourStart = parsed.hourStart; if (!hourStart) continue; const source = normalizeSourceInput(parsed.source) || DEFAULT_SOURCE; if (source !== "every-code") continue; const groupKey = groupBucketKey(source, hourStart); if (legacyGroups.has(groupKey) || groupedBuckets.has(groupKey)) continue; const model = normalizeModelInput(parsed.model) || DEFAULT_MODEL; if (model !== DEFAULT_MODEL) continue; recomputeGroups.add(groupKey); } if (recomputeGroups.size > 0) { for (const [key, bucket] of Object.entries(hourlyState.buckets || {})) { if (!bucket || !bucket.totals) continue; const parsed = parseBucketKey(key); const hourStart = parsed.hourStart; if (!hourStart) continue; const source = normalizeSourceInput(parsed.source) || DEFAULT_SOURCE; const groupKey = groupBucketKey(source, hourStart); if (!recomputeGroups.has(groupKey)) continue; let group = groupedBuckets.get(groupKey); if (!group) { group = { source, hourStart, buckets: new Map() }; groupedBuckets.set(groupKey, group); } if (bucket.queuedKey != null && typeof bucket.queuedKey !== "string") { bucket.queuedKey = null; } const model = normalizeModelInput(parsed.model) || DEFAULT_MODEL; group.buckets.set(model, bucket); } } } const codexDominants = collectCodexDominantModels(hourlyState); const toAppend = []; for (const group of groupedBuckets.values()) { const unknownBucket = group.buckets.get(DEFAULT_MODEL) || null; const dominantModel = pickDominantModel(group.buckets); let alignedModel = null; if (unknownBucket?.alignedModel) { const normalized = normalizeModelInput(unknownBucket.alignedModel); alignedModel = normalized && normalized !== DEFAULT_MODEL ? normalized : null; } const zeroTotals = initTotals(); const zeroKey = totalsKey(zeroTotals); if (dominantModel) { if (alignedModel && !group.buckets.has(alignedModel)) { toAppend.push( JSON.stringify({ source: group.source, model: alignedModel, hour_start: group.hourStart, input_tokens: zeroTotals.input_tokens, cached_input_tokens: zeroTotals.cached_input_tokens, cache_creation_input_tokens: zeroTotals.cache_creation_input_tokens, output_tokens: zeroTotals.output_tokens, reasoning_output_tokens: zeroTotals.reasoning_output_tokens, total_tokens: zeroTotals.total_tokens, billable_total_tokens: zeroTotals.billable_total_tokens, conversation_count: zeroTotals.conversation_count, }), ); } if ( unknownBucket && !alignedModel && unknownBucket.queuedKey && unknownBucket.queuedKey !== zeroKey ) { if (unknownBucket.retractedUnknownKey !== zeroKey) { toAppend.push( JSON.stringify({ source: group.source, model: DEFAULT_MODEL, hour_start: group.hourStart, input_tokens: zeroTotals.input_tokens, cached_input_tokens: zeroTotals.cached_input_tokens, output_tokens: zeroTotals.output_tokens, reasoning_output_tokens: zeroTotals.reasoning_output_tokens, total_tokens: zeroTotals.total_tokens, billable_total_tokens: zeroTotals.billable_total_tokens, conversation_count: zeroTotals.conversation_count, }), ); unknownBucket.retractedUnknownKey = zeroKey; } } if (unknownBucket) unknownBucket.alignedModel = null; for (const [model, bucket] of group.buckets.entries()) { if (model === DEFAULT_MODEL) continue; let totals = bucket.totals; if (model === dominantModel && unknownBucket?.totals) { totals = cloneTotals(bucket.totals); addTotals(totals, unknownBucket.totals); } const key = totalsKey(totals); if (bucket.queuedKey === key) continue; toAppend.push( JSON.stringify({ source: group.source, model, hour_start: group.hourStart, input_tokens: totals.input_tokens, cached_input_tokens: totals.cached_input_tokens, cache_creation_input_tokens: totals.cache_creation_input_tokens, output_tokens: totals.output_tokens, reasoning_output_tokens: totals.reasoning_output_tokens, total_tokens: totals.total_tokens, billable_total_tokens: totals.billable_total_tokens ?? totals.total_tokens, conversation_count: totals.conversation_count, }), ); bucket.queuedKey = key; } continue; } if (!unknownBucket?.totals) continue; let outputModel = DEFAULT_MODEL; if (group.source === "every-code") { const aligned = findNearestCodexModel(group.hourStart, codexDominants); if (aligned) outputModel = aligned; } const nextAligned = outputModel !== DEFAULT_MODEL ? outputModel : null; if (alignedModel && alignedModel !== nextAligned) { toAppend.push( JSON.stringify({ source: group.source, model: alignedModel, hour_start: group.hourStart, input_tokens: zeroTotals.input_tokens, cached_input_tokens: zeroTotals.cached_input_tokens, output_tokens: zeroTotals.output_tokens, reasoning_output_tokens: zeroTotals.reasoning_output_tokens, total_tokens: zeroTotals.total_tokens, billable_total_tokens: zeroTotals.billable_total_tokens, conversation_count: zeroTotals.conversation_count, }), ); } if ( !alignedModel && nextAligned && unknownBucket.queuedKey && unknownBucket.queuedKey !== zeroKey ) { if (unknownBucket.retractedUnknownKey !== zeroKey) { toAppend.push( JSON.stringify({ source: group.source, model: DEFAULT_MODEL, hour_start: group.hourStart, input_tokens: zeroTotals.input_tokens, cached_input_tokens: zeroTotals.cached_input_tokens, cache_creation_input_tokens: zeroTotals.cache_creation_input_tokens, output_tokens: zeroTotals.output_tokens, reasoning_output_tokens: zeroTotals.reasoning_output_tokens, total_tokens: zeroTotals.total_tokens, billable_total_tokens: zeroTotals.billable_total_tokens, conversation_count: zeroTotals.conversation_count, }), ); unknownBucket.retractedUnknownKey = zeroKey; } } if (unknownBucket) unknownBucket.alignedModel = nextAligned; const key = totalsKey(unknownBucket.totals); const outputKey = outputModel === DEFAULT_MODEL ? key : `${key}|${outputModel}`; if (unknownBucket.queuedKey === outputKey) continue; toAppend.push( JSON.stringify({ source: group.source, model: outputModel, hour_start: group.hourStart, input_tokens: unknownBucket.totals.input_tokens, cached_input_tokens: unknownBucket.totals.cached_input_tokens, cache_creation_input_tokens: unknownBucket.totals.cache_creation_input_tokens, output_tokens: unknownBucket.totals.output_tokens, reasoning_output_tokens: unknownBucket.totals.reasoning_output_tokens, total_tokens: unknownBucket.totals.total_tokens, billable_total_tokens: unknownBucket.totals.billable_total_tokens ?? unknownBucket.totals.total_tokens, conversation_count: unknownBucket.totals.conversation_count, }), ); unknownBucket.queuedKey = outputKey; } if (legacyGroups.size > 0) { const grouped = new Map(); for (const [key, bucket] of Object.entries(hourlyState.buckets || {})) { if (!bucket || !bucket.totals) continue; const parsed = parseBucketKey(key); const hourStart = parsed.hourStart; if (!hourStart) continue; const groupKey = groupBucketKey(parsed.source, hourStart); if (!legacyGroups.has(groupKey)) continue; let group = grouped.get(groupKey); if (!group) { group = { source: normalizeSourceInput(parsed.source) || DEFAULT_SOURCE, hourStart, models: new Set(), totals: initTotals(), }; grouped.set(groupKey, group); } group.models.add(parsed.model || DEFAULT_MODEL); addTotals(group.totals, bucket.totals); } for (const group of grouped.values()) { const model = group.models.size === 1 ? [...group.models][0] : DEFAULT_MODEL; const key = totalsKey(group.totals); const groupKey = groupBucketKey(group.source, group.hourStart); if (groupQueued[groupKey] === key) continue; toAppend.push( JSON.stringify({ source: group.source, model, hour_start: group.hourStart, input_tokens: group.totals.input_tokens, cached_input_tokens: group.totals.cached_input_tokens, cache_creation_input_tokens: group.totals.cache_creation_input_tokens, output_tokens: group.totals.output_tokens, reasoning_output_tokens: group.totals.reasoning_output_tokens, total_tokens: group.totals.total_tokens, billable_total_tokens: group.totals.billable_total_tokens ?? group.totals.total_tokens, conversation_count: group.totals.conversation_count, }), ); groupQueued[groupKey] = key; } } hourlyState.groupQueued = groupQueued; if (toAppend.length > 0) { await fs.appendFile(queuePath, toAppend.join("\n") + "\n", "utf8"); } return toAppend.length; } async function enqueueTouchedProjectBuckets({ projectQueuePath, projectState, projectTouchedBuckets, }) { if ( !projectQueuePath || !projectState || !projectTouchedBuckets || projectTouchedBuckets.size === 0 ) return 0; await ensureDir(path.dirname(projectQueuePath)); const toAppend = []; for (const key of projectTouchedBuckets) { const bucket = projectState.buckets[key]; if (!bucket || !bucket.totals) continue; const totals = bucket.totals; const queuedKey = totalsKey(totals); if (bucket.queuedKey === queuedKey) continue; const projectRef = typeof bucket.project_ref === "string" ? bucket.project_ref : null; const projectKey = typeof bucket.project_key === "string" ? bucket.project_key : null; if (!projectRef || !projectKey) continue; toAppend.push( JSON.stringify({ project_ref: projectRef, project_key: projectKey, source: bucket.source, hour_start: bucket.hour_start, input_tokens: totals.input_tokens, cached_input_tokens: totals.cached_input_tokens, cache_creation_input_tokens: totals.cache_creation_input_tokens, output_tokens: totals.output_tokens, reasoning_output_tokens: totals.reasoning_output_tokens, total_tokens: totals.total_tokens, billable_total_tokens: totals.billable_total_tokens ?? totals.total_tokens, conversation_count: totals.conversation_count, }), ); bucket.queuedKey = queuedKey; } if (toAppend.length > 0) { await fs.appendFile(projectQueuePath, toAppend.join("\n") + "\n", "utf8"); } return toAppend.length; } function pickDominantModel(buckets) { let dominantModel = null; let dominantTotal = -1; for (const [model, bucket] of buckets.entries()) { if (model === DEFAULT_MODEL) continue; const total = Number(bucket?.totals?.total_tokens || 0); if ( dominantModel == null || total > dominantTotal || (total === dominantTotal && model < dominantModel) ) { dominantModel = model; dominantTotal = total; } } return dominantModel; } function cloneTotals(totals) { const cloned = initTotals(); addTotals(cloned, totals || {}); return cloned; } function collectCodexDominantModels(hourlyState) { const grouped = new Map(); for (const [key, bucket] of Object.entries(hourlyState.buckets || {})) { if (!bucket || !bucket.totals) continue; const parsed = parseBucketKey(key); const hourStart = parsed.hourStart; if (!hourStart) continue; const source = normalizeSourceInput(parsed.source) || DEFAULT_SOURCE; if (source !== DEFAULT_SOURCE) continue; const model = normalizeModelInput(parsed.model) || DEFAULT_MODEL; if (model === DEFAULT_MODEL) continue; let models = grouped.get(hourStart); if (!models) { models = new Map(); grouped.set(hourStart, models); } const total = Number(bucket.totals.total_tokens || 0); models.set(model, (models.get(model) || 0) + total); } const dominants = []; for (const [hourStart, models] of grouped.entries()) { let dominantModel = null; let dominantTotal = -1; for (const [model, total] of models.entries()) { if ( dominantModel == null || total > dominantTotal || (total === dominantTotal && model < dominantModel) ) { dominantModel = model; dominantTotal = total; } } if (dominantModel) { dominants.push({ hourStart, model: dominantModel }); } } return dominants; } function findNearestCodexModel(hourStart, dominants) { if (!hourStart || !dominants || dominants.length === 0) return null; const target = Date.parse(hourStart); if (!Number.isFinite(target)) return null; let best = null; for (const entry of dominants) { const candidate = Date.parse(entry.hourStart); if (!Number.isFinite(candidate)) continue; const diff = Math.abs(candidate - target); if (!best || diff < best.diff || (diff === best.diff && candidate < best.time)) { best = { diff, time: candidate, model: entry.model }; } } return best ? best.model : null; } function normalizeHourlyState(raw) { const state = raw && typeof raw === "object" ? raw : {}; const version = Number(state.version || 1); const rawBuckets = state.buckets && typeof state.buckets === "object" ? state.buckets : {}; const buckets = {}; const groupQueued = {}; if (!Number.isFinite(version) || version < 2) { for (const [key, value] of Object.entries(rawBuckets)) { const parsed = parseBucketKey(key); const hourStart = parsed.hourStart; if (!hourStart) continue; const source = normalizeSourceInput(parsed.source) || DEFAULT_SOURCE; const normalizedKey = bucketKey(source, DEFAULT_MODEL, hourStart); buckets[normalizedKey] = value; if (value?.queuedKey) { groupQueued[groupBucketKey(source, hourStart)] = value.queuedKey; } } return { version: 3, buckets, groupQueued, updatedAt: typeof state.updatedAt === "string" ? state.updatedAt : null, }; } for (const [key, value] of Object.entries(rawBuckets)) { const parsed = parseBucketKey(key); const hourStart = parsed.hourStart; if (!hourStart) continue; const normalizedKey = bucketKey(parsed.source, parsed.model, hourStart); buckets[normalizedKey] = value; } const existingGroupQueued = state.groupQueued && typeof state.groupQueued === "object" ? state.groupQueued : {}; return { version: 3, buckets, groupQueued: version >= 3 ? existingGroupQueued : {}, updatedAt: typeof state.updatedAt === "string" ? state.updatedAt : null, }; } function normalizeProjectState(raw) { const state = raw && typeof raw === "object" ? raw : {}; const rawBuckets = state.buckets && typeof state.buckets === "object" ? state.buckets : {}; const buckets = {}; const rawProjects = state.projects && typeof state.projects === "object" ? state.projects : {}; const projects = {}; for (const [key, value] of Object.entries(rawBuckets)) { if (!key) continue; buckets[key] = value; } for (const [key, value] of Object.entries(rawProjects)) { if (!key || !value || typeof value !== "object") continue; projects[key] = { ...value }; } return { version: 2, buckets, projects, updatedAt: typeof state.updatedAt === "string" ? state.updatedAt : null, }; } function normalizeOpencodeState(raw) { const state = raw && typeof raw === "object" ? raw : {}; const messages = state.messages && typeof state.messages === "object" ? state.messages : {}; return { messages, updatedAt: typeof state.updatedAt === "string" ? state.updatedAt : null, }; } function normalizeMessageKeyPart(value) { if (typeof value !== "string") return ""; return value.trim(); } function deriveOpencodeMessageKey(msg, fallback) { const sessionId = normalizeMessageKeyPart(msg?.sessionID || msg?.sessionId || msg?.session_id); const messageId = normalizeMessageKeyPart(msg?.id || msg?.messageID || msg?.messageId); if (sessionId && messageId) return `${sessionId}|${messageId}`; return fallback; } function getHourlyBucket(state, source, model, hourStart) { const buckets = state.buckets; const normalizedSource = normalizeSourceInput(source) || DEFAULT_SOURCE; const normalizedModel = normalizeModelInput(model) || DEFAULT_MODEL; const key = bucketKey(normalizedSource, normalizedModel, hourStart); let bucket = buckets[key]; if (!bucket || typeof bucket !== "object") { bucket = { totals: initTotals(), queuedKey: null }; buckets[key] = bucket; return bucket; } if (!bucket.totals || typeof bucket.totals !== "object") { bucket.totals = initTotals(); } if (bucket.queuedKey != null && typeof bucket.queuedKey !== "string") { bucket.queuedKey = null; } return bucket; } function getProjectBucket(state, projectKey, source, hourStart, projectRef) { const buckets = state.buckets; const normalizedSource = normalizeSourceInput(source) || DEFAULT_SOURCE; const key = projectBucketKey(projectKey, normalizedSource, hourStart); let bucket = buckets[key]; if (!bucket || typeof bucket !== "object") { bucket = { totals: initTotals(), queuedKey: null, project_key: projectKey, project_ref: projectRef, source: normalizedSource, hour_start: hourStart, }; buckets[key] = bucket; return bucket; } if (!bucket.totals || typeof bucket.totals !== "object") { bucket.totals = initTotals(); } if (bucket.queuedKey != null && typeof bucket.queuedKey !== "string") { bucket.queuedKey = null; } if (projectRef) bucket.project_ref = projectRef; if (projectKey) bucket.project_key = projectKey; bucket.source = normalizedSource; bucket.hour_start = hourStart; return bucket; } function initTotals() { return { input_tokens: 0, cached_input_tokens: 0, cache_creation_input_tokens: 0, output_tokens: 0, reasoning_output_tokens: 0, total_tokens: 0, billable_total_tokens: 0, conversation_count: 0, }; } function addTotals(target, delta) { target.input_tokens += delta.input_tokens || 0; target.cached_input_tokens += delta.cached_input_tokens || 0; target.cache_creation_input_tokens += delta.cache_creation_input_tokens || 0; target.output_tokens += delta.output_tokens || 0; target.reasoning_output_tokens += delta.reasoning_output_tokens || 0; target.total_tokens += delta.total_tokens || 0; target.billable_total_tokens += delta.billable_total_tokens ?? delta.total_tokens ?? 0; target.conversation_count += delta.conversation_count || 0; } function totalsKey(totals) { return [ totals.input_tokens || 0, totals.cached_input_tokens || 0, totals.cache_creation_input_tokens || 0, totals.output_tokens || 0, totals.reasoning_output_tokens || 0, totals.total_tokens || 0, totals.billable_total_tokens ?? totals.total_tokens ?? 0, totals.conversation_count || 0, ].join("|"); } function toUtcHalfHourStart(ts) { const dt = new Date(ts); if (!Number.isFinite(dt.getTime())) return null; const minutes = dt.getUTCMinutes(); const halfMinute = minutes >= 30 ? 30 : 0; const bucketStart = new Date( Date.UTC( dt.getUTCFullYear(), dt.getUTCMonth(), dt.getUTCDate(), dt.getUTCHours(), halfMinute, 0, 0, ), ); return bucketStart.toISOString(); } function normalizeNonNegativeNumber(value) { const n = Number(value || 0); if (!Number.isFinite(n) || n <= 0) return 0; return n; } function bucketKey(source, model, hourStart) { const safeSource = normalizeSourceInput(source) || DEFAULT_SOURCE; const safeModel = normalizeModelInput(model) || DEFAULT_MODEL; return `${safeSource}${BUCKET_SEPARATOR}${safeModel}${BUCKET_SEPARATOR}${hourStart}`; } function projectBucketKey(projectKey, source, hourStart) { const safeSource = normalizeSourceInput(source) || DEFAULT_SOURCE; return `${projectKey}${BUCKET_SEPARATOR}${safeSource}${BUCKET_SEPARATOR}${hourStart}`; } function groupBucketKey(source, hourStart) { const safeSource = normalizeSourceInput(source) || DEFAULT_SOURCE; return `${safeSource}${BUCKET_SEPARATOR}${hourStart}`; } function parseBucketKey(key) { if (typeof key !== "string") return { source: DEFAULT_SOURCE, model: DEFAULT_MODEL, hourStart: "" }; const first = key.indexOf(BUCKET_SEPARATOR); if (first <= 0) return { source: DEFAULT_SOURCE, model: DEFAULT_MODEL, hourStart: key }; const second = key.indexOf(BUCKET_SEPARATOR, first + 1); if (second <= 0) { return { source: key.slice(0, first), model: DEFAULT_MODEL, hourStart: key.slice(first + 1) }; } return { source: key.slice(0, first), model: key.slice(first + 1, second), hourStart: key.slice(second + 1), }; } function normalizeSourceInput(value) { if (typeof value !== "string") return null; const trimmed = value.trim().toLowerCase(); return trimmed.length > 0 ? trimmed : null; } function normalizeModelInput(value) { if (typeof value !== "string") return null; const trimmed = value.trim(); return trimmed.length > 0 ? trimmed : null; } async function resolveProjectMetaForPath(startDir, cache) { if (!startDir || typeof startDir !== "string") return null; if (cache && cache.has(startDir)) return cache.get(startDir); if (startDir.includes(CLAUDE_MEM_OBSERVER_PATH_SEGMENT)) { const meta = { projectRef: CLAUDE_MEM_OBSERVER_PROJECT_REF, repoRoot: startDir }; if (cache) cache.set(startDir, meta); return meta; } const visited = []; let current = startDir; const root = path.parse(startDir).root; while (current) { if (cache && cache.has(current)) { const cached = cache.get(current); for (const entry of visited) cache.set(entry, cached); return cached; } visited.push(current); const configPath = await resolveGitConfigPath(current); if (configPath) { const remoteUrl = await readGitRemoteUrl(configPath); const projectRef = canonicalizeProjectRef(remoteUrl); const meta = { projectRef: projectRef || null, repoRoot: current }; if (cache) { for (const entry of visited) cache.set(entry, meta); } return meta; } if (current === root) break; const parent = path.dirname(current); if (!parent || parent === current) break; current = parent; } if (cache) { for (const entry of visited) cache.set(entry, null); } return null; } function hashRepoRoot(repoRoot) { return crypto.createHash("sha256").update(String(repoRoot)).digest("hex"); } function deriveProjectKeyFromRef(projectRef) { if (typeof projectRef !== "string") return null; try { const parsed = new URL(projectRef); const segments = parsed.pathname.split("/").filter(Boolean); if (segments.length < 2) return null; // GitHub paths are always owner/repo, but GitLab supports nested groups // (group/subgroup/.../repo). Preserve the full path so nested-group repos // don't collapse to the first two segments. return segments.join("/"); } catch (_e) { return null; } } async function defaultPublicRepoResolver({ projectRef, repoRoot }) { const repoRootHash = repoRoot ? hashRepoRoot(repoRoot) : null; const projectKey = deriveProjectKeyFromRef(projectRef); if (!projectKey) { return { status: "blocked", projectKey: null, projectRef: projectRef || null, repoRootHash, reason: projectRef ? "unparseable_ref" : "missing_ref", }; } return { status: "public_verified", projectKey, projectRef, repoRootHash, }; } function recordProjectMeta(projectState, meta) { if (!projectState || !meta || typeof meta !== "object") return; const repoRootHash = typeof meta.repoRootHash === "string" ? meta.repoRootHash : null; let projectKey = typeof meta.projectKey === "string" ? meta.projectKey : null; if ( !projectKey && repoRootHash && projectState.projects && typeof projectState.projects === "object" ) { for (const [key, entry] of Object.entries(projectState.projects)) { if (entry && entry.repo_root_hash === repoRootHash) { projectKey = key; break; } } } if (!projectKey) return; if (!projectState.projects || typeof projectState.projects !== "object") { projectState.projects = {}; } const prev = projectState.projects[projectKey] || {}; const status = typeof meta.status === "string" ? meta.status : null; const projectRef = typeof meta.projectRef === "string" ? meta.projectRef : null; const next = { ...prev, project_ref: projectRef || prev.project_ref || null, status: status || prev.status || null, repo_root_hash: repoRootHash || prev.repo_root_hash || null, updated_at: new Date().toISOString(), }; if (status === "blocked" && prev.status !== "blocked") { next.purge_pending = true; } else if (status && status !== "blocked") { next.purge_pending = false; } projectState.projects[projectKey] = next; } async function resolveProjectContextForFile({ filePath, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }) { if (!filePath) return null; return resolveProjectContextForPath({ startDir: path.dirname(filePath), projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }); } async function resolveProjectContextForPath({ startDir, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }) { if (!startDir) return null; const projectMeta = await resolveProjectMetaForPath(startDir, projectMetaCache); if (!projectMeta) return null; const resolver = typeof publicRepoResolver === "function" ? publicRepoResolver : defaultPublicRepoResolver; const meta = await resolver({ projectRef: projectMeta.projectRef, repoRoot: projectMeta.repoRoot, cache: publicRepoCache, }); const repoRootHash = projectMeta.repoRoot ? hashRepoRoot(projectMeta.repoRoot) : null; const normalized = { ...(meta || {}), projectRef: meta?.projectRef || projectMeta.projectRef, projectKey: meta?.projectKey || null, status: meta?.status || "blocked", repoRootHash: meta?.repoRootHash || repoRootHash, }; recordProjectMeta(projectState, normalized); if (normalized.status !== "public_verified") { return { projectRef: normalized.projectRef, projectKey: null, status: normalized.status }; } return { projectRef: normalized.projectRef, projectKey: normalized.projectKey, status: normalized.status, }; } async function resolveGitConfigPath(rootDir) { const gitPath = path.join(rootDir, ".git"); const st = await fs.stat(gitPath).catch(() => null); if (!st) return null; if (st.isDirectory()) { const configPath = path.join(gitPath, "config"); const cfg = await fs.stat(configPath).catch(() => null); return cfg && cfg.isFile() ? configPath : null; } if (st.isFile()) { const content = await fs.readFile(gitPath, "utf8").catch(() => ""); const match = content.match(/gitdir:\s*(.+)/i); if (!match) return null; let gitDir = match[1].trim(); if (!gitDir) return null; if (!path.isAbsolute(gitDir)) { gitDir = path.resolve(rootDir, gitDir); } const configPath = path.join(gitDir, "config"); const cfg = await fs.stat(configPath).catch(() => null); if (cfg && cfg.isFile()) return configPath; const commonDirRaw = await fs.readFile(path.join(gitDir, "commondir"), "utf8").catch(() => ""); const commonDirRel = commonDirRaw.trim(); if (!commonDirRel) return null; let commonDir = commonDirRel; if (!path.isAbsolute(commonDir)) { commonDir = path.resolve(gitDir, commonDir); } const commonConfigPath = path.join(commonDir, "config"); const commonCfg = await fs.stat(commonConfigPath).catch(() => null); return commonCfg && commonCfg.isFile() ? commonConfigPath : null; } return null; } async function readGitRemoteUrl(configPath) { const raw = await fs.readFile(configPath, "utf8").catch(() => ""); if (!raw.trim()) return null; const remotes = new Map(); let current = null; for (const line of raw.split(/\r?\n/)) { const sectionHeader = line.match(/^\s*\[[^\]]+\]\s*$/); if (sectionHeader) { const sectionMatch = line.match(/^\s*\[remote\s+"([^"]+)"\]\s*$/i); current = sectionMatch ? sectionMatch[1] : null; continue; } if (!current) continue; const urlMatch = line.match(/^\s*url\s*=\s*(.+)\s*$/i); if (urlMatch) { remotes.set(current, urlMatch[1].trim()); } } if (remotes.has("origin")) return remotes.get("origin"); const first = remotes.values().next(); return first.done ? null : first.value; } function canonicalizeProjectRef(remoteUrl) { if (typeof remoteUrl !== "string") return null; let ref = remoteUrl.trim(); if (!ref) return null; if (ref.startsWith("file://")) return null; if (path.isAbsolute(ref) || path.win32.isAbsolute(ref)) return null; const gitAtMatch = ref.match(/^git@([^:]+):(.+)$/i); if (gitAtMatch) { ref = `https://${gitAtMatch[1]}/${gitAtMatch[2]}`; } else if (ref.startsWith("ssh://")) { try { const parsed = new URL(ref); ref = `https://${parsed.hostname}${parsed.pathname}`; } catch (_e) { return null; } } else if (ref.startsWith("git://")) { ref = `https://${ref.slice("git://".length)}`; } else if (ref.startsWith("http://")) { ref = `https://${ref.slice("http://".length)}`; } else if (!ref.startsWith("https://")) { return null; } try { const parsed = new URL(ref); if (!parsed.hostname) return null; ref = `https://${parsed.hostname}${parsed.pathname}`; } catch (_e) { return null; } ref = ref.replace(/\.git$/i, ""); ref = ref.replace(/\/+$/, ""); return ref || null; } function normalizeGeminiTokens(tokens) { if (!tokens || typeof tokens !== "object") return null; const input = toNonNegativeInt(tokens.input); const cached = toNonNegativeInt(tokens.cached); const output = toNonNegativeInt(tokens.output); const tool = toNonNegativeInt(tokens.tool); const thoughts = toNonNegativeInt(tokens.thoughts); const reportedTotal = toNonNegativeInt(tokens.total); const computedTotal = input + cached + output + tool + thoughts; const total = Math.max(reportedTotal, computedTotal); return { input_tokens: input, cached_input_tokens: cached, cache_creation_input_tokens: 0, output_tokens: output + tool, reasoning_output_tokens: thoughts, total_tokens: total, }; } function normalizeOpencodeTokens(tokens) { if (!tokens || typeof tokens !== "object") return null; const input = toNonNegativeInt(tokens.input); const output = toNonNegativeInt(tokens.output); const reasoning = toNonNegativeInt(tokens.reasoning); const cached = toNonNegativeInt(tokens.cache?.read); const cacheWrite = toNonNegativeInt(tokens.cache?.write); const total = input + output + reasoning + cached + cacheWrite; return { input_tokens: input, cached_input_tokens: cached, cache_creation_input_tokens: cacheWrite, output_tokens: output, reasoning_output_tokens: reasoning, total_tokens: total, }; } function sameGeminiTotals(a, b) { if (!a || !b) return false; return ( a.input_tokens === b.input_tokens && a.cached_input_tokens === b.cached_input_tokens && a.cache_creation_input_tokens === b.cache_creation_input_tokens && a.output_tokens === b.output_tokens && a.reasoning_output_tokens === b.reasoning_output_tokens && a.total_tokens === b.total_tokens ); } function diffGeminiTotals(current, previous) { if (!current || typeof current !== "object") return null; if (!previous || typeof previous !== "object") return current; if (sameGeminiTotals(current, previous)) return null; const totalReset = (current.total_tokens || 0) < (previous.total_tokens || 0); if (totalReset) return current; // Must include cache_creation_input_tokens in both the equality check and // the delta — OpenCode routes through this diff and its cache.write number // would otherwise be permanently reported as zero. Gemini itself always // emits cache_creation=0 so the extra field is a no-op for Gemini. const delta = { input_tokens: Math.max(0, (current.input_tokens || 0) - (previous.input_tokens || 0)), cached_input_tokens: Math.max( 0, (current.cached_input_tokens || 0) - (previous.cached_input_tokens || 0), ), cache_creation_input_tokens: Math.max( 0, (current.cache_creation_input_tokens || 0) - (previous.cache_creation_input_tokens || 0), ), output_tokens: Math.max(0, (current.output_tokens || 0) - (previous.output_tokens || 0)), reasoning_output_tokens: Math.max( 0, (current.reasoning_output_tokens || 0) - (previous.reasoning_output_tokens || 0), ), total_tokens: Math.max(0, (current.total_tokens || 0) - (previous.total_tokens || 0)), }; return isAllZeroUsage(delta) ? null : delta; } function extractTokenCount(obj) { const payload = obj?.payload; if (!payload) return null; if (payload.type === "token_count") { return { info: payload.info, timestamp: obj?.timestamp || null }; } const msg = payload.msg; if (msg && msg.type === "token_count") { return { info: msg.info, timestamp: obj?.timestamp || null }; } return null; } function pickDelta(lastUsage, totalUsage, prevTotals) { const hasLast = isNonEmptyObject(lastUsage); const hasTotal = isNonEmptyObject(totalUsage); const hasPrevTotals = isNonEmptyObject(prevTotals); if (hasTotal && hasPrevTotals) { if (totalsReset(totalUsage, prevTotals)) { const resetUsage = hasLast ? lastUsage : totalUsage; const normalized = normalizeUsage(resetUsage); return isAllZeroUsage(normalized) ? null : normalized; } const delta = {}; for (const k of [ "input_tokens", "cached_input_tokens", "cache_creation_input_tokens", "output_tokens", "reasoning_output_tokens", "total_tokens", ]) { const a = Number(totalUsage[k]); const b = Number(prevTotals[k]); if (Number.isFinite(a) && Number.isFinite(b)) delta[k] = Math.max(0, a - b); } const normalized = normalizeUsage(delta); return isAllZeroUsage(normalized) ? null : normalized; } if (hasLast) { const normalized = normalizeUsage(lastUsage); return isAllZeroUsage(normalized) ? null : normalized; } if (hasTotal) { const normalized = normalizeUsage(totalUsage); return isAllZeroUsage(normalized) ? null : normalized; } return null; } function normalizeUsage(u) { const out = {}; for (const k of [ "input_tokens", "cached_input_tokens", "cache_creation_input_tokens", "output_tokens", "reasoning_output_tokens", "total_tokens", ]) { const n = Number(u[k] || 0); out[k] = Number.isFinite(n) && n >= 0 ? Math.floor(n) : 0; } // Codex rollouts (and Every Code, which shares the format) report // `input_tokens` as the TOTAL prompt, with `cached_input_tokens` as the // cached subset — i.e. the cached slice is INSIDE the input count. Our // queue schema (CLAUDE.md → Token Normalization Convention) stores // `input_tokens` as pure non-cached input and `cached_input_tokens` // separately. Without this subtraction the cost formula bills the cached // bytes twice: once at the full input rate and again at the cache_read // rate, producing ~6–7x cost inflation on cache-heavy Codex sessions // (verified against ccusage's per-day numbers on the same rollouts). // We intentionally leave `total_tokens` unchanged: Codex reports // total = input(inclusive of cached) + output, which numerically equals // our schema's non_cached + cached + output + 0 (cache_creation=0 here). out.input_tokens = Math.max(0, out.input_tokens - out.cached_input_tokens); return out; } // Stable dedup key for one Claude jsonl entry. Anthropic's official protocol // guarantees `message.id` is globally unique per response, so msgId alone is a // valid dedup key. Older code required both msgId AND requestId, which short- // circuited dedup entirely for jsonl entries where `requestId` is absent // (DeepSeek/Kimi/Mimo/MiniMax anthropic-compatible endpoints don't return the // `request-id` HTTP header, and Claude Code's sub-agent / thinking transport // paths drop the field too). The short-circuit caused 1.6–3.7x overcounting on // every affected provider — see issue #64. Falling back to msgId-only keeps // backward compatibility for the (msgId, reqId) format already persisted in // cursors.claudeHashes (msgId strings don't contain `:`, so the two formats // share the same Set without collision). function claudeMessageDedupKey(obj) { const msgId = typeof obj?.message?.id === "string" && obj.message.id ? obj.message.id : null; if (!msgId) return null; const reqId = typeof obj?.requestId === "string" && obj.requestId ? obj.requestId : null; return reqId ? `${msgId}:${reqId}` : msgId; } function normalizeClaudeUsage(u) { const inputTokens = toNonNegativeInt(u?.input_tokens); const outputTokens = toNonNegativeInt(u?.output_tokens); const cacheCreation = toNonNegativeInt(u?.cache_creation_input_tokens); const cacheRead = toNonNegativeInt(u?.cache_read_input_tokens); const totalTokens = inputTokens + outputTokens + cacheCreation + cacheRead; return { input_tokens: inputTokens, cached_input_tokens: cacheRead, cache_creation_input_tokens: cacheCreation, output_tokens: outputTokens, reasoning_output_tokens: 0, total_tokens: totalTokens, }; } function isNonEmptyObject(v) { return Boolean(v && typeof v === "object" && !Array.isArray(v) && Object.keys(v).length > 0); } function isAllZeroUsage(u) { if (!u || typeof u !== "object") return true; for (const k of [ "input_tokens", "cached_input_tokens", "cache_creation_input_tokens", "output_tokens", "reasoning_output_tokens", "total_tokens", ]) { if (Number(u[k] || 0) !== 0) return false; } return true; } function totalsReset(curr, prev) { const currTotal = curr?.total_tokens; const prevTotal = prev?.total_tokens; if (!isFiniteNumber(currTotal) || !isFiniteNumber(prevTotal)) return false; return currTotal < prevTotal; } function isFiniteNumber(v) { return typeof v === "number" && Number.isFinite(v); } function toNonNegativeInt(v) { const n = Number(v); if (!Number.isFinite(n) || n < 0) return 0; return Math.floor(n); } function coerceEpochMs(v) { const n = Number(v); if (!Number.isFinite(n) || n <= 0) return 0; if (n < 1e12) return Math.floor(n * 1000); return Math.floor(n); } async function safeReadDir(dir) { try { return await fs.readdir(dir, { withFileTypes: true }); } catch (_e) { return []; } } async function walkClaudeProjects(dir, out) { const entries = await safeReadDir(dir); for (const entry of entries) { const fullPath = path.join(dir, entry.name); if (entry.isDirectory()) { await walkClaudeProjects(fullPath, out); continue; } if (entry.isFile() && entry.name.endsWith(".jsonl")) out.push(fullPath); } } async function walkOpencodeMessages(dir, out) { const entries = await safeReadDir(dir); for (const entry of entries) { const fullPath = path.join(dir, entry.name); if (entry.isDirectory()) { await walkOpencodeMessages(fullPath, out); continue; } if (entry.isFile() && entry.name.startsWith("msg_") && entry.name.endsWith(".json")) out.push(fullPath); } } // --------------------------------------------------------------------------- // OpenCode SQLite DB reader (v1.2+ stores messages in opencode.db) // --------------------------------------------------------------------------- function readOpencodeDbMessages(dbPath, sqliteOptions = {}) { if (!dbPath || !fssync.existsSync(dbPath)) return []; const sql = `SELECT id, session_id, time_updated, data FROM message WHERE json_extract(data, '$.role') = 'assistant' ORDER BY time_created ASC`; const rows = readSqliteJsonRows(dbPath, sql, { label: "OpenCode", maxBuffer: 50 * 1024 * 1024, timeout: 30_000, ...sqliteOptions, }); const out = []; for (const row of rows) { if (!row || typeof row.data !== "string") continue; let data; try { data = JSON.parse(row.data); } catch (_e) { continue; } const tokens = data?.tokens; if (!tokens || typeof tokens !== "object") continue; // Skip messages with no meaningful token data const hasTokens = toNonNegativeInt(tokens.input) > 0 || toNonNegativeInt(tokens.output) > 0 || toNonNegativeInt(tokens.reasoning) > 0; if (!hasTokens) continue; out.push({ id: row.id || data.id, sessionID: row.session_id || data.sessionID, timeUpdated: row.time_updated || 0, data, }); } return out; } async function parseOpencodeDbIncremental({ dbMessages, cursors, queuePath, projectQueuePath, onProgress, source, cursorKey, publicRepoResolver, }) { await ensureDir(path.dirname(queuePath)); let messagesProcessed = 0; let eventsAggregated = 0; const cb = typeof onProgress === "function" ? onProgress : null; const messages = Array.isArray(dbMessages) ? dbMessages : []; const totalMessages = messages.length; const hourlyState = normalizeHourlyState(cursors?.hourly); const projectEnabled = typeof projectQueuePath === "string" && projectQueuePath.length > 0; const projectState = projectEnabled ? normalizeProjectState(cursors?.projectHourly) : null; const projectTouchedBuckets = projectEnabled ? new Set() : null; const projectMetaCache = projectEnabled ? new Map() : null; const publicRepoCache = projectEnabled ? new Map() : null; const cursorNamespace = typeof cursorKey === "string" && cursorKey.length > 0 ? cursorKey : "opencode"; const opencodeState = normalizeOpencodeState(cursors?.[cursorNamespace]); const messageIndex = opencodeState.messages; const touchedBuckets = new Set(); const defaultSource = normalizeSourceInput(source) || "opencode"; for (let idx = 0; idx < messages.length; idx++) { const entry = messages[idx]; const msg = entry.data; if (!msg) continue; // DB stores id/sessionID as separate columns; inject into msg for key derivation const msgForKey = { ...msg }; if (entry.id && !msgForKey.id) msgForKey.id = entry.id; if (entry.sessionID && !msgForKey.sessionID) msgForKey.sessionID = entry.sessionID; const messageKey = deriveOpencodeMessageKey(msgForKey, null); if (!messageKey) { messagesProcessed += 1; continue; } // Skip messages already indexed (from prior JSON-file parsing or previous DB sync) const prev = messageIndex[messageKey]; const lastTotals = prev && typeof prev.lastTotals === "object" ? prev.lastTotals : null; const currentTotals = normalizeOpencodeTokens(msg?.tokens); if (!currentTotals) { messagesProcessed += 1; continue; } const delta = diffGeminiTotals(currentTotals, lastTotals); if (!delta || isAllZeroUsage(delta)) { // Update index with current totals even if no delta (normalization may have changed) if (!sameGeminiTotals(currentTotals, lastTotals)) { messageIndex[messageKey] = { lastTotals: currentTotals, updatedAt: new Date().toISOString(), }; } messagesProcessed += 1; if (cb) { cb({ index: idx + 1, total: totalMessages, messagesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } continue; } delta.conversation_count = 1; const timestampMs = coerceEpochMs(msg?.time?.completed) || coerceEpochMs(msg?.time?.created); if (!timestampMs) { messagesProcessed += 1; continue; } const tsIso = new Date(timestampMs).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) { messagesProcessed += 1; continue; } const model = normalizeModelInput(msg?.modelID || msg?.model || msg?.modelId) || DEFAULT_MODEL; const bucket = getHourlyBucket(hourlyState, defaultSource, model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey(defaultSource, model, bucketStart)); if (projectEnabled) { const projectContext = await resolveProjectContextForDb({ msg, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }); const projectRef = projectContext?.projectRef || null; const projectKey = projectContext?.projectKey || null; if (projectKey && projectState && projectTouchedBuckets) { const projectBucket = getProjectBucket( projectState, projectKey, defaultSource, bucketStart, projectRef, ); addTotals(projectBucket.totals, delta); projectTouchedBuckets.add(projectBucketKey(projectKey, defaultSource, bucketStart)); } } messageIndex[messageKey] = { lastTotals: currentTotals, updatedAt: new Date().toISOString(), }; messagesProcessed += 1; eventsAggregated += 1; if (cb) { cb({ index: idx + 1, total: totalMessages, messagesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const projectBucketsQueued = projectEnabled ? await enqueueTouchedProjectBuckets({ projectQueuePath, projectState, projectTouchedBuckets }) : 0; hourlyState.updatedAt = new Date().toISOString(); cursors.hourly = hourlyState; opencodeState.updatedAt = new Date().toISOString(); cursors[cursorNamespace] = opencodeState; if (projectState) { projectState.updatedAt = new Date().toISOString(); cursors.projectHourly = projectState; } return { messagesProcessed, eventsAggregated, bucketsQueued, projectBucketsQueued }; } // Resolve project context from DB message (no file path available) async function resolveProjectContextForDb({ msg, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }) { const cwd = msg?.path?.cwd; if (!cwd || typeof cwd !== "string") return null; return resolveProjectContextForPath({ startDir: cwd, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }); } // ── Cursor (API-based) ── /** * Incremental parser for Cursor usage data fetched via API. * * Unlike other parsers that read local files, this one receives pre-parsed * CSV records from cursor-config.js and aggregates them into 30-min buckets. * * Incremental state is tracked in `cursors.cursorApi.lastRecordTimestamp`. */ async function parseCursorApiIncremental({ records, cursors, queuePath, onProgress, source, }) { await ensureDir(path.dirname(queuePath)); const defaultSource = normalizeSourceInput(source) || "cursor"; const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); // Cursor's CSV is an account-level API export, not an append-only local log. // Treat the fetched CSV as authoritative so historical backfills and row // corrections replace prior local bucket totals instead of being skipped. const lastTs = cursors?.cursorApi?.lastRecordTimestamp || null; let latestTs = lastTs; let eventsAggregated = 0; const cb = typeof onProgress === "function" ? onProgress : null; const total = records.length; if (records.length > 0) { for (const [key, bucket] of Object.entries(hourlyState.buckets || {})) { const parsed = parseBucketKey(key); const sourceKey = normalizeSourceInput(parsed.source) || DEFAULT_SOURCE; if (sourceKey !== defaultSource) continue; if (!bucket?.totals) continue; bucket.totals = initTotals(); touchedBuckets.add(key); } } for (let i = 0; i < records.length; i++) { const record = records[i]; const recordDate = record.date; if (!recordDate) continue; const { normalizeCursorUsage } = require("./cursor-config"); const delta = normalizeCursorUsage(record); if (isAllZeroUsage(delta)) continue; delta.conversation_count = 1; const bucketStart = toUtcHalfHourStart(recordDate); if (!bucketStart) continue; const model = normalizeModelInput(record.model) || DEFAULT_MODEL; const bucket = getHourlyBucket(hourlyState, defaultSource, model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey(defaultSource, model, bucketStart)); eventsAggregated += 1; // Track latest timestamp if (!latestTs || recordDate > latestTs) { latestTs = recordDate; } if (cb && (i % 200 === 0 || i === records.length - 1)) { cb({ index: i + 1, total, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); hourlyState.updatedAt = new Date().toISOString(); cursors.hourly = hourlyState; // Update cursor state if (!cursors.cursorApi) cursors.cursorApi = {}; if (latestTs && latestTs !== lastTs) { cursors.cursorApi.lastRecordTimestamp = latestTs; } cursors.cursorApi.updatedAt = new Date().toISOString(); return { recordsProcessed: total, eventsAggregated, bucketsQueued }; } // --------------------------------------------------------------------------- // Kiro token tracking (reads from devdata.sqlite or tokens_generated.jsonl) // --------------------------------------------------------------------------- function resolveKiroBasePath() { const home = require("node:os").homedir(); return path.join( home, "Library", "Application Support", "Kiro", "User", "globalStorage", "kiro.kiroagent", ); } function resolveKiroDbPath() { return path.join(resolveKiroBasePath(), "dev_data", "devdata.sqlite"); } function resolveKiroJsonlPath() { return path.join(resolveKiroBasePath(), "dev_data", "tokens_generated.jsonl"); } function readKiroDbTokens(dbPath, sinceId, sqliteOptions = {}) { if (!dbPath || !fssync.existsSync(dbPath)) return []; const minId = Number.isFinite(sinceId) && sinceId > 0 ? sinceId : 0; const sql = `SELECT id, model, provider, tokens_prompt, tokens_generated, timestamp FROM tokens_generated WHERE id > ${minId} ORDER BY id ASC`; return readSqliteJsonRows(dbPath, sql, { label: "Kiro", maxBuffer: 10 * 1024 * 1024, timeout: 15_000, ...sqliteOptions, }); } // Read Kiro token data from JSONL fallback (tokens_generated.jsonl). // Each line: {"model":"agent","provider":"kiro","promptTokens":N,"generatedTokens":N} // The fallback file does not include per-row timestamps, so newly appended rows are // bucketed using the file mtime observed during this sync. We track a separate JSONL // cursor so it never shares state with the SQLite path. function countKiroJsonlLines(jsonlPath) { if (!jsonlPath || !fssync.existsSync(jsonlPath)) return 0; try { const raw = fssync.readFileSync(jsonlPath, "utf8"); return raw.split("\n").filter((l) => l.trim()).length; } catch (_e) { return 0; } } function readKiroJsonlTokens(jsonlPath, sinceLineIndex) { if (!jsonlPath || !fssync.existsSync(jsonlPath)) { return { rows: [], lineCount: 0, reset: false }; } const startLine = Number.isFinite(sinceLineIndex) && sinceLineIndex > 0 ? sinceLineIndex : 0; let raw; try { raw = fssync.readFileSync(jsonlPath, "utf8"); } catch (_e) { return { rows: [], lineCount: 0, reset: false }; } const lines = raw.split("\n").filter((l) => l.trim()); const lineCount = lines.length; if (startLine > lineCount) { return { rows: [], lineCount, reset: true }; } let mtime; try { mtime = fssync.statSync(jsonlPath).mtime.toISOString(); } catch (_e) { mtime = new Date().toISOString(); } const timestamp = mtime.replace("T", " ").replace("Z", "").slice(0, 19); const rows = []; for (let i = startLine; i < lines.length; i++) { try { const obj = JSON.parse(lines[i]); rows.push({ id: i + 1, model: obj.model || "agent", provider: obj.provider || "kiro", tokens_prompt: obj.promptTokens || 0, tokens_generated: obj.generatedTokens || 0, timestamp, }); } catch (_e) { // skip malformed lines } } return { rows, lineCount, reset: false }; } // Build a sorted timeline of model usage from Kiro .chat metadata files function buildKiroModelTimeline(basePath) { const timeline = []; // [{ startMs, endMs, model }] if (!basePath || !fssync.existsSync(basePath)) return timeline; let dirs; try { dirs = fssync.readdirSync(basePath, { withFileTypes: true }); } catch (_e) { return timeline; } for (const entry of dirs) { if (!entry.isDirectory()) continue; const dirPath = path.join(basePath, entry.name); let files; try { files = fssync.readdirSync(dirPath).filter((f) => f.endsWith(".chat")); } catch (_e) { continue; } for (const file of files) { try { const raw = fssync.readFileSync(path.join(dirPath, file), "utf8"); const data = JSON.parse(raw); const meta = data?.metadata; if (!meta?.modelId || !meta?.startTime) continue; timeline.push({ startMs: meta.startTime, endMs: meta.endTime || meta.startTime, model: String(meta.modelId), }); } catch (_e) {} } } timeline.sort((a, b) => a.startMs - b.startMs); return timeline; } // Find the model for a given UTC timestamp string using the .chat timeline function resolveKiroModel(timeline, utcTimestamp) { if (!timeline.length || !utcTimestamp) return null; const ts = new Date(utcTimestamp).getTime(); if (!Number.isFinite(ts)) return null; // Binary search for the closest .chat entry let lo = 0; let hi = timeline.length - 1; let best = null; let bestDist = Infinity; while (lo <= hi) { const mid = (lo + hi) >>> 1; const entry = timeline[mid]; // Check if timestamp falls within the .chat execution window if (ts >= entry.startMs && ts <= entry.endMs) return entry.model; const dist = Math.min(Math.abs(ts - entry.startMs), Math.abs(ts - entry.endMs)); if (dist < bestDist) { bestDist = dist; best = entry.model; } if (ts < entry.startMs) hi = mid - 1; else lo = mid + 1; } // Only match if within 10 minutes return bestDist < 10 * 60 * 1000 ? best : null; } // Normalize Kiro internal model IDs to readable names // e.g. "CLAUDE_SONNET_4_20250514_V1_0" → "claude-sonnet-4" function normalizeKiroModelName(raw) { if (!raw || typeof raw !== "string") return null; let name = raw.trim(); if (!name) return null; // Already lowercase with dashes (e.g. "claude-opus-4.5") → keep as-is if (name === name.toLowerCase() && name.includes("-")) return name; // UPPER_SNAKE_CASE internal names: strip date/version suffixes, convert to lowercase-dash name = name .replace(/_\d{8}_V\d+_\d+$/i, "") // remove _20250514_V1_0 .replace(/_V\d+$/i, "") // remove _V1 .toLowerCase() .replace(/_/g, "-"); return name || null; } async function parseKiroIncremental({ dbPath, jsonlPath, cursors, queuePath, onProgress, sqliteOptions } = {}) { await ensureDir(path.dirname(queuePath)); const kiroState = cursors.kiro && typeof cursors.kiro === "object" ? cursors.kiro : {}; const lastDbId = typeof kiroState.lastDbId === "number" ? kiroState.lastDbId : (typeof kiroState.lastId === "number" ? kiroState.lastId : 0); const jsonlState = kiroState.jsonl && typeof kiroState.jsonl === "object" ? kiroState.jsonl : {}; const lastJsonlLine = typeof jsonlState.lastLine === "number" ? jsonlState.lastLine : 0; const resolvedDbPath = dbPath || resolveKiroDbPath(); const resolvedJsonlPath = jsonlPath || resolveKiroJsonlPath(); // Try SQLite first, fall back to JSONL. let rows = []; let nextDbId = lastDbId; let nextJsonlLine = lastJsonlLine; let usingDb = false; if (fssync.existsSync(resolvedDbPath)) { rows = readKiroDbTokens(resolvedDbPath, lastDbId, sqliteOptions); usingDb = true; // DB and JSONL are siblings for the same usage events. If the DB ever // disappears (corrupted / wiped) and we fall back to JSONL in a later // run, we must not re-read lines that the DB path already consumed. // Advance the JSONL line cursor to the current file tail. if (fssync.existsSync(resolvedJsonlPath)) { const tailLineCount = countKiroJsonlLines(resolvedJsonlPath); if (tailLineCount > nextJsonlLine) nextJsonlLine = tailLineCount; } } else if (fssync.existsSync(resolvedJsonlPath)) { const jsonlResult = readKiroJsonlTokens(resolvedJsonlPath, lastJsonlLine); rows = jsonlResult.rows; nextJsonlLine = jsonlResult.lineCount; if (jsonlResult.reset) { cursors.kiro = { ...kiroState, lastDbId, jsonl: { lastLine: jsonlResult.lineCount, updatedAt: new Date().toISOString() }, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } } else { return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } if (rows.length === 0) { cursors.kiro = { ...kiroState, lastDbId, jsonl: { lastLine: nextJsonlLine, updatedAt: new Date().toISOString() }, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } // Build model timeline from .chat files for model name resolution const basePath = resolveKiroBasePath(); const modelTimeline = buildKiroModelTimeline(basePath); const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let eventsAggregated = 0; let maxId = lastDbId; for (let i = 0; i < rows.length; i++) { const row = rows[i]; const inputTokens = toNonNegativeInt(row.tokens_prompt); const outputTokens = toNonNegativeInt(row.tokens_generated); if (inputTokens === 0 && outputTokens === 0) continue; // timestamp format: "2026-01-09 15:25:30" (UTC from SQLite DEFAULT CURRENT_TIMESTAMP) const ts = row.timestamp ? row.timestamp.replace(" ", "T") + "Z" : null; const bucketStart = ts ? toUtcHalfHourStart(ts) : null; if (!bucketStart) continue; // Resolve actual model from .chat timeline, fallback to "kiro-agent" const resolvedModel = resolveKiroModel(modelTimeline, ts); const model = normalizeKiroModelName(resolvedModel) || "kiro-agent"; const delta = { input_tokens: inputTokens, cached_input_tokens: 0, output_tokens: outputTokens, reasoning_output_tokens: 0, total_tokens: inputTokens + outputTokens, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "kiro", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("kiro", model, bucketStart)); eventsAggregated++; if (usingDb && row.id && row.id > maxId) maxId = row.id; if (cb) { cb({ index: i + 1, total: rows.length, recordsProcessed: i + 1, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.kiro = { ...kiroState, lastId: maxId, lastDbId: maxId, jsonl: { lastLine: nextJsonlLine, updatedAt }, updatedAt, }; return { recordsProcessed: rows.length, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // Hermes Agent — SQLite-based (sessions table in ~/.hermes/state.db) // ───────────────────────────────────────────────────────────────────────────── function resolveHermesPath(env = process.env) { const override = env.TOKENTRACKER_HERMES_HOME; if (typeof override === "string" && override.trim().length > 0) { return override.trim(); } const home = require("node:os").homedir(); const defaultPath = path.join(home, ".hermes"); // Hermes official Windows installer (install.ps1) writes state to // %LOCALAPPDATA%\hermes, not ~/.hermes. Prefer it when present so native // Windows users don't need to set TOKENTRACKER_HERMES_HOME manually. if (process.platform === "win32") { const localAppData = typeof env.LOCALAPPDATA === "string" ? env.LOCALAPPDATA.trim() : ""; if (localAppData.length > 0) { const winNative = path.join(localAppData, "hermes"); try { if (fssync.existsSync(winNative)) return winNative; } catch (_e) { } } } return defaultPath; } function resolveHermesDbPath(env = process.env) { return path.join(resolveHermesPath(env), "state.db"); } function resolveAllHermesDBPaths({ hermesPath, dbPath } = {}) { const hermesDir = hermesPath ?? (dbPath ? path.dirname(dbPath) : resolveHermesPath()); const defaultDbPath = dbPath ?? path.join(hermesDir, "state.db"); const profilePaths = {}; try { const profilesDir = path.join(hermesDir, "profiles"); const profiles = fssync.readdirSync(profilesDir, { withFileTypes: true }) .filter((entry) => entry.isDirectory()) .sort((a, b) => a.name.localeCompare(b.name)); for (const entry of profiles) { const dbPath = path.join(profilesDir, entry.name, "state.db"); if (fssync.existsSync(dbPath)) { profilePaths[entry.name] = dbPath; } } } catch (_e) { } return { default: fssync.existsSync(defaultDbPath) ? defaultDbPath : null, profiles: profilePaths, } } function sqliteStringLiteral(value) { return `'${String(value).replace(/'/g, "''")}'`; } // UNC paths (\\wsl$\Ubuntu\..., \\wsl.localhost\..., \\server\share\...) make // sqlite3.exe fail with "database is locked (5)" on Windows because the Plan 9 // / SMB bridge can't grant the locks SQLite asks for — even after `wsl // --shutdown`. Detect those paths so we can snapshot the DB locally first. function isUncPath(p) { return typeof p === "string" && (p.startsWith("\\\\") || p.startsWith("//")); } function snapshotSqliteDb(dbPath) { const tmpRoot = fssync.mkdtempSync( path.join(require("node:os").tmpdir(), "tokentracker-hermes-snap-"), ); const target = path.join(tmpRoot, path.basename(dbPath)); fssync.copyFileSync(dbPath, target); // Best-effort copy of SQLite sidecars; missing -wal/-shm/-journal is fine. for (const suffix of ["-wal", "-shm", "-journal"]) { const src = dbPath + suffix; try { if (fssync.existsSync(src)) fssync.copyFileSync(src, target + suffix); } catch (_e) { } } return { path: target, cleanup() { try { fssync.rmSync(tmpRoot, { recursive: true, force: true }); } catch (_e) { } }, }; } function readHermesSessions(dbPath, lastCompletedEpoch, unfinishedSessionIds = [], sqliteOptions = {}) { if (!dbPath || !fssync.existsSync(dbPath)) return []; const since = Number.isFinite(lastCompletedEpoch) && lastCompletedEpoch > 0 ? lastCompletedEpoch : 0; const forceIds = Array.isArray(unfinishedSessionIds) ? [...new Set(unfinishedSessionIds.filter((id) => typeof id === "string" && id.length > 0))] : []; const forceIncludeSql = forceIds.length > 0 ? ` OR id IN (${forceIds.map(sqliteStringLiteral).join(",")})` : ""; // Fetch sessions that started at/after the cursor, sessions that are still // in-progress (ended_at IS NULL), OR sessions that were previously observed // unfinished. Hermes updates token counts in real-time, including a final // delta when an active session later gets ended_at set. const sql = `SELECT id, model, started_at, ended_at, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, reasoning_tokens, message_count FROM sessions WHERE (started_at >= ${since} OR ended_at IS NULL${forceIncludeSql}) AND (input_tokens > 0 OR output_tokens > 0 OR cache_read_tokens > 0 OR reasoning_tokens > 0) ORDER BY started_at ASC`; let snapshot = null; let effectiveDbPath = dbPath; if (isUncPath(dbPath)) { try { snapshot = snapshotSqliteDb(dbPath); effectiveDbPath = snapshot.path; } catch (_e) { // Snapshot failed — fall through to a direct read so we don't regress // the non-locked case (e.g. permissions, transient I/O). } } try { return readSqliteJsonRows(effectiveDbPath, sql, { label: "Hermes", maxBuffer: 10 * 1024 * 1024, timeout: 15_000, ...sqliteOptions, }); } finally { if (snapshot) snapshot.cleanup(); } } function hasLegacyHermesDefaultState(hermesState) { return ( typeof hermesState.lastStartedAt === "number" || typeof hermesState.lastCompletedStartedAt === "number" || (hermesState.snapshots && typeof hermesState.snapshots === "object") ); } async function parseHermesIncremental({ hermesPath, dbPath, cursors, queuePath, onProgress, sqliteOptions } = {}) { await ensureDir(path.dirname(queuePath)); const hermesState = cursors.hermes && typeof cursors.hermes === "object" ? cursors.hermes : {}; const dbPaths = resolveAllHermesDBPaths({ hermesPath, dbPath }); if (dbPaths.default === null && Object.keys(dbPaths.profiles).length === 0) { // No state in any profile return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const cb = typeof onProgress === "function" ? onProgress : null; const updatedAt = new Date().toISOString(); let recordsProcessed = 0; let eventsAggregated = 0; const touchedBuckets = new Set(); function ingestProfile(dbPath, dbState) { const trackedUnfinishedSessionIds = Array.isArray(dbState.unfinishedSessionIds) ? dbState.unfinishedSessionIds : []; const rows = readHermesSessions( dbPath, dbState.lastCompletedStartedAt, trackedUnfinishedSessionIds, sqliteOptions, ); recordsProcessed += rows.length; if (rows.length === 0) { dbState.updatedAt = updatedAt; return; } // Per-session snapshot from the previous sync: { [sessionId]: { in, out, cacheRead, cacheWrite, reasoning } } const prevSnapshots = (dbState.snapshots && typeof dbState.snapshots === "object") ? dbState.snapshots : {}; // Only advance past sessions that have fully ended. Active sessions // (ended_at IS NULL) must be re-read every sync because Hermes updates // their token counts in real-time after each turn. const lastCompletedStartedAt = typeof dbState.lastCompletedStartedAt === "number" ? dbState.lastCompletedStartedAt : 0; let maxCompletedStartedAt = lastCompletedStartedAt; let oldestUnfinishedStartedAt = Infinity; const nextUnfinishedSessionIds = new Set(); const nextSnapshots = {}; for (let i = 0; i < rows.length; i++) { const row = rows[i]; const inputTokens = toNonNegativeInt(row.input_tokens); const outputTokens = toNonNegativeInt(row.output_tokens); const cacheRead = toNonNegativeInt(row.cache_read_tokens); const cacheWrite = toNonNegativeInt(row.cache_write_tokens); const reasoning = toNonNegativeInt(row.reasoning_tokens); const messageCount = toNonNegativeInt(row.message_count); if (inputTokens === 0 && outputTokens === 0 && cacheRead === 0 && reasoning === 0) continue; // Save current snapshot for next sync nextSnapshots[row.id] = { in: inputTokens, out: outputTokens, cacheRead, cacheWrite, reasoning, message_count: messageCount }; const startedAt = Number(row.started_at); const endedAt = row.ended_at == null ? null : Number(row.ended_at); if (endedAt == null) { if (row.id && Number.isFinite(startedAt)) { nextUnfinishedSessionIds.add(row.id); oldestUnfinishedStartedAt = Math.min(oldestUnfinishedStartedAt, startedAt); } } else if (Number.isFinite(startedAt) && startedAt > maxCompletedStartedAt) { maxCompletedStartedAt = startedAt; } // Compute delta from previous snapshot (if any) so that we only count // new usage since the last sync. First time we see a session the // previous snapshot is absent, so the full amount is the delta. const prev = prevSnapshots[row.id]; let dInput = inputTokens; let dOutput = outputTokens; let dCacheRead = cacheRead; let dCacheWrite = cacheWrite; let dReasoning = reasoning; let dMessageCount = messageCount; if (prev) { dInput = Math.max(0, inputTokens - (prev.in || 0)); dOutput = Math.max(0, outputTokens - (prev.out || 0)); dCacheRead = Math.max(0, cacheRead - (prev.cacheRead || 0)); dCacheWrite = Math.max(0, cacheWrite - (prev.cacheWrite || 0)); dReasoning = Math.max(0, reasoning - (prev.reasoning || 0)); dMessageCount = Math.max(0, messageCount - (prev.message_count || 0)); } // Skip if delta is zero (session unchanged since last sync) if (dInput === 0 && dOutput === 0 && dCacheRead === 0 && dCacheWrite === 0 && dReasoning === 0) continue; // Prefer ended_at for bucket placement; fall back to started_at const epochSec = endedAt ?? startedAt; if (!epochSec || !Number.isFinite(epochSec)) continue; const tsIso = new Date(epochSec * 1000).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const model = normalizeModelInput(row.model) || "hermes-agent"; const delta = { input_tokens: dInput, cached_input_tokens: dCacheRead, cache_creation_input_tokens: dCacheWrite, output_tokens: dOutput, reasoning_output_tokens: dReasoning, total_tokens: dInput + dOutput + dCacheRead + dCacheWrite + dReasoning, conversation_count: dMessageCount, }; const bucket = getHourlyBucket(hourlyState, "hermes", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("hermes", model, bucketStart)); eventsAggregated++; if (cb) { cb({ index: i + 1, total: rows.length, recordsProcessed: i + 1, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const nextLastCompletedStartedAt = Number.isFinite(oldestUnfinishedStartedAt) ? Math.min(maxCompletedStartedAt, oldestUnfinishedStartedAt) : maxCompletedStartedAt; Object.assign(dbState, { lastStartedAt: nextLastCompletedStartedAt, lastCompletedStartedAt: nextLastCompletedStartedAt, unfinishedSessionIds: Array.from(nextUnfinishedSessionIds), snapshots: nextSnapshots, updatedAt, }); } if (dbPaths.default) { ingestProfile(dbPaths.default, hermesState); } hermesState.profiles = hermesState.profiles && typeof hermesState.profiles === "object" ? hermesState.profiles : {}; for (const [profileName, dbPath] of Object.entries(dbPaths.profiles)) { const profileState = hermesState.profiles[profileName] && typeof hermesState.profiles[profileName] === "object" ? hermesState.profiles[profileName] : {}; hermesState.profiles[profileName] = profileState; ingestProfile(dbPath, profileState); } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.hermes = { ...hermesState, updatedAt, // Update the overall profile state timestamp even if the DB doesn't exist for the fast-path check }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // Kimi — passive JSONL reader (~/.kimi/sessions/**/wire.jsonl) // No hook installation needed; Kimi writes wire.jsonl automatically. function resolveKimiDefaultModel(env = process.env) { const fallback = "kimi-for-coding"; try { const home = env.HOME || require("node:os").homedir(); const cfgPath = path.join(env.KIMI_HOME || path.join(home, ".kimi"), "config.toml"); const raw = fssync.readFileSync(cfgPath, "utf8"); const defaultMatch = raw.match(/^\s*default_model\s*=\s*"([^"]+)"/m); if (!defaultMatch) return fallback; const sectionKey = defaultMatch[1]; const escaped = sectionKey.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); const sectionRe = new RegExp( `\\[models\\."${escaped}"\\]([\\s\\S]*?)(?:\\n\\[|$)`, ); const section = raw.match(sectionRe); if (section) { const modelMatch = section[1].match(/^\s*model\s*=\s*"([^"]+)"/m); if (modelMatch && modelMatch[1]) return modelMatch[1]; } if (sectionKey.includes("/")) return sectionKey.split("/").pop(); return sectionKey || fallback; } catch { return fallback; } } // ───────────────────────────────────────────────────────────────────────────── // ───────────────────────────────────────────────────────────────────────────── // Kiro CLI — reads historical conversation state from // ~/Library/Application Support/kiro-cli/data.sqlite3 (table conversations_v2). // Kiro CLI does NOT store explicit token counts locally. Each request row // carries: user_prompt_length (chars), response_size (chars), model_id, // request_start_timestamp_ms, message_id. We approximate tokens at 4 chars / // token. Source is merged with Kiro IDE (source='kiro') and canonicalized // model names are used so CLI and IDE rows collapse when they refer to the // same underlying Bedrock model. Cursor state is per-request-id so mutable // requests can be reprocessed (subtract-old/add-new on fingerprint change). // ───────────────────────────────────────────────────────────────────────────── const KIRO_CLI_CHARS_PER_TOKEN = 4; function resolveKiroCliDbPath(env = process.env) { if (env.KIRO_CLI_DB_PATH) return env.KIRO_CLI_DB_PATH; const home = env.HOME || require("node:os").homedir(); return path.join(home, "Library", "Application Support", "kiro-cli", "data.sqlite3"); } // Bug-4: canonical UUID shape — 8-4-4-4-12 hex groups. The looser // /^[0-9a-f-]{36}\.json$/ form accepted `36 hyphens`.json or 36 hex with // no hyphens. kiro-cli writes proper UUIDs; lock to the canonical shape. const KIRO_CLI_SESSION_FILE_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\.json$/i; // Lists ~/.kiro/sessions/cli/{uuid}.json files. Includes files whose sibling // .lock is present — we read those as tail-only snapshots so a running // session's completed turns still land in the queue on the next sync. The // .json files are rewritten atomically by kiro-cli on each turn flush, so // a stale read just means we'll pick up the rest next time. // // TASK-014: env.HOME is honored (symmetric with resolveKiroCliDbPath) so // callers can redirect to a tmp home for hermetic tests/CI. function resolveKiroCliSessionFiles(env = process.env) { const home = env.HOME || require("node:os").homedir(); const kiroHome = env.KIRO_HOME || path.join(home, ".kiro"); const sessionsDir = path.join(kiroHome, "sessions", "cli"); if (!fssync.existsSync(sessionsDir)) return []; const files = []; try { for (const entry of fssync.readdirSync(sessionsDir)) { // TASK-003: only canonical {uuid}.json files; backups, scratch, // typos are skipped so they don't feed JSON.parse garbage. if (!KIRO_CLI_SESSION_FILE_RE.test(entry)) continue; files.push(path.join(sessionsDir, entry)); } } catch { // ignore read errors } return files; } // Build char-count maps from a .jsonl sibling file. Lets us approximate // per-turn tokens when the live session's input_token_count / // output_token_count fields are 0 (kiro-cli does not persist real token // counts; billing is credit-based). // // Returns: // byMessage: message_id -> assistant+toolUse char count // messageKind: message_id -> jsonl event kind // turnPromptChars: turn_index -> input chars attributed to that turn // // Input attribution: Kiro CLI's turn.message_ids only records // AssistantMessage / ToolResults ids, NEVER the user Prompt id. So the // Prompt event is invisible if you look it up by message_id. To recover // the per-turn user input, we walk the jsonl in timestamp order and buffer // Prompt chars until the next AssistantMessage that belongs to a turn // (turnMessageIds provides that mapping). The first such AssistantMessage // "claims" the buffered Prompt chars for its turn, and the buffer resets. // Later cycles within the same turn (Assistant → ToolResults → Assistant) // do not re-attribute. async function readKiroCliMessageChars(jsonlPath, turnMessageIds) { const result = { byMessage: new Map(), messageKind: new Map(), turnPromptChars: new Map(), }; if (!jsonlPath || !fssync.existsSync(jsonlPath)) return result; // TASK-005: stream via readline so multi-MB .jsonl files (heavy tool-use // sessions) don't block the sync event loop by buffering whole-file. let stream; try { stream = fssync.createReadStream(jsonlPath, { encoding: "utf8" }); } catch { return result; } const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); const midToTurn = turnMessageIds instanceof Map ? turnMessageIds : new Map(); const attributedTurns = new Set(); let pendingPromptChars = 0; // Bug-5: wrap the streamed iteration. Mid-read errors (file deleted or // truncated while kiro-cli is writing) would otherwise propagate up and // crash the whole sync pass. On error we return the partial result and // let the next sync re-read fresh. try { for await (const line of rl) { if (!line || !line.trim()) continue; let evt; try { evt = JSON.parse(line); } catch { continue; } const data = evt && evt.data; if (!data || typeof data !== "object") continue; const mid = data.message_id; if (!mid) continue; const content = Array.isArray(data.content) ? data.content : []; let chars = 0; for (const c of content) { if (!c || typeof c !== "object") continue; if (c.kind === "text" && typeof c.data === "string") { chars += c.data.length; } else if (c.kind === "toolUse" && c.data && typeof c.data === "object") { try { chars += JSON.stringify(c.data.input || {}).length; } catch { // ignore } } } result.byMessage.set(mid, (result.byMessage.get(mid) || 0) + chars); if (!result.messageKind.has(mid)) result.messageKind.set(mid, evt.kind); if (evt.kind === "Prompt") { pendingPromptChars += chars; } else if (evt.kind === "AssistantMessage" && midToTurn.has(mid)) { const turnIdx = midToTurn.get(mid); if (!attributedTurns.has(turnIdx)) { result.turnPromptChars.set(turnIdx, pendingPromptChars); attributedTurns.add(turnIdx); pendingPromptChars = 0; } } } } catch { // partial data — return what we have. } return result; } // Extract flat per-turn records from a live session .json + its .jsonl // sibling. Returns [{ request_id, model_id, request_start_timestamp_ms, // input_tokens, output_tokens }]. We use the same request_id dedup slot as // the SQLite path so mutations (turn rewritten on next flush) go through // the subtract-old/add-new path in parseKiroCliIncremental. async function readKiroCliSessionTurns(jsonPath) { if (!jsonPath || !fssync.existsSync(jsonPath)) return []; let parsed; try { parsed = JSON.parse(fssync.readFileSync(jsonPath, "utf8")); } catch { return []; } if (!parsed || typeof parsed !== "object") return []; const turns = Array.isArray( parsed?.session_state?.conversation_metadata?.user_turn_metadatas, ) ? parsed.session_state.conversation_metadata.user_turn_metadatas : []; if (turns.length === 0) return []; const modelInfo = parsed?.session_state?.rts_model_state?.model_info || null; const sessionModelId = (modelInfo && (modelInfo.model_id || modelInfo.model_name)) || null; const sessionId = typeof parsed.session_id === "string" ? parsed.session_id : path.basename(jsonPath, ".json"); // Build turn_index -> Set(message_id) so the jsonl walker can attribute // orphaned Prompt events (not referenced by turn.message_ids) to the // right turn. The turn.message_ids list only contains AssistantMessage // and ToolResults ids; Prompt ids appear in the jsonl stream only. const turnMessageIds = new Map(); for (let i = 0; i < turns.length; i++) { const t = turns[i]; if (!t || !Array.isArray(t.message_ids)) continue; for (const mid of t.message_ids) { if (typeof mid === "string" && mid) turnMessageIds.set(mid, i); } } // Load sibling .jsonl for char-count fallback. const jsonlPath = jsonPath.replace(/\.json$/, ".jsonl"); const charMap = await readKiroCliMessageChars(jsonlPath, turnMessageIds); const flat = []; for (let turnIdx = 0; turnIdx < turns.length; turnIdx++) { const turn = turns[turnIdx]; if (!turn || typeof turn !== "object") continue; // TASK-001: preserve the integer 0. `|| null` would coerce a valid // loop_id.rand=0 into a message_id fallback, splitting the dedup // namespace across runs that see 0 vs runs that don't. const loopRand = turn.loop_id && typeof turn.loop_id === "object" ? turn.loop_id.rand ?? turn.loop_id.seed ?? null : null; const messageIds = Array.isArray(turn.message_ids) ? turn.message_ids : []; const requestId = loopRand != null ? `${sessionId}:${loopRand}` : (messageIds[0] || null); if (!requestId) continue; // Prefer real token counts if kiro-cli populated them. let inputTokens = toNonNegativeInt(turn.input_token_count); let outputTokens = toNonNegativeInt(turn.output_token_count); if (inputTokens === 0 && outputTokens === 0) { // Fall back to char-count approximation. Input chars come from the // sequential Prompt attribution (see readKiroCliMessageChars); // output chars come from AssistantMessage+toolUse bodies referenced // by turn.message_ids. const promptChars = charMap.turnPromptChars.get(turnIdx) || 0; let assistantChars = 0; for (const mid of messageIds) { const chars = charMap.byMessage.get(mid) || 0; const kind = charMap.messageKind.get(mid); if (kind === "AssistantMessage") assistantChars += chars; } inputTokens = Math.floor(promptChars / KIRO_CLI_CHARS_PER_TOKEN); outputTokens = Math.floor(assistantChars / KIRO_CLI_CHARS_PER_TOKEN); } // TASK-006: timestamp precedence matches SQLite's // request_start_timestamp_ms so a turn that migrates SQLite ↔ // session-file buckets identically across tiers (previously a turn // straddling a half-hour boundary bucketed differently per source // because session files use end_timestamp while SQLite uses start). // 1. turn.request_start_timestamp_ms (numeric ms, SQLite shape) // 2. turn.start_timestamp (ISO string) // 3. turn.end_timestamp (ISO string, legacy fallback) let tsMs = NaN; if (Number.isFinite(Number(turn.request_start_timestamp_ms))) { tsMs = Number(turn.request_start_timestamp_ms); } else if (turn.start_timestamp) { tsMs = Date.parse(turn.start_timestamp); } else if (turn.end_timestamp) { tsMs = Date.parse(turn.end_timestamp); } if (!Number.isFinite(tsMs) || tsMs <= 0) continue; flat.push({ request_id: requestId, session_model_id: sessionModelId, message_id: messageIds[0] || null, // Turn-granular migration match: surface the full list so the // cross-source retraction in parseKiroCliIncremental can drop this // specific turn iff any of its assistant/tool_result message_ids // appears in SQLite. Session-level matching over-retracts newer // turns in an active session whose older turns have already // flushed to SQLite. all_message_ids: messageIds.slice(), model_id: turn.model_id || sessionModelId, request_start_timestamp_ms: tsMs, // D-1 / Bug-2: tag with session_id so the retraction pass can match // session-origin entries even when the requestId format has no // colon (no-loop_id fallback uses a bare message_id UUID that would // otherwise be indistinguishable from SQLite's UUID keys). session_id: sessionId, // For the parser, we feed the ALREADY-approximated tokens directly via // a special sentinel field. The parser will divide chars by // KIRO_CLI_CHARS_PER_TOKEN; bypass that by pre-multiplying here. user_prompt_length: inputTokens * KIRO_CLI_CHARS_PER_TOKEN, response_size: outputTokens * KIRO_CLI_CHARS_PER_TOKEN, }); } return flat; } // Canonicalize a Kiro-CLI-emitted model id so IDE and CLI rows collapse when // they refer to the same underlying Bedrock model. Examples: // anthropic.claude-sonnet-4-20250514-v1:0 -> claude-sonnet-4 // claude-opus-4.6 -> claude-opus-4.6 // claude-sonnet-4.5 -> claude-sonnet-4.5 // auto -> null (caller uses 'kiro-cli-agent') // -> null (caller falls back to 'kiro-cli-agent') // // "auto" is treated as unknown because Kiro CLI's auto-routing does not // expose the underlying Bedrock model id in the session file. Returning // null lets pricing fall into the kiro-cli-agent bucket (sonnet-4 rates) // rather than the literal "auto" string which matches Cursor's composer-1 // pricing by accident. function canonicalizeKiroCliModelId(raw) { if (!raw || typeof raw !== "string") return null; let name = raw.trim(); if (!name) return null; name = name.toLowerCase(); if (name === "auto") return null; // Strip provider prefix (anthropic., aws., openai., or a full Bedrock ARN). name = name.replace( /^(?:arn:aws:bedrock:[^:]*:[^:]*:(?:foundation-model\/)?|anthropic\.|openai\.|aws\.)/, "", ); // Strip Bedrock revision suffix `:N`. name = name.replace(/:\d+$/, ""); // Strip date + vN suffix (e.g. "-20250514-v1"), or lone "-vN", or lone date. name = name.replace(/-\d{8}-v\d+$/i, ""); name = name.replace(/-v\d+$/i, ""); name = name.replace(/-\d{8}$/, ""); // Strip trailing ".v1" or similar Anthropic-on-Bedrock tails if present. name = name.replace(/\.v\d+$/i, ""); return name || null; } // Read Kiro CLI requests using SQL-side json_extract so we don't pull the // full (93 MB-ish) conversations_v2 blob back through sqlite3 -json. // // D-1: also surfaces `user_turn_metadata.continuation_id` so the cross- // source retraction pass (parseKiroCliIncremental) can match whichever // UUID kiro-cli used as the session link. The SQL column // `conversation_id` and the inner JSON `continuation_id` are different // UUIDs on observed data; covering both means retraction fires whichever // side matches the live session's `session_id`. function readKiroCliRequests(dbPath, env = process.env, sqliteOptions = {}) { if (!dbPath || !fssync.existsSync(dbPath)) return []; const sql = "SELECT conversation_id, " + "json_extract(value, '$.model_info.model_id') AS session_model_id, " + "json_extract(value, '$.user_turn_metadata.continuation_id') AS continuation_id, " + "json_extract(value, '$.user_turn_metadata.requests') AS requests_json " + "FROM conversations_v2 " + "WHERE json_extract(value, '$.user_turn_metadata.requests') IS NOT NULL"; const rows = readSqliteJsonRows(dbPath, sql, { label: "Kiro CLI", env, maxBuffer: 128 * 1024 * 1024, timeout: 120_000, ...sqliteOptions, }); const flat = []; for (const row of rows) { let requests; try { requests = JSON.parse(row.requests_json || "[]"); } catch { continue; } if (!Array.isArray(requests)) continue; for (const r of requests) { if (!r || typeof r !== "object") continue; flat.push({ conversation_id: row.conversation_id, continuation_id: row.continuation_id || null, session_model_id: row.session_model_id || null, request_id: r.request_id || null, message_id: r.message_id || null, user_prompt_length: r.user_prompt_length, response_size: r.response_size, model_id: r.model_id || null, request_start_timestamp_ms: r.request_start_timestamp_ms, }); } } return flat; } async function parseKiroCliIncremental({ sessionFiles, cursors, queuePath, onProgress, env, sqliteOptions } = {}) { await ensureDir(path.dirname(queuePath)); const kiroCliState = cursors.kiroCli && typeof cursors.kiroCli === "object" ? cursors.kiroCli : {}; const seenIds = new Set(Array.isArray(kiroCliState.seenIds) ? kiroCliState.seenIds : []); // Back-compat branch: if caller explicitly passes sessionFiles (an array of // per-session .json paths, the old contract used in tests/fixtures), read // them as user_turn_metadatas. New default path below reads the SQLite DB. if (Array.isArray(sessionFiles)) { return parseKiroCliFromSessionFiles({ sessionFiles, cursors, queuePath, onProgress, env, kiroCliState, seenIds, }); } const resolvedEnv = env || process.env; const dbPath = resolveKiroCliDbPath(resolvedEnv); // Combine two sources under the same (source='kiro', cursors.kiroCli) // namespace: historical rows from the SQLite DB plus live session state // from ~/.kiro/sessions/cli/{uuid}.json (covers turns from a running // session that hasn't flushed to SQLite yet). Request ID shapes differ: // SQLite carries a persisted request_id UUID; session files synthesize // `${sessionId}:${loop_id.rand}`. When kiro-cli migrates a live session // into SQLite the same turn lands under a new request_id — the cross- // source retraction pass below (D-1 + TASK-007) matches session_id ↔ // SQLite conversation_id OR continuation_id to subtract the orphan // session-file cursor entry before the new SQLite row is processed. const flatDb = fssync.existsSync(dbPath) ? readKiroCliRequests(dbPath, resolvedEnv, sqliteOptions) : []; const sessionFilesList = resolveKiroCliSessionFiles(resolvedEnv); let flatSessions = []; for (const jsonPath of sessionFilesList) { const turns = await readKiroCliSessionTurns(jsonPath); for (const turn of turns) flatSessions.push(turn); } // Per-request state replaces the old seenIds set. Each entry captures // what we contributed for that request_id last time, so a later mutation // (same request_id, different fingerprint) can subtract-old/add-new // instead of being skipped forever. const requestState = kiroCliState.requests && typeof kiroCliState.requests === "object" ? { ...kiroCliState.requests } : {}; const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const debugEnabled = ["1", "true"].includes( String(resolvedEnv.TOKENTRACKER_DEBUG || "").toLowerCase(), ); // ── TASK-007 + D-1: cross-source retraction. When a conversation has // migrated from the session-file tier into SQLite, the cursor's // prior session-file entry (keyed `${sessionId}:${loopRand}` OR a // bare message_id UUID when loop_id is absent) never matches the // new SQLite request_id. Without retraction the old contribution // stays in the bucket absolute and the new SQLite row is added on // top — permanent double-count. D-6: typed non-empty check so a // corrupt NULL/empty conv_id can't poison the match set. // // Two match sets are built: // • migratedConvIds — session_id → any row in SQLite. Used to scope // cursor retraction (coarse but safe because // un-migrated turns still present in the session // file are re-added later in this same run). // • migratedMsgIds — r.message_id → exact turn in SQLite. Used to // filter flatSessions at TURN granularity. An // active session with older migrated turns + // newer session-file-only turns must keep the // newer turns; session-level filtering dropped // them and caused Kiro CLI under-count. const migratedConvIds = new Set(); const migratedMsgIds = new Set(); for (const row of flatDb) { if (!row) continue; if (typeof row.conversation_id === "string" && row.conversation_id) migratedConvIds.add(row.conversation_id); if (typeof row.continuation_id === "string" && row.continuation_id) migratedConvIds.add(row.continuation_id); if (typeof row.message_id === "string" && row.message_id) migratedMsgIds.add(row.message_id); } if (migratedConvIds.size > 0) { // Pre-collect to retract so mutation during iteration is safe. // Retraction stays session-level: for every cursor entry whose // session_id has any row in SQLite, subtract its prior contribution. // This is provably correct because turns still live in the session // file get re-added in this same run via the (turn-granular) filter // below, producing a net delta of zero for un-migrated turns. const toRetract = []; for (const [reqId, prev] of Object.entries(requestState)) { if (!prev || typeof prev !== "object") continue; // Bug-2: prefer the stored session_id tag (new schema); fall back // to colon-split for legacy cursors pre-dating this change. let sid = null; if (typeof prev.session_id === "string" && prev.session_id) { sid = prev.session_id; } else { const colon = reqId.indexOf(":"); if (colon > 0) sid = reqId.slice(0, colon); } if (!sid || !migratedConvIds.has(sid)) continue; toRetract.push([reqId, prev, sid]); } for (const [reqId, prev, sid] of toRetract) { if (prev.input_tokens || prev.output_tokens) { const prevBucket = getHourlyBucket( hourlyState, "kiro", prev.model, prev.bucketStart, ); addTotals(prevBucket.totals, { input_tokens: -prev.input_tokens, cached_input_tokens: 0, cache_creation_input_tokens: 0, output_tokens: -prev.output_tokens, reasoning_output_tokens: 0, total_tokens: -(prev.input_tokens + prev.output_tokens), conversation_count: -1, }); touchedBuckets.add(bucketKey("kiro", prev.model, prev.bucketStart)); } delete requestState[reqId]; if (debugEnabled) { process.stderr.write( `[kiro-cli] retracted migrated session entry (conv ${sid})\n`, ); } } // Turn-granular filter: drop a session-file turn only when at least // one of its assistant/tool_result message_ids is present in SQLite // (i.e. this specific turn has been flushed). Newer turns in the // same session that haven't yet landed in SQLite survive. // // Edge: a turn with no message_ids at all cannot be matched. We keep // it — preferring a rare potential double-count (narrow, since such // a turn would also have no request_id under the no-loop_id path and // be discarded upstream) over the reported regression of dropping // legitimate newer turns wholesale. D-14: still O(N) single-pass. const before = flatSessions.length; flatSessions = flatSessions.filter((s) => { if (!s) return false; const mids = Array.isArray(s.all_message_ids) ? s.all_message_ids : s.message_id ? [s.message_id] : []; for (const mid of mids) { if (typeof mid === "string" && mid && migratedMsgIds.has(mid)) { return false; } } return true; }); if (debugEnabled && flatSessions.length !== before) { process.stderr.write( `[kiro-cli] dropped ${before - flatSessions.length} migrated session-file turn(s)\n`, ); } } const flat = flatDb.concat(flatSessions); if (flat.length === 0) { // Bug-1: retraction may have touched buckets even with empty flat. // Clamp + cap BEFORE flushing so the early-return path applies the // same guarantees as the main path (fixes a skip that flushed // negative conversation_counts and left the cap unapplied). const cappedEarly = clampAndCapKiroCliState({ requestState, hourlyState, touchedBuckets, }); const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets, }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.kiroCli = { ...kiroCliState, requests: cappedEarly, updatedAt }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued }; } const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let i = 0; i < flat.length; i++) { const r = flat[i]; recordsProcessed++; const requestId = r.request_id || r.message_id; if (!requestId) continue; const promptChars = toNonNegativeInt(r.user_prompt_length); const responseChars = toNonNegativeInt(r.response_size); const approxInput = Math.floor(promptChars / KIRO_CLI_CHARS_PER_TOKEN); const approxOutput = Math.floor(responseChars / KIRO_CLI_CHARS_PER_TOKEN); const tsMs = Number(r.request_start_timestamp_ms); if (!Number.isFinite(tsMs) || tsMs <= 0) continue; const bucketStart = toUtcHalfHourStart(new Date(tsMs).toISOString()); if (!bucketStart) continue; const rawModel = r.model_id || r.session_model_id; const canonical = canonicalizeKiroCliModelId(rawModel); const model = canonical || "kiro-cli-agent"; // Fingerprint captures every field whose change should cause a re-bucket. const fingerprint = `${promptChars}:${responseChars}:${model}:${tsMs}`; const prev = requestState[requestId]; if (prev && prev.fingerprint === fingerprint) continue; // unchanged // Subtract the prior contribution (if any) from its prior bucket so the // bucket's absolute totals reflect the CURRENT truth, not the historical // truth. enqueueTouchedBuckets will emit the net delta at flush time. if (prev && (prev.input_tokens || prev.output_tokens)) { const prevBucket = getHourlyBucket(hourlyState, "kiro", prev.model, prev.bucketStart); addTotals(prevBucket.totals, { input_tokens: -prev.input_tokens, cached_input_tokens: 0, cache_creation_input_tokens: 0, output_tokens: -prev.output_tokens, reasoning_output_tokens: 0, total_tokens: -(prev.input_tokens + prev.output_tokens), conversation_count: -1, }); touchedBuckets.add(bucketKey("kiro", prev.model, prev.bucketStart)); } // Add the new contribution. if (approxInput > 0 || approxOutput > 0) { const bucket = getHourlyBucket(hourlyState, "kiro", model, bucketStart); addTotals(bucket.totals, { input_tokens: approxInput, cached_input_tokens: 0, cache_creation_input_tokens: 0, output_tokens: approxOutput, reasoning_output_tokens: 0, total_tokens: approxInput + approxOutput, conversation_count: 1, }); touchedBuckets.add(bucketKey("kiro", model, bucketStart)); eventsAggregated++; } // Always record the cursor entry (even for zero-token requests) so we // don't re-count later if Kiro rewrites this request with real data. // Bug-2: tag session-origin entries with session_id so the retraction // pass can identify them regardless of request_id format (the // no-loop_id fallback produces a bare UUID with no colon, which would // otherwise be indistinguishable from SQLite's UUID keys). requestState[requestId] = { fingerprint, bucketStart, model, input_tokens: approxInput, output_tokens: approxOutput, ...(r.session_id ? { session_id: r.session_id } : {}), }; if (cb && i % 50 === 0) { cb({ index: i + 1, total: flat.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const cappedState = clampAndCapKiroCliState({ requestState, hourlyState, touchedBuckets, }); const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.kiroCli = { ...kiroCliState, requests: cappedState, updatedAt }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // TASK-004 + TASK-010 + Bug-1: shared end-of-run clamp + cap for // parseKiroCliIncremental. Centralized so the main path AND the // retraction-only early-return path both apply the same guarantees. // Mutates hourlyState bucket totals in place (clamp) and returns a new // capped requestState object (cap). const KIRO_CLI_CURSOR_MAX_AGE_MS = 90 * 24 * 60 * 60 * 1000; const KIRO_CLI_CURSOR_MAX_ENTRIES = 20_000; function clampAndCapKiroCliState({ requestState, hourlyState, touchedBuckets }) { // TASK-010: clamp conversation_count to >= 0 on Kiro-touched buckets // only. The shared enqueueTouchedBuckets is left untouched so // legitimate negatives from the 10 other parsers are not masked. Kiro // negatives come from the subtract-old pass on mutation or retraction. for (const key of touchedBuckets) { const bucket = hourlyState.buckets && hourlyState.buckets[key]; if (bucket && bucket.totals && bucket.totals.conversation_count < 0) { bucket.totals.conversation_count = 0; } } // TASK-004: cap cursors.kiroCli.requests by age + count. Runs LAST so // nothing active or just-retracted is pruned mid-flight. const ageCutoffMs = Date.now() - KIRO_CLI_CURSOR_MAX_AGE_MS; const cappedEntries = []; for (const [reqId, entry] of Object.entries(requestState)) { if (!entry || typeof entry !== "object") continue; const ts = entry.bucketStart ? Date.parse(entry.bucketStart) : NaN; if (!Number.isFinite(ts) || ts < ageCutoffMs) continue; cappedEntries.push([reqId, entry, ts]); } if (cappedEntries.length > KIRO_CLI_CURSOR_MAX_ENTRIES) { cappedEntries.sort((a, b) => b[2] - a[2]); // newest first cappedEntries.length = KIRO_CLI_CURSOR_MAX_ENTRIES; } const capped = {}; for (const [reqId, entry] of cappedEntries) capped[reqId] = entry; return capped; } // Back-compat path: per-session .json files (the old fixture shape). Emits // exact tokens if the fixture happens to carry them (which the test fixture // does). Used only by the test/rollout-parser.test.js fixture tests. async function parseKiroCliFromSessionFiles({ sessionFiles, cursors, queuePath, onProgress, env, kiroCliState, seenIds, }) { const fileOffsets = kiroCliState.fileOffsets && typeof kiroCliState.fileOffsets === "object" ? { ...kiroCliState.fileOffsets } : {}; if (sessionFiles.length === 0) { cursors.kiroCli = { ...kiroCliState, seenIds: Array.from(seenIds), fileOffsets, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < sessionFiles.length; fileIdx++) { const filePath = sessionFiles[fileIdx]; let stat; try { stat = fssync.statSync(filePath); } catch { continue; } const prevEntry = fileOffsets[filePath] || {}; const prevMtime = Number(prevEntry.mtimeMs) || 0; const prevLastIndex = Number.isFinite(Number(prevEntry.lastIndex)) ? Number(prevEntry.lastIndex) : -1; if (prevMtime && stat.mtimeMs <= prevMtime) continue; let parsed; try { parsed = JSON.parse(fssync.readFileSync(filePath, "utf8")); } catch { continue; } if (!parsed || typeof parsed !== "object") continue; const turns = Array.isArray( parsed?.session_state?.conversation_metadata?.user_turn_metadatas, ) ? parsed.session_state.conversation_metadata.user_turn_metadatas : []; const sessionId = typeof parsed.session_id === "string" ? parsed.session_id : filePath; const sessionModelId = (parsed?.session_state?.rts_model_state?.model_info && (parsed.session_state.rts_model_state.model_info.model_id || parsed.session_state.rts_model_state.model_info.modelId)) || null; let maxIndex = prevLastIndex; for (let i = 0; i < turns.length; i++) { if (i <= prevLastIndex) continue; const turn = turns[i]; if (!turn || typeof turn !== "object") continue; recordsProcessed++; const input = toNonNegativeInt(turn.input_tokens); const output = toNonNegativeInt(turn.output_tokens); const cacheRead = toNonNegativeInt( turn.cache_read_input_tokens ?? turn.cached_input_tokens, ); const cacheCreation = toNonNegativeInt( turn.cache_creation_input_tokens ?? turn.cache_write_input_tokens, ); const reasoning = toNonNegativeInt(turn.reasoning_output_tokens); if (input === 0 && output === 0 && cacheRead === 0 && cacheCreation === 0) { maxIndex = i; continue; } const ts = turn.timestamp || turn.created_at || turn.updated_at; if (!ts) continue; const bucketStart = toUtcHalfHourStart(ts); if (!bucketStart) continue; const turnMessageId = typeof turn.message_id === "string" && turn.message_id ? turn.message_id : null; const dedupKey = turnMessageId ? `${sessionId}:${turnMessageId}` : null; if (dedupKey && seenIds.has(dedupKey)) { maxIndex = i; continue; } const rawModel = turn.model_id || turn.modelId || (turn.model_info && (turn.model_info.model_id || turn.model_info.modelId)) || sessionModelId; const normalized = rawModel ? normalizeKiroModelName(rawModel) : null; const model = normalized || "kiro-cli-agent"; const delta = { input_tokens: input, cached_input_tokens: cacheRead, cache_creation_input_tokens: cacheCreation, output_tokens: output, reasoning_output_tokens: reasoning, total_tokens: input + output + cacheRead + cacheCreation + reasoning, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "kiro", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("kiro", model, bucketStart)); if (dedupKey) seenIds.add(dedupKey); maxIndex = i; eventsAggregated++; if (cb) { cb({ index: fileIdx + 1, total: sessionFiles.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } fileOffsets[filePath] = { mtimeMs: stat.mtimeMs, size: stat.size, lastIndex: maxIndex, }; } const seenArr = Array.from(seenIds); const cappedSeen = seenArr.length > 10_000 ? seenArr.slice(seenArr.length - 10_000) : seenArr; const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.kiroCli = { ...kiroCliState, seenIds: cappedSeen, fileOffsets, updatedAt }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── function resolveKimiWireFiles(env = process.env) { const home = require("node:os").homedir(); const kimiHome = env.KIMI_HOME || path.join(home, ".kimi"); const sessionsDir = path.join(kimiHome, "sessions"); if (!fssync.existsSync(sessionsDir)) return []; const files = []; try { for (const ws of fssync.readdirSync(sessionsDir)) { const wsDir = path.join(sessionsDir, ws); let wsStat; try { wsStat = fssync.statSync(wsDir); } catch { continue; } if (!wsStat.isDirectory()) continue; for (const sess of fssync.readdirSync(wsDir)) { const wireFile = path.join(wsDir, sess, "wire.jsonl"); if (fssync.existsSync(wireFile)) files.push(wireFile); } } } catch { /* ignore */ } return files; } async function parseKimiIncremental({ wireFiles, cursors, queuePath, onProgress, env, model } = {}) { await ensureDir(path.dirname(queuePath)); const kimiState = cursors.kimi && typeof cursors.kimi === "object" ? cursors.kimi : {}; const seenIds = new Set(Array.isArray(kimiState.seenIds) ? kimiState.seenIds : []); const fileOffsets = kimiState.fileOffsets && typeof kimiState.fileOffsets === "object" ? { ...kimiState.fileOffsets } : {}; const files = Array.isArray(wireFiles) ? wireFiles : resolveKimiWireFiles(env || process.env); const kimiModel = model || resolveKimiDefaultModel(env || process.env); if (files.length === 0) { cursors.kimi = { ...kimiState, seenIds: Array.from(seenIds), fileOffsets, updatedAt: new Date().toISOString() }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < files.length; fileIdx++) { const filePath = files[fileIdx]; let stat; try { stat = fssync.statSync(filePath); } catch { continue; } const prevEntry = fileOffsets[filePath] || {}; const prevSize = Number(prevEntry.size) || 0; const prevIno = prevEntry.ino; const inodeChanged = typeof prevIno === "number" && prevIno !== stat.ino; const startOffset = stat.size < prevSize || inodeChanged ? 0 : prevSize; if (stat.size <= startOffset) continue; let stream; try { stream = fssync.createReadStream(filePath, { encoding: "utf8", start: startOffset }); } catch { continue; } const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); for await (const line of rl) { if (!line || !line.trim()) continue; let entry; try { entry = JSON.parse(line); } catch { continue; } const msg = entry.message; if (!msg || msg.type !== "StatusUpdate") continue; const payload = msg.payload; if (!payload) continue; const { token_usage, message_id } = payload; if (!token_usage || !message_id) continue; if (seenIds.has(message_id)) continue; recordsProcessed++; const input = toNonNegativeInt(token_usage.input_other); const output = toNonNegativeInt(token_usage.output); const cacheRead = toNonNegativeInt(token_usage.input_cache_read); const cacheCreation = toNonNegativeInt(token_usage.input_cache_creation); if (input === 0 && output === 0 && cacheRead === 0 && cacheCreation === 0) { seenIds.add(message_id); continue; } const epochSec = entry.timestamp ?? payload.timestamp; if (epochSec == null || !Number.isFinite(Number(epochSec))) continue; const tsIso = new Date(Number(epochSec) * 1000).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const delta = { input_tokens: input, cached_input_tokens: cacheRead, cache_creation_input_tokens: cacheCreation, output_tokens: output, reasoning_output_tokens: 0, total_tokens: input + output + cacheRead + cacheCreation, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "kimi", kimiModel, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("kimi", kimiModel, bucketStart)); seenIds.add(message_id); eventsAggregated++; if (cb) { cb({ index: fileIdx + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } let postStat = stat; try { postStat = fssync.statSync(filePath); } catch {} fileOffsets[filePath] = { size: postStat.size, mtimeMs: postStat.mtimeMs, ino: postStat.ino }; } // Cap seenIds to last 10k to bound cursor state size const seenArr = Array.from(seenIds); const cappedSeen = seenArr.length > 10_000 ? seenArr.slice(seenArr.length - 10_000) : seenArr; const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.kimi = { ...kimiState, seenIds: cappedSeen, fileOffsets, updatedAt }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // Kimi Code (official @moonshot-ai/kimi-code) — passive JSONL reader. // // Distinct from the legacy community `kimi-cli` above (Python, ~/.kimi). The // official single-binary product stores under ~/.kimi-code/ with a different // session layout and wire protocol: // // ~/.kimi-code/sessions///agents//wire.jsonl // // proto 1.x events are namespaced and carry `type` at the top level. Per-step // token usage rides on a `step.end` loop event (wrapped in // `context.append_loop_event`) with an Anthropic-style usage object: // // {"type":"context.append_loop_event", // "event":{"type":"step.end","uuid":"","turnId":"..","step":N, // "usage":{"input_tokens":N,"output_tokens":N, // "cache_read_input_tokens":N,"cache_creation_input_tokens":N}}, // "time":} // // Model comes from the per-session `config.update` event's `modelAlias` // (e.g. "kimi-code/kimi-k2.6" -> "kimi-k2.6"). Emitted under source "kimi" so // new + legacy sessions aggregate together. Independent cursor (cursors.kimiCode) // keeps state from colliding with the legacy reader's cursors.kimi. function resolveKimiCodeHome(env = process.env) { const home = require("node:os").homedir(); const explicit = typeof env?.KIMI_CODE_HOME === "string" ? env.KIMI_CODE_HOME.trim() : ""; return explicit ? path.resolve(explicit) : path.join(home, ".kimi-code"); } function resolveKimiCodeWireFiles(env = process.env) { const sessionsDir = path.join(resolveKimiCodeHome(env), "sessions"); if (!fssync.existsSync(sessionsDir)) return []; const files = []; const walk = (dir, depth) => { if (depth > 5) return; let entries; try { entries = fssync.readdirSync(dir, { withFileTypes: true }); } catch { return; } for (const ent of entries) { const full = path.join(dir, ent.name); if (ent.isDirectory()) walk(full, depth + 1); else if (ent.name === "wire.jsonl") files.push(full); } }; walk(sessionsDir, 0); return files; } function resolveKimiCodeDefaultModel(env = process.env) { const fallback = "kimi-for-coding"; try { const cfgPath = path.join(resolveKimiCodeHome(env), "config.toml"); const raw = fssync.readFileSync(cfgPath, "utf8"); const m = raw.match(/^\s*default_model\s*=\s*"([^"]+)"/m); if (!m) return fallback; return m[1].includes("/") ? m[1].split("/").pop() : m[1] || fallback; } catch { return fallback; } } function kimiCodeModelAlias(value) { if (typeof value !== "string" || !value) return null; return value.includes("/") ? value.split("/").pop() : value; } async function parseKimiCodeIncremental({ wireFiles, cursors, queuePath, onProgress, env, model } = {}) { await ensureDir(path.dirname(queuePath)); const state = cursors.kimiCode && typeof cursors.kimiCode === "object" ? cursors.kimiCode : {}; const seenIds = new Set(Array.isArray(state.seenIds) ? state.seenIds : []); const fileOffsets = state.fileOffsets && typeof state.fileOffsets === "object" ? { ...state.fileOffsets } : {}; const files = Array.isArray(wireFiles) ? wireFiles : resolveKimiCodeWireFiles(env || process.env); const fallbackModel = model || resolveKimiCodeDefaultModel(env || process.env); if (files.length === 0) { cursors.kimiCode = { ...state, seenIds: Array.from(seenIds), fileOffsets, updatedAt: new Date().toISOString() }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < files.length; fileIdx++) { const filePath = files[fileIdx]; let stat; try { stat = fssync.statSync(filePath); } catch { continue; } const prevEntry = fileOffsets[filePath] || {}; const prevSize = Number(prevEntry.size) || 0; const prevIno = prevEntry.ino; const inodeChanged = typeof prevIno === "number" && prevIno !== stat.ino; const startOffset = stat.size < prevSize || inodeChanged ? 0 : prevSize; // Model is declared in a `config.update` near the file head; persist it on // the cursor so incremental resumes (which start past that line) keep it. let fileModel = (typeof prevEntry.model === "string" && prevEntry.model) || fallbackModel; if (stat.size <= startOffset) continue; let stream; try { stream = fssync.createReadStream(filePath, { encoding: "utf8", start: startOffset }); } catch { continue; } const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); for await (const line of rl) { if (!line || !line.trim()) continue; let entry; try { entry = JSON.parse(line); } catch { continue; } if (entry.type === "config.update") { const alias = kimiCodeModelAlias(entry.modelAlias); if (alias) fileModel = alias; continue; } const evt = entry.type === "context.append_loop_event" && entry.event && typeof entry.event === "object" ? entry.event : entry; if (!evt || evt.type !== "step.end") continue; const usage = evt.usage; if (!usage || typeof usage !== "object") continue; const id = evt.uuid; if (!id || seenIds.has(id)) continue; recordsProcessed++; // Anthropic-style usage: input_tokens already excludes cache. OpenAI-compat // models instead fold cached reads into input_tokens and expose them via // input_tokens_details.cached_tokens — mirror kimi-code's own extractUsage // so we never double-count cache as fresh input. const cacheCreation = toNonNegativeInt(usage.cache_creation_input_tokens); let cacheRead; let input; if (usage.cache_read_input_tokens != null) { cacheRead = toNonNegativeInt(usage.cache_read_input_tokens); input = toNonNegativeInt(usage.input_tokens); } else { const details = usage.input_tokens_details && typeof usage.input_tokens_details === "object" ? usage.input_tokens_details : null; const cached = toNonNegativeInt(details ? details.cached_tokens : 0); cacheRead = cached; input = Math.max(0, toNonNegativeInt(usage.input_tokens) - cached); } const output = toNonNegativeInt(usage.output_tokens); if (input === 0 && output === 0 && cacheRead === 0 && cacheCreation === 0) { seenIds.add(id); continue; } const ms = entry.time ?? evt.time; if (ms == null || !Number.isFinite(Number(ms))) continue; const tsIso = new Date(Number(ms)).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const delta = { input_tokens: input, cached_input_tokens: cacheRead, cache_creation_input_tokens: cacheCreation, output_tokens: output, reasoning_output_tokens: 0, total_tokens: input + output + cacheRead + cacheCreation, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "kimi", fileModel, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("kimi", fileModel, bucketStart)); seenIds.add(id); eventsAggregated++; if (cb) { cb({ index: fileIdx + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } let postStat = stat; try { postStat = fssync.statSync(filePath); } catch {} fileOffsets[filePath] = { size: postStat.size, mtimeMs: postStat.mtimeMs, ino: postStat.ino, model: fileModel }; } const seenArr = Array.from(seenIds); const cappedSeen = seenArr.length > 10_000 ? seenArr.slice(seenArr.length - 10_000) : seenArr; const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.kimiCode = { ...state, seenIds: cappedSeen, fileOffsets, updatedAt }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // CodeBuddy CLI — passive JSONL reader (~/.codebuddy/projects//.jsonl) // // Tencent's CodeBuddy CLI is structurally cloned from Claude Code: // ~/.codebuddy/projects//.jsonl — conversation log // ~/.codebuddy/sessions/.json — session metadata // ~/.codebuddy/settings.json — `{"model": "..."}` // // CodeBuddy ships NO hook system — we incrementally tail the JSONL files on // each sync (passive scan only, same shape as Kimi's wire.jsonl reader). // // Per-line record types: message, reasoning, topic, file-history-snapshot. // Only `type=="message" && role=="assistant"` carry token usage. The shape: // // providerData.rawUsage = { // prompt_tokens: 22223, // OpenAI-style — INCLUDES cached // completion_tokens: 250, // prompt_tokens_details: { cached_tokens: 512, reasoning_tokens?: number }, // cache_read_input_tokens: 0, // Anthropic-style mirror (often 0) // cache_creation_input_tokens: 0, // } // // Token math (matches the repo's queue convention; do NOT pass prompt_tokens // through unchanged — that double-counts cached input): // input_tokens = prompt_tokens - prompt_tokens_details.cached_tokens // cached_input_tokens = prompt_tokens_details.cached_tokens // cache_creation_input_tokens = cache_creation_input_tokens (often 0) // output_tokens = completion_tokens // reasoning_output_tokens = prompt_tokens_details.reasoning_tokens || 0 // total_tokens = sum of the above // ───────────────────────────────────────────────────────────────────────────── function resolveCodebuddyHome(env = process.env) { const home = env.HOME || require("node:os").homedir(); return env.CODEBUDDY_HOME || path.join(home, ".codebuddy"); } function resolveCodebuddyDefaultModel(env = process.env) { const fallback = "codebuddy-unknown"; try { const settingsPath = path.join(resolveCodebuddyHome(env), "settings.json"); const raw = fssync.readFileSync(settingsPath, "utf8"); const parsed = JSON.parse(raw); if (parsed && typeof parsed === "object" && typeof parsed.model === "string" && parsed.model.trim()) { return parsed.model.trim(); } } catch (_e) { // settings missing or malformed — fall through } return fallback; } function resolveCodebuddyProjectFiles(env = process.env) { const projectsDir = path.join(resolveCodebuddyHome(env), "projects"); if (!fssync.existsSync(projectsDir)) return []; const files = []; try { for (const cwd of fssync.readdirSync(projectsDir)) { const cwdDir = path.join(projectsDir, cwd); let stat; try { stat = fssync.statSync(cwdDir); } catch { continue; } if (!stat.isDirectory()) continue; let entries; try { entries = fssync.readdirSync(cwdDir); } catch { continue; } for (const entry of entries) { if (!entry.endsWith(".jsonl")) continue; files.push(path.join(cwdDir, entry)); } } } catch { // ignore — return what we have } files.sort((a, b) => a.localeCompare(b)); return files; } async function parseCodebuddyIncremental({ projectFiles, cursors, queuePath, onProgress, env, defaultModel, } = {}) { await ensureDir(path.dirname(queuePath)); const codebuddyState = cursors.codebuddy && typeof cursors.codebuddy === "object" ? cursors.codebuddy : {}; const seenIds = new Set( Array.isArray(codebuddyState.seenIds) ? codebuddyState.seenIds : [], ); const fileOffsets = codebuddyState.fileOffsets && typeof codebuddyState.fileOffsets === "object" ? { ...codebuddyState.fileOffsets } : {}; const files = Array.isArray(projectFiles) ? projectFiles : resolveCodebuddyProjectFiles(env || process.env); const fallbackModel = defaultModel || resolveCodebuddyDefaultModel(env || process.env); if (files.length === 0) { cursors.codebuddy = { ...codebuddyState, seenIds: Array.from(seenIds), fileOffsets, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < files.length; fileIdx++) { const filePath = files[fileIdx]; let stat; try { stat = fssync.statSync(filePath); } catch { continue; } const prevEntry = fileOffsets[filePath] || {}; const prevSize = Number(prevEntry.size) || 0; const prevIno = prevEntry.ino; // Re-read from start if file shrunk (truncate/rewrite) or inode changed // (file deleted + recreated). Otherwise pick up after the last read offset. const inodeChanged = typeof prevIno === "number" && prevIno !== stat.ino; const startOffset = stat.size < prevSize || inodeChanged ? 0 : prevSize; if (stat.size <= startOffset) continue; let stream; try { stream = fssync.createReadStream(filePath, { encoding: "utf8", start: startOffset, }); } catch { continue; } const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); for await (const line of rl) { if (!line || !line.trim()) continue; let entry; try { entry = JSON.parse(line); } catch { continue; } // Only assistant message events carry token usage. if (!entry || entry.type !== "message" || entry.role !== "assistant") continue; const provider = entry.providerData; const rawUsage = provider && typeof provider === "object" ? provider.rawUsage : null; if (!rawUsage || typeof rawUsage !== "object") continue; // Dedup per-message — message id (uuid) is most stable, then session + // timestamp as fallback. const sessionId = typeof entry.sessionId === "string" && entry.sessionId ? entry.sessionId : path.basename(filePath, ".jsonl"); const tsMs = Number.isFinite(Number(entry.timestamp)) && Number(entry.timestamp) > 0 ? Number(entry.timestamp) : null; const messageId = typeof entry.uuid === "string" && entry.uuid ? entry.uuid : typeof entry.id === "string" && entry.id ? entry.id : tsMs != null ? `${sessionId}:${tsMs}` : null; if (!messageId) continue; if (seenIds.has(messageId)) continue; recordsProcessed++; const promptTokens = toNonNegativeInt(rawUsage.prompt_tokens); const completionTokens = toNonNegativeInt(rawUsage.completion_tokens); const details = rawUsage.prompt_tokens_details && typeof rawUsage.prompt_tokens_details === "object" ? rawUsage.prompt_tokens_details : {}; const cachedTokens = toNonNegativeInt(details.cached_tokens); // Anthropic-style mirror; CodeBuddy emits these too even if usually 0. const cacheReadAlt = toNonNegativeInt(rawUsage.cache_read_input_tokens); const cacheCreation = toNonNegativeInt(rawUsage.cache_creation_input_tokens); const reasoningTokens = toNonNegativeInt(details.reasoning_tokens); // CRITICAL: prompt_tokens is OpenAI-style and INCLUDES cached. // Subtract cached so input_tokens is pure non-cached input — matches // the repo's normalization convention (see CLAUDE.md "Token // Normalization Convention"). cache_read takes the larger of the two // mirrored fields (rawUsage.cache_read_input_tokens vs // prompt_tokens_details.cached_tokens) since CodeBuddy populates one // or the other depending on upstream provider. const cacheRead = Math.max(cachedTokens, cacheReadAlt); const inputTokens = Math.max(0, promptTokens - cacheRead); if ( inputTokens === 0 && completionTokens === 0 && cacheRead === 0 && cacheCreation === 0 ) { seenIds.add(messageId); continue; } if (tsMs == null) { seenIds.add(messageId); continue; } const tsIso = new Date(tsMs).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const model = normalizeModelInput(provider?.model) || normalizeModelInput(entry.model) || fallbackModel; const delta = { input_tokens: inputTokens, cached_input_tokens: cacheRead, cache_creation_input_tokens: cacheCreation, output_tokens: completionTokens, reasoning_output_tokens: reasoningTokens, total_tokens: inputTokens + completionTokens + cacheRead + cacheCreation + reasoningTokens, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "codebuddy", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("codebuddy", model, bucketStart)); seenIds.add(messageId); eventsAggregated++; if (cb) { cb({ index: fileIdx + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } let postStat = stat; try { postStat = fssync.statSync(filePath); } catch {} fileOffsets[filePath] = { size: postStat.size, mtimeMs: postStat.mtimeMs, ino: postStat.ino, }; } // Cap dedup set to last 10k IDs to bound cursor state size — same convention // as Kimi/Copilot so cursors.json doesn't grow unbounded. const seenArr = Array.from(seenIds); const cappedSeen = seenArr.length > 10_000 ? seenArr.slice(seenArr.length - 10_000) : seenArr; const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets, }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.codebuddy = { ...codebuddyState, seenIds: cappedSeen, fileOffsets, updatedAt, }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // oh-my-pi (omp) — passive JSONL reader (~/.omp/agent/sessions/**/*.jsonl) // // oh-my-pi writes one append-only JSONL per session: // ~/.omp/agent/sessions/----/_.jsonl // // Per-line record types: the first line is type:"session" (header). // Only type:"message" lines with message.role=="assistant" carry token usage. // The shape (verbatim from oh-my-pi docs/session.md): // // { // "type": "message", // "id": "a1b2c3d4", ← 8-char dedup key // "parentId": "...", // "timestamp": "2026-02-16T10:21:00.000Z", // "message": { // "role": "assistant", // "provider": "anthropic", // "model": "claude-sonnet-4-5", // "usage": { // "input": 100, "output": 20, "cacheRead": 0, "cacheWrite": 0, // "totalTokens": 120, "reasoningTokens": 0 // }, // "timestamp": 1760000000000 ← ms epoch, preferred for bucketing // } // } // // oh-my-pi is a router — dispatches to upstream providers (Anthropic, OpenAI, // etc.) and records the upstream model name per message. There is no global // default model setting; model is always per-message (fallback: "omp-unknown"). // ───────────────────────────────────────────────────────────────────────────── function resolveOmpHome(env = process.env) { const home = env.HOME || require("node:os").homedir(); // Honor TokenTracker override first, then oh-my-pi upstream env vars. if (env.OMP_HOME) return env.OMP_HOME; if (env.PI_CONFIG_DIR) return path.join(home, env.PI_CONFIG_DIR); return path.join(home, ".omp"); } // PI_CODING_AGENT_DIR is documented by both pi-coding-agent and oh-my-pi as // their agent directory override. When set, attribute it to whichever tool the // user actually has installed: ~/.pi present → "pi", otherwise "omp" (the // historical default in this codebase, preserved for back-compat). // // Users with both tools installed can disambiguate explicitly with // TOKENTRACKER_PI_AGENT_DIR / TOKENTRACKER_OMP_AGENT_DIR, which take // precedence in their respective resolvers. function decidePiCodingAgentDirOwner(env = process.env) { const home = env.HOME || require("node:os").homedir(); // Require an actual directory — a stray file (lockfile, junk) at ~/.pi // shouldn't reroute an existing oh-my-pi user's PI_CODING_AGENT_DIR override. try { if (fssync.statSync(path.join(home, ".pi")).isDirectory()) return "pi"; } catch { // ENOENT or EACCES — treat as "no pi install signal". } return "omp"; } function expandHomePath(dir, env = process.env) { if (typeof dir !== "string" || !dir) return dir; if (dir !== "~" && !dir.startsWith("~/")) return dir; const home = env.HOME || require("node:os").homedir(); return dir === "~" ? home : path.join(home, dir.slice(2)); } function resolveOmpAgentDir(env = process.env) { if (env.TOKENTRACKER_OMP_AGENT_DIR) { return expandHomePath(env.TOKENTRACKER_OMP_AGENT_DIR, env); } if (env.PI_CODING_AGENT_DIR && decidePiCodingAgentDirOwner(env) === "omp") { return expandHomePath(env.PI_CODING_AGENT_DIR, env); } return path.join(resolveOmpHome(env), "agent"); } function resolveOmpSessionFiles(env = process.env) { const sessionsDir = path.join(resolveOmpAgentDir(env), "sessions"); if (!fssync.existsSync(sessionsDir)) return []; const files = []; try { for (const cwdDir of fssync.readdirSync(sessionsDir)) { const cwdPath = path.join(sessionsDir, cwdDir); let stat; try { stat = fssync.statSync(cwdPath); } catch { continue; } if (!stat.isDirectory()) continue; let entries; try { entries = fssync.readdirSync(cwdPath); } catch { continue; } for (const entry of entries) { if (!entry.endsWith(".jsonl")) continue; files.push(path.join(cwdPath, entry)); } } } catch { // ignore — return what we have } files.sort((a, b) => a.localeCompare(b)); return files; } function resolveOmpDefaultModel() { // oh-my-pi has no global default model setting; model is per-message. return "omp-unknown"; } // ───────────────────────────────────────────────────────────────────────────── // Kilo Code VS Code extension — passive reader for VS Code-family // globalStorage/kilocode.kilo-code/tasks//ui_messages.json files. // // Each task folder contains a ui_messages.json (JSON array, not JSONL). Token // usage records are messages where `say == "api_req_started"`; the `text` // field is a JSON-stringified payload: // // { // "apiProtocol": "openai" | "anthropic" | ..., // "tokensIn": 28673, // request input (already excludes cache) // "tokensOut": 31, // completion // "cacheWrites": 0, // "cacheReads": 5120, // "cost": 0, // "usageMissing": false, // "inferenceProvider": "Moonshot AI" | "minimax" | ..., // } // // We scan every supported VS Code-family install (Cursor, Code, CodeBuddy, // Windsurf, …) under both Library/Application Support (macOS) and Linux/Win // equivalents. Files are small (median ~30KB) and rewritten on each turn — we // can't byte-tail them, so we read the whole file on every sync and dedupe by // (taskId, ts). Per-file mtime caching skips unchanged files. // ───────────────────────────────────────────────────────────────────────────── function resolveKilocodeRoots(env = process.env) { if (typeof env.TOKENTRACKER_KILOCODE_ROOTS === "string" && env.TOKENTRACKER_KILOCODE_ROOTS.trim()) { return env.TOKENTRACKER_KILOCODE_ROOTS.split(":") .map((r) => r.trim()) .filter(Boolean); } const home = env.HOME || require("node:os").homedir(); const candidates = []; if (process.platform === "darwin") { const base = path.join(home, "Library", "Application Support"); candidates.push( path.join(base, "Code"), path.join(base, "Code - Insiders"), path.join(base, "Cursor"), path.join(base, "CodeBuddy"), path.join(base, "Windsurf"), path.join(base, "VSCodium"), path.join(base, "Trae"), path.join(base, "Trae CN"), ); } else if (process.platform === "win32") { const appData = env.APPDATA || path.join(home, "AppData", "Roaming"); candidates.push( path.join(appData, "Code"), path.join(appData, "Code - Insiders"), path.join(appData, "Cursor"), path.join(appData, "CodeBuddy"), path.join(appData, "Windsurf"), path.join(appData, "VSCodium"), ); } else { const xdg = env.XDG_CONFIG_HOME || path.join(home, ".config"); candidates.push( path.join(xdg, "Code"), path.join(xdg, "Code - Insiders"), path.join(xdg, "Cursor"), path.join(xdg, "CodeBuddy"), path.join(xdg, "Windsurf"), path.join(xdg, "VSCodium"), ); } return candidates; } function resolveKilocodeTaskFiles(env = process.env) { const roots = resolveKilocodeRoots(env); const out = []; for (const root of roots) { const tasksDir = path.join(root, "User", "globalStorage", "kilocode.kilo-code", "tasks"); if (!fssync.existsSync(tasksDir)) continue; let entries; try { entries = fssync.readdirSync(tasksDir); } catch { continue; } for (const taskUuid of entries) { const filePath = path.join(tasksDir, taskUuid, "ui_messages.json"); if (!fssync.existsSync(filePath)) continue; out.push({ filePath, taskUuid, ide: path.basename(root) }); } } out.sort((a, b) => a.filePath.localeCompare(b.filePath)); return out; } // Kilo Code only persists the inference provider (e.g. "minimax", // "Moonshot AI", "Stealth") in ui_messages.json — the actual model id is // stored in workspace state but isn't attributed to individual turns and may // change across sessions, so we cannot map a row back to a model id reliably. // We surface the provider explicitly so the dashboard's Model column doesn't // imply this is a model. function normalizeKilocodeProviderToModel(providerName) { if (typeof providerName !== "string" || !providerName.trim()) return "provider:unknown"; const slug = providerName .trim() .toLowerCase() .replace(/\s+/g, "-") .replace(/[^a-z0-9._-]/g, ""); // A slug consisting only of separators (dashes/dots/underscores) carries no // information — treat it as unknown. if (!slug || !/[a-z0-9]/.test(slug)) return "provider:unknown"; return `provider:${slug}`; } // ───────────────────────────────────────────────────────────────────────────── // Roo Code (rooveterinaryinc.roo-cline) // // Same Cline-derived ui_messages.json format as Kilo Code, but two real // differences worth noting: // // 1. The model name is NOT in the per-turn payload (Roo Code only writes // provider via `apiProtocol`). It lives in a sibling // `api_conversation_history.json` inside `` blocks: // // // claude-3-7-sonnet-20250219 // // // We read the most recent occurrence — Roo can switch models mid-task, // so the last-seen value is the most accurate attribution; if the file // or tag is missing we fall back to `protocol:` (e.g. // `protocol:anthropic`) and finally to "unknown". // // 2. Same multi-IDE root scan as Kilo Code (Cursor, Code, CodeBuddy, …) — // we reuse resolveKilocodeRoots so both parsers stay in sync when a new // VS Code fork ships. // ───────────────────────────────────────────────────────────────────────────── function resolveRoocodeTaskFiles(env = process.env) { const roots = resolveKilocodeRoots(env); const out = []; for (const root of roots) { const tasksDir = path.join(root, "User", "globalStorage", "rooveterinaryinc.roo-cline", "tasks"); if (!fssync.existsSync(tasksDir)) continue; let entries; try { entries = fssync.readdirSync(tasksDir); } catch { continue; } for (const taskUuid of entries) { const filePath = path.join(tasksDir, taskUuid, "ui_messages.json"); if (!fssync.existsSync(filePath)) continue; out.push({ filePath, taskUuid, ide: path.basename(root) }); } } out.sort((a, b) => a.filePath.localeCompare(b.filePath)); return out; } // Pull the most recent from a Roo Code task's // api_conversation_history.json (each Cline turn appends a fresh // block). Returns null when the sibling file is // missing, unreadable, or contains no tag. Bounded to first 1MB to avoid // pathological history files starving sync. function readRoocodeTaskModel(uiMessagesPath) { const historyPath = path.join(path.dirname(uiMessagesPath), "api_conversation_history.json"); let raw; try { raw = fssync.readFileSync(historyPath, "utf8"); } catch { return null; } if (raw.length > 1_048_576) { // Naive `slice(raw.length - 1MB)` can split a `` // block mid-tag — e.g. the keep window starts at "...` start in the keep window so // every retained tag is intact. const naive = raw.slice(raw.length - 1_048_576); const blockStart = naive.indexOf(""); raw = blockStart >= 0 ? naive.slice(blockStart) : naive; } let lastModel = null; const re = /\s*([^<\s][^<]*?)\s*<\/model>/g; let m; while ((m = re.exec(raw)) !== null) { const value = m[1].trim(); if (value) lastModel = value; } return lastModel; } function normalizeRoocodeModel({ explicitModel, apiProtocol }) { const trimmed = typeof explicitModel === "string" ? explicitModel.trim() : ""; if (trimmed) return trimmed; if (typeof apiProtocol === "string" && apiProtocol.trim()) { const slug = apiProtocol.trim().toLowerCase().replace(/[^a-z0-9._-]/g, ""); if (slug) return `protocol:${slug}`; } return "unknown"; } async function parseRoocodeIncremental({ taskFiles, cursors, queuePath, onProgress, env, } = {}) { await ensureDir(path.dirname(queuePath)); const roocodeState = cursors.roocode && typeof cursors.roocode === "object" ? cursors.roocode : {}; const seenIds = new Set( Array.isArray(roocodeState.seenIds) ? roocodeState.seenIds : [], ); const fileOffsets = roocodeState.fileOffsets && typeof roocodeState.fileOffsets === "object" ? { ...roocodeState.fileOffsets } : {}; const files = Array.isArray(taskFiles) ? taskFiles : resolveRoocodeTaskFiles(env || process.env); if (files.length === 0) { cursors.roocode = { ...roocodeState, seenIds: Array.from(seenIds), fileOffsets, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < files.length; fileIdx++) { const entry = files[fileIdx]; const { filePath, taskUuid } = entry; let stat; try { stat = fssync.statSync(filePath); } catch { continue; } const prevEntry = fileOffsets[filePath]; if ( prevEntry && Number(prevEntry.size) === stat.size && Number(prevEntry.mtimeMs) === stat.mtimeMs ) { continue; } let raw; try { raw = fssync.readFileSync(filePath, "utf8"); } catch { continue; } let data; try { data = JSON.parse(raw); } catch { continue; } if (!Array.isArray(data)) continue; // Read sibling history once per task — model can change mid-task but is // stable enough at this granularity that re-reading on every entry would // just burn IO. Task attribution at the bucket layer is hourly anyway. const taskModel = readRoocodeTaskModel(filePath); for (const msg of data) { if (!msg || typeof msg !== "object") continue; // Like Kilo Code, accept both api_req_started (live) and api_req_deleted // (user-removed turn whose tokens were already consumed). if (msg.say !== "api_req_started" && msg.say !== "api_req_deleted") continue; if (typeof msg.text !== "string" || !msg.text.startsWith("{")) continue; let payload; try { payload = JSON.parse(msg.text); } catch { continue; } if (!payload || typeof payload !== "object") continue; const ts = Number(msg.ts); if (!Number.isFinite(ts) || ts <= 0) continue; const dedupKey = `${taskUuid}:${ts}`; recordsProcessed++; if (seenIds.has(dedupKey)) continue; const tokensIn = toNonNegativeInt(payload.tokensIn); const tokensOut = toNonNegativeInt(payload.tokensOut); const cacheReads = toNonNegativeInt(payload.cacheReads); const cacheWrites = toNonNegativeInt(payload.cacheWrites); if (tokensIn === 0 && tokensOut === 0 && cacheReads === 0 && cacheWrites === 0) { seenIds.add(dedupKey); continue; } const tsIso = new Date(ts).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const delta = { input_tokens: tokensIn, cached_input_tokens: cacheReads, cache_creation_input_tokens: cacheWrites, output_tokens: tokensOut, reasoning_output_tokens: 0, total_tokens: tokensIn + tokensOut + cacheReads + cacheWrites, conversation_count: 1, }; const model = normalizeRoocodeModel({ explicitModel: taskModel, apiProtocol: payload.apiProtocol, }); const bucket = getHourlyBucket(hourlyState, "roocode", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("roocode", model, bucketStart)); seenIds.add(dedupKey); eventsAggregated++; } fileOffsets[filePath] = { size: stat.size, mtimeMs: stat.mtimeMs, ino: stat.ino }; if (cb) { cb({ index: fileIdx + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const seenArr = Array.from(seenIds); const cappedSeen = seenArr.length > 50_000 ? seenArr.slice(seenArr.length - 50_000) : seenArr; const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.roocode = { ...roocodeState, seenIds: cappedSeen, fileOffsets, updatedAt }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // Zed Agent (hosted models only, provider == "zed.dev") // // Data: SQLite at // macOS: ~/Library/Application Support/Zed/threads/threads.db // Linux: $XDG_DATA_HOME/zed/threads/threads.db (defaults to ~/.local/share) // Windows: %LOCALAPPDATA%\Zed\threads\threads.db // // `threads` table stores one row per thread with a BLOB `data` column — // either raw JSON or zstd-compressed JSON (governed by `data_type`). Each // thread's JSON carries `cumulative_token_usage` and/or // `request_token_usage` (a map or array of per-request usages with // input_tokens / output_tokens / cache_read_input_tokens / // cache_creation_input_tokens). // // Threads grow over multiple turns — the row is rewritten with a larger // cumulative on every send, so naive dedup-by-id would freeze our count at // whatever the thread looked like the first time we saw it. We mirror the // antigravity cumulative-delta pattern: keep last-seen totals per thread in // `cursors.zed.threadTotals`, emit (current - previous) on each sync. // // External ACP agents are skipped (their CLIs already report to us through // their own parsers — counting them via the Zed UI would double-count). // ───────────────────────────────────────────────────────────────────────────── const ZED_HOSTED_PROVIDER = "zed.dev"; const MAX_ZED_THREAD_JSON_BYTES = 32 * 1024 * 1024; function resolveZedDbPath(env = process.env) { if (typeof env.TOKENTRACKER_ZED_DB === "string" && env.TOKENTRACKER_ZED_DB.trim()) { return env.TOKENTRACKER_ZED_DB.trim(); } const home = env.HOME || require("node:os").homedir(); if (process.platform === "darwin") { return path.join(home, "Library", "Application Support", "Zed", "threads", "threads.db"); } if (process.platform === "win32") { const local = env.LOCALAPPDATA || path.join(home, "AppData", "Local"); return path.join(local, "Zed", "threads", "threads.db"); } const xdg = env.XDG_DATA_HOME || path.join(home, ".local", "share"); return path.join(xdg, "zed", "threads", "threads.db"); } // Decode a row's BLOB payload into UTF-8 JSON text. Zed marks zstd-compressed // blobs with data_type="zstd"; older / smaller threads use data_type="json" // and store the bytes verbatim. Node 24+ has native zstd; Node 20 needs the // @mongodb-js/zstd fallback. Cap decoded size to mirror tokscale's safety net. async function decodeZedThreadBlob({ dataType, data }) { const type = (dataType || "").trim().toLowerCase(); if (type === "json") { if (data.length > MAX_ZED_THREAD_JSON_BYTES) { throw new Error(`json blob exceeds ${MAX_ZED_THREAD_JSON_BYTES} bytes`); } return data.toString("utf8"); } if (type === "zstd") { const zlib = require("node:zlib"); const out = typeof zlib.zstdDecompressSync === "function" ? zlib.zstdDecompressSync(data) : Buffer.from(await require("@mongodb-js/zstd").decompress(data)); if (out.length > MAX_ZED_THREAD_JSON_BYTES) { throw new Error(`decoded zstd blob exceeds ${MAX_ZED_THREAD_JSON_BYTES} bytes`); } return out.toString("utf8"); } throw new Error(`unsupported data_type: ${dataType}`); } // Pull the 4-tuple (input/output/cache_read/cache_write) out of one Zed // TokenUsage shape. Zed stores integers but some historical rows used // strings — match tokscale's permissive coercion. function readZedUsage(value) { if (!value || typeof value !== "object") return null; const coerce = (v) => { if (typeof v === "number") return Math.max(0, Math.floor(v)); if (typeof v === "string") { const n = Number.parseInt(v, 10); return Number.isFinite(n) && n > 0 ? n : 0; } return 0; }; return { input: coerce(value.input_tokens), output: coerce(value.output_tokens), cache_read: coerce(value.cache_read_input_tokens), cache_write: coerce(value.cache_creation_input_tokens), }; } function sumZedRequestUsage(value) { const total = { input: 0, output: 0, cache_read: 0, cache_write: 0 }; if (!value) return total; const iter = Array.isArray(value) ? value : typeof value === "object" ? Object.values(value) : []; for (const entry of iter) { const u = readZedUsage(entry); if (!u) continue; total.input += u.input; total.output += u.output; total.cache_read += u.cache_read; total.cache_write += u.cache_write; } return total; } // Extract token totals from a parsed Zed thread object. Prefer summed // request_token_usage (per-turn breakdown) and fall back to // cumulative_token_usage when the per-turn map is empty. function extractZedTotals(thread) { if (!thread || thread.imported === true) return null; const model = thread.model; if (!model || typeof model !== "object") return null; const provider = typeof model.provider === "string" ? model.provider.trim() : ""; if (provider.toLowerCase() !== ZED_HOSTED_PROVIDER) return null; const modelId = typeof model.model === "string" ? model.model.trim() : ""; if (!modelId) return null; const request = sumZedRequestUsage(thread.request_token_usage); if (request.input + request.output + request.cache_read + request.cache_write > 0) { return { totals: request, model: modelId }; } const cumulative = readZedUsage(thread.cumulative_token_usage); if ( cumulative && cumulative.input + cumulative.output + cumulative.cache_read + cumulative.cache_write > 0 ) { return { totals: cumulative, model: modelId }; } return null; } // Build a SELECT that only references columns we know exist — Zed has shipped // several `threads` schemas; older versions may omit created_at / // folder_paths. We dynamically detect via PRAGMA so the query never fails on // a missing column. function buildZedThreadsQuery(dbPath, cursorUpdatedAt, sqliteOptions = {}) { const pragmaRows = readSqliteJsonRows(dbPath, "PRAGMA table_info(threads)", { label: "Zed", maxBuffer: 4 * 1024 * 1024, timeout: 10_000, ...sqliteOptions, }); const columns = new Set( pragmaRows .map((row) => row?.name) .filter(Boolean), ); const optional = (col) => (columns.has(col) ? col : `NULL AS ${col}`); // Incremental: only fetch threads updated after the last sync watermark. // Without this we'd zstd-decode every thread on every sync (~250MB for a // 5k-thread DB on every menu-bar tick). Empty cursor → full scan (first // sync). updated_at is stored as ISO 8601 text, so lexical comparison == // chronological comparison. const escaped = typeof cursorUpdatedAt === "string" && cursorUpdatedAt ? cursorUpdatedAt.replace(/'/g, "''") : null; const where = escaped ? ` WHERE updated_at > '${escaped}'` : ""; return `SELECT id, updated_at, ${optional("created_at")}, data_type, hex(data) AS data_hex FROM threads${where}`; } function readZedThreadRowsFromSqlite(dbPath, cursorUpdatedAt, sqliteOptions = {}) { const query = buildZedThreadsQuery(dbPath, cursorUpdatedAt, sqliteOptions); return readSqliteJsonRows(dbPath, query, { label: "Zed", maxBuffer: 256 * 1024 * 1024, timeout: 60_000, ...sqliteOptions, }); } async function parseZedIncremental({ dbPath, cursors, queuePath, onProgress, env, sqliteOptions, } = {}) { await ensureDir(path.dirname(queuePath)); const resolvedDb = dbPath || resolveZedDbPath(env || process.env); const zedState = cursors.zed && typeof cursors.zed === "object" ? cursors.zed : {}; const threadTotals = zedState.threadTotals && typeof zedState.threadTotals === "object" ? { ...zedState.threadTotals } : {}; const cursorUpdatedAt = typeof zedState.lastUpdatedAt === "string" ? zedState.lastUpdatedAt : null; const cursorDbMtime = Number.isFinite(zedState.lastDbMtimeMs) ? zedState.lastDbMtimeMs : 0; // mtime short-circuit: if the SQLite file hasn't been touched since the // last sync there's nothing to read — skip the ~250MB copyFile + zstd // round-trip entirely. We still re-stat on the next call, so a Zed write // is picked up within one sync interval. let currentMtime = 0; try { currentMtime = fssync.statSync(resolvedDb).mtimeMs; } catch (e) { if (e && e.code === "ENOENT") { cursors.zed = { ...zedState, threadTotals, updatedAt: new Date().toISOString() }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } throw e; } if (currentMtime > 0 && currentMtime === cursorDbMtime) { cursors.zed = { ...zedState, threadTotals, updatedAt: new Date().toISOString() }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } // Snapshot via the shared helper so we get WAL/SHM/journal sidecar copies // too. Without sidecars, an active Zed write that's still in the WAL // would be missed (the .db has older pages until checkpoint). const snap = snapshotSqliteDb(resolvedDb); let rows = []; try { rows = readZedThreadRowsFromSqlite(snap.path, cursorUpdatedAt, sqliteOptions); } finally { snap.cleanup(); } if (rows.length === 0) { cursors.zed = { ...zedState, threadTotals, updatedAt: new Date().toISOString() }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let i = 0; i < rows.length; i++) { const row = rows[i]; recordsProcessed++; if (!row || typeof row.id !== "string" || !row.data_hex) continue; let blob; try { blob = Buffer.from(row.data_hex, "hex"); } catch { continue; } let jsonText; try { jsonText = await decodeZedThreadBlob({ dataType: row.data_type, data: blob }); } catch { continue; } let thread; try { thread = JSON.parse(jsonText); } catch { continue; } const extracted = extractZedTotals(thread); if (!extracted) continue; const prev = threadTotals[row.id] || { input: 0, output: 0, cache_read: 0, cache_write: 0 }; const curr = extracted.totals; const prevSum = prev.input + prev.output + prev.cache_read + prev.cache_write; const currSum = curr.input + curr.output + curr.cache_read + curr.cache_write; // Detect cumulative reset: a thread can be re-created with the same id // but lower totals (rare — Zed may purge & rewrite on import/export). // Naive `Math.max(0, curr - prev)` would clamp the delta to 0 and quietly // update the cursor to the smaller `curr`, so the next sync sees growth // from the reset and re-counts everything since. Treat reset as a // fresh-start emit of `curr`. const isReset = currSum > 0 && currSum < prevSum; const delta = isReset ? { ...curr } : { input: Math.max(0, curr.input - prev.input), output: Math.max(0, curr.output - prev.output), cache_read: Math.max(0, curr.cache_read - prev.cache_read), cache_write: Math.max(0, curr.cache_write - prev.cache_write), }; const totalDelta = delta.input + delta.output + delta.cache_read + delta.cache_write; if (totalDelta <= 0) { if ( curr.input !== prev.input || curr.output !== prev.output || curr.cache_read !== prev.cache_read || curr.cache_write !== prev.cache_write ) { threadTotals[row.id] = curr; } continue; } const tsIso = (typeof row.updated_at === "string" && row.updated_at) || (typeof row.created_at === "string" && row.created_at) || (typeof thread.updated_at === "string" && thread.updated_at) || new Date().toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const bucketDelta = { input_tokens: delta.input, cached_input_tokens: delta.cache_read, cache_creation_input_tokens: delta.cache_write, output_tokens: delta.output, reasoning_output_tokens: 0, total_tokens: totalDelta, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "zed", extracted.model, bucketStart); addTotals(bucket.totals, bucketDelta); touchedBuckets.add(bucketKey("zed", extracted.model, bucketStart)); threadTotals[row.id] = curr; eventsAggregated++; if (cb) { cb({ index: i + 1, total: rows.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } // Compute nextCursor BEFORE the 10k cap. If we capped first, a low-volume // zed.dev thread evicted in the cap step would no longer be in // threadTotals, so its updated_at would not advance the cursor — and the // next sync's WHERE filter would re-read & re-decode the same blob forever. // We record everything we touched this run regardless of post-cap eviction. let nextCursor = cursorUpdatedAt; for (const r of rows) { if ( typeof r.updated_at === "string" && threadTotals[r.id] !== undefined && (nextCursor == null || r.updated_at > nextCursor) ) { nextCursor = r.updated_at; } } const entries = Object.entries(threadTotals); if (entries.length > 10_000) { entries.sort((a, b) => { const ta = a[1].input + a[1].output + a[1].cache_read + a[1].cache_write; const tb = b[1].input + b[1].output + b[1].cache_read + b[1].cache_write; return tb - ta; }); const capped = Object.fromEntries(entries.slice(0, 10_000)); for (const k of Object.keys(threadTotals)) delete threadTotals[k]; Object.assign(threadTotals, capped); } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.zed = { ...zedState, threadTotals, lastUpdatedAt: nextCursor, lastDbMtimeMs: currentMtime, updatedAt, }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // Goose (Block AI agent — github.com/block/goose) // // Data: SQLite at // macOS: ~/Library/Application Support/goose/sessions/sessions.db // Linux: $XDG_DATA_HOME/goose/sessions/sessions.db (~/.local/share) // Legacy: ~/.local/share/Block/goose/sessions/sessions.db // Windows: %APPDATA%\goose\sessions\sessions.db // Override: $GOOSE_PATH_ROOT/data/sessions/sessions.db // // `sessions` table: one row per session, columns: // id, model_config_json ({"model_name":"..."}), // provider_name, created_at, // total_tokens / input_tokens / output_tokens (latest turn), // accumulated_total_tokens / accumulated_input_tokens / // accumulated_output_tokens (whole-session cumulative). // // We prefer accumulated_* (gives lifetime usage), with single-turn fallback. // Goose has no cache fields; if total > input+output, the excess is treated // as reasoning_output_tokens (same heuristic as tokscale). // // Session rows grow over time → same cumulative-delta pattern as Zed // (cursors.goose.sessionTotals tracks last-seen per session). // ───────────────────────────────────────────────────────────────────────────── function resolveGooseDbPath(env = process.env) { if (typeof env.TOKENTRACKER_GOOSE_DB === "string" && env.TOKENTRACKER_GOOSE_DB.trim()) { return env.TOKENTRACKER_GOOSE_DB.trim(); } const root = typeof env.GOOSE_PATH_ROOT === "string" ? env.GOOSE_PATH_ROOT.trim() : ""; if (root) return path.join(root, "data", "sessions", "sessions.db"); const home = env.HOME || require("node:os").homedir(); const candidates = []; if (process.platform === "darwin") { candidates.push( path.join(home, "Library", "Application Support", "goose", "sessions", "sessions.db"), ); } else if (process.platform === "win32") { const appData = env.APPDATA || path.join(home, "AppData", "Roaming"); candidates.push(path.join(appData, "goose", "sessions", "sessions.db")); } const xdg = env.XDG_DATA_HOME || path.join(home, ".local", "share"); candidates.push( path.join(xdg, "goose", "sessions", "sessions.db"), path.join(xdg, "Block", "goose", "sessions", "sessions.db"), ); // Default to first existing; if none, return the platform-canonical path so // status can report it cleanly without throwing. for (const c of candidates) { if (fssync.existsSync(c)) return c; } return candidates[0]; } function parseGooseModelName(modelConfigJson) { if (typeof modelConfigJson !== "string" || !modelConfigJson.trim()) return null; try { const obj = JSON.parse(modelConfigJson); if (obj && typeof obj.model_name === "string") { const trimmed = obj.model_name.trim(); return trimmed || null; } } catch (_e) { /* ignore */ } return null; } // Goose stores created_at in multiple formats across versions: RFC3339 // (preferred), "YYYY-MM-DD HH:MM:SS" (naive UTC), or bare "YYYY-MM-DD". // Return ISO 8601 string, or null on failure. function parseGooseCreatedAt(s) { if (typeof s !== "string" || !s.trim()) return null; const trimmed = s.trim(); // Match naive UTC formats FIRST — otherwise `new Date("2026-05-21 14:30:00")` // is interpreted in the local zone, shifting the bucket by ±N hours. const dt = /^(\d{4})-(\d{2})-(\d{2})[T ](\d{2}):(\d{2}):(\d{2})$/.exec(trimmed); if (dt) { const d = new Date(Date.UTC(+dt[1], +dt[2] - 1, +dt[3], +dt[4], +dt[5], +dt[6])); return d.toISOString(); } const dateOnly = /^(\d{4})-(\d{2})-(\d{2})$/.exec(trimmed); if (dateOnly) { const d = new Date(Date.UTC(+dateOnly[1], +dateOnly[2] - 1, +dateOnly[3])); return d.toISOString(); } // Anything else — RFC3339, "Z"-suffixed, "+HH:MM" — let Date handle it. const iso = new Date(trimmed); if (!Number.isNaN(iso.getTime())) return iso.toISOString(); return null; } function readGooseSessionsFromSqlite(dbPath, sqliteOptions = {}) { // Probe columns: the `accumulated_*` fields were added in a later Goose // version; we keep the query forgiving so older installs still work. const pragmaRows = readSqliteJsonRows(dbPath, "PRAGMA table_info(sessions)", { label: "Goose", maxBuffer: 4 * 1024 * 1024, timeout: 10_000, ...sqliteOptions, }); const columns = new Set( pragmaRows .map((row) => row?.name) .filter(Boolean), ); const optional = (col) => (columns.has(col) ? col : `NULL AS ${col}`); const sql = ` SELECT id, model_config_json, ${optional("provider_name")}, created_at, ${optional("total_tokens")}, ${optional("input_tokens")}, ${optional("output_tokens")}, ${optional("accumulated_total_tokens")}, ${optional("accumulated_input_tokens")}, ${optional("accumulated_output_tokens")} FROM sessions WHERE model_config_json IS NOT NULL AND TRIM(model_config_json) != '' `.trim(); return readSqliteJsonRows(dbPath, sql, { label: "Goose", maxBuffer: 64 * 1024 * 1024, timeout: 60_000, ...sqliteOptions, }); } async function parseGooseIncremental({ dbPath, cursors, queuePath, onProgress, env, sqliteOptions, } = {}) { await ensureDir(path.dirname(queuePath)); const resolvedDb = dbPath || resolveGooseDbPath(env || process.env); const gooseState = cursors.goose && typeof cursors.goose === "object" ? cursors.goose : {}; const sessionTotals = gooseState.sessionTotals && typeof gooseState.sessionTotals === "object" ? { ...gooseState.sessionTotals } : {}; const cursorDbMtime = Number.isFinite(gooseState.lastDbMtimeMs) ? gooseState.lastDbMtimeMs : 0; let currentMtime = 0; try { currentMtime = fssync.statSync(resolvedDb).mtimeMs; } catch (e) { if (e && e.code === "ENOENT") { cursors.goose = { ...gooseState, sessionTotals, updatedAt: new Date().toISOString() }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } throw e; } // mtime short-circuit: skip the full sessions table scan when the DB // hasn't been touched since the last sync. if (currentMtime > 0 && currentMtime === cursorDbMtime) { cursors.goose = { ...gooseState, sessionTotals, updatedAt: new Date().toISOString() }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } // Snapshot via the shared helper to capture WAL/SHM sidecars — Goose // writes async, so without them an in-flight session would read stale. const snap = snapshotSqliteDb(resolvedDb); let rows = []; try { rows = readGooseSessionsFromSqlite(snap.path, sqliteOptions); } finally { snap.cleanup(); } if (rows.length === 0) { cursors.goose = { ...gooseState, sessionTotals, updatedAt: new Date().toISOString() }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let i = 0; i < rows.length; i++) { const row = rows[i]; recordsProcessed++; if (!row || typeof row.id !== "string") continue; const model = parseGooseModelName(row.model_config_json); if (!model) continue; // Prefer accumulated_*; fall back to single-turn columns. const totalNow = Math.max( 0, Number(row.accumulated_total_tokens ?? row.total_tokens ?? 0) || 0, ); const inputNow = Math.max( 0, Number(row.accumulated_input_tokens ?? row.input_tokens ?? 0) || 0, ); const outputNow = Math.max( 0, Number(row.accumulated_output_tokens ?? row.output_tokens ?? 0) || 0, ); if (totalNow === 0 && inputNow === 0 && outputNow === 0) continue; const prev = sessionTotals[row.id] || { input: 0, output: 0, total: 0 }; // Goose can wipe a session and re-create with the same id during // database migration. Treat shrinking cumulative as a reset and emit // the full curr value, otherwise the next sync's growth would // double-count everything from the reset. const isReset = totalNow > 0 && totalNow < prev.total; const dInput = isReset ? inputNow : Math.max(0, inputNow - prev.input); const dOutput = isReset ? outputNow : Math.max(0, outputNow - prev.output); const dTotal = isReset ? totalNow : Math.max(0, totalNow - prev.total); if (dInput === 0 && dOutput === 0 && dTotal === 0) { if ( prev.input !== inputNow || prev.output !== outputNow || prev.total !== totalNow ) { sessionTotals[row.id] = { input: inputNow, output: outputNow, total: totalNow }; } continue; } // If total grew more than (input + output), treat the excess as reasoning // — matches Goose's accounting (it lumps reasoning into `total_tokens`). const accountedDelta = dInput + dOutput; const reasoningDelta = Math.max(0, dTotal - accountedDelta); const tsIso = parseGooseCreatedAt(row.created_at) || new Date().toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; // Token normalization: input_tokens = non-cached input; Goose has no // cache fields → all input lands in input_tokens. Total stays consistent // with: input + output + reasoning (no cache columns). const bucketDelta = { input_tokens: dInput, cached_input_tokens: 0, cache_creation_input_tokens: 0, output_tokens: dOutput, reasoning_output_tokens: reasoningDelta, total_tokens: dInput + dOutput + reasoningDelta, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "goose", model, bucketStart); addTotals(bucket.totals, bucketDelta); touchedBuckets.add(bucketKey("goose", model, bucketStart)); sessionTotals[row.id] = { input: inputNow, output: outputNow, total: totalNow }; eventsAggregated++; if (cb) { cb({ index: i + 1, total: rows.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } // Cap cursor at 10k sessions (largest by lifetime usage). const entries = Object.entries(sessionTotals); if (entries.length > 10_000) { entries.sort((a, b) => b[1].total - a[1].total); const capped = Object.fromEntries(entries.slice(0, 10_000)); for (const k of Object.keys(sessionTotals)) delete sessionTotals[k]; Object.assign(sessionTotals, capped); } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.goose = { ...gooseState, sessionTotals, lastDbMtimeMs: currentMtime, updatedAt, }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // Droid (Factory CLI) — passive reader for ~/.factory/sessions/**/*.settings.json // // Each Droid session has two sibling files: // .jsonl — per-message transcript (no token counts) // .settings.json — JSON object whose tokenUsage holds the // CUMULATIVE session-level total: // { // "model": "custom:GLM-5.1-[Proxy]-0", // "providerLock": "anthropic", // "providerLockTimestamp": "2026-05-21T12:34:56.000Z", // "tokenUsage": { // "inputTokens": 12345, // already excludes cached reads // "outputTokens": 678, // "cacheCreationTokens": 0, // "cacheReadTokens": 0, // "thinkingTokens": 0 // } // } // // Droid records totals at session granularity (not per message). We treat each // settings file as a cumulative counter and emit (current - previous) deltas, // the same cumulative-delta pattern as Goose/Cursor. Bucket timestamp is the // settings file's mtime — the file is rewritten each turn, so mtime is the // most accurate "when did these new tokens land" signal we have. // ───────────────────────────────────────────────────────────────────────────── function resolveDroidSessionsDirs(env = process.env) { if (typeof env.DROID_SESSIONS_DIR === "string" && env.DROID_SESSIONS_DIR.trim()) { return env.DROID_SESSIONS_DIR.split(",") .map((d) => expandHomePath(d.trim(), env)) .filter(Boolean); } if (typeof env.FACTORY_DIR === "string" && env.FACTORY_DIR.trim()) { return [path.join(expandHomePath(env.FACTORY_DIR.trim(), env), "sessions")]; } const home = env.HOME || require("node:os").homedir(); return [path.join(home, ".factory", "sessions")]; } function resolveDroidSessionsDir(env = process.env) { return resolveDroidSessionsDirs(env)[0]; } function listDroidSettingsFiles(env = process.env) { const dirs = resolveDroidSessionsDirs(env); const out = []; const walk = (dir) => { let entries; try { entries = fssync.readdirSync(dir, { withFileTypes: true }); } catch { return; } for (const entry of entries) { const full = path.join(dir, entry.name); if (entry.isDirectory()) { walk(full); } else if (entry.isFile() && entry.name.endsWith(".settings.json")) { out.push(full); } } }; for (const dir of dirs) { if (!fssync.existsSync(dir)) continue; walk(dir); } out.sort((a, b) => a.localeCompare(b)); return out; } // Strip Droid's wrapper to leave a comparable model id. Mirrors ccusage's // `normalize_droid_model_name` (rust/crates/ccusage/src/adapter/droid/parser.rs) // so the same input produces the same bucket key across both tools: // "custom:GLM-5.1-[Proxy]-0" -> "glm-5-1-0" // "anthropic/claude-sonnet-4-5" -> "anthropic/claude-sonnet-4-5" // "glm_5_1" -> "glm_5_1" (underscore preserved) // IMPORTANT: only whitespace, `.`, and existing dashes collapse to a single // `-`. Underscores are kept verbatim — diverging here would split `glm_5_1` // rows from ccusage's equivalent rows in cross-tool comparisons. function normalizeDroidModelName(raw) { if (typeof raw !== "string") return ""; let s = raw.startsWith("custom:") ? raw.slice("custom:".length) : raw; s = s.replace(/\[[^\]]*\]/g, ""); s = s.toLowerCase(); s = s.replace(/[\s.]+/g, "-"); s = s.replace(/-+/g, "-"); s = s.replace(/^-+|-+$/g, ""); return s; } // Mirror ccusage's `normalize_droid_provider`: collapse aliases for the four // known upstream families. Anything else falls through to the literal value // (or "unknown" when the input is empty/garbage). function normalizeDroidProvider(raw) { if (typeof raw !== "string") return "unknown"; const v = raw.trim().toLowerCase().replace(/-/g, "_"); if (!v) return "unknown"; if (v === "claude" || v === "anthropic") return "anthropic"; if (v === "openai") return "openai"; if ( v === "google" || v === "google_ai" || v === "gemini" || v === "vertex" || v === "vertex_ai" ) return "google"; if (v === "xai" || v === "x_ai" || v === "grok") return "xai"; return v; } // When `providerLock` is missing, ccusage infers the family from the model // name itself. We replicate the same heuristic so empty-providerLock sessions // still bucket into `claude-unknown` / `gpt-unknown` / etc. rather than a // generic "unknown". function inferDroidProviderFromModel(model) { if (typeof model !== "string" || !model) return "unknown"; const m = model.toLowerCase(); if ( m.includes("claude") || m.includes("opus") || m.includes("sonnet") || m.includes("haiku") ) return "anthropic"; if ( m.startsWith("gpt-") || m.includes("-gpt-") || m.includes("chatgpt") || /^o\d/.test(m) ) return "openai"; if (m.includes("gemini")) return "google"; if (m.includes("grok")) return "xai"; return "unknown"; } function defaultDroidModelForProvider(provider) { switch (provider) { case "anthropic": return "claude-unknown"; case "openai": return "gpt-unknown"; case "google": return "gemini-unknown"; case "xai": return "grok-unknown"; default: return "unknown"; } } // When `settings.model` is missing, ccusage scans the sibling `.jsonl` // transcript for a line containing `Model:` and pulls the name from there. // We mirror that exactly — same first-500-lines cap, same terminator chars // (`"`, `\`, `[`) — so empty-model droid sessions don't all bucket under // "unknown". function extractDroidModelFromSidecarJsonl(settingsPath) { if (typeof settingsPath !== "string") return ""; if (!settingsPath.endsWith(".settings.json")) return ""; const sidecar = settingsPath.slice(0, -".settings.json".length) + ".jsonl"; let raw; try { raw = fssync.readFileSync(sidecar, "utf8"); } catch { return ""; } const lines = raw.split("\n"); const limit = Math.min(lines.length, 500); for (let i = 0; i < limit; i++) { const idx = lines[i].indexOf("Model:"); if (idx < 0) continue; const tail = lines[i].slice(idx + "Model:".length); // Stop at the first quote, backslash, or bracket — mirrors ccusage. let cut = tail.length; for (const ch of ['"', "\\", "["]) { const p = tail.indexOf(ch); if (p >= 0 && p < cut) cut = p; } const candidate = tail.slice(0, cut).trim(); if (!candidate) continue; const normalized = normalizeDroidModelName(candidate); if (normalized) return normalized; } return ""; } // ccusage's `apply_total_token_fallback`: if the five detail counters // underflow the session's `totalTokens`, attribute the gap. Prefer assigning // it to output (the field most likely to be missing on older settings.json // schemas); if output is already populated, fold the extra into the thinking // (reasoning_output_tokens) channel so total stays consistent. Mirrors // rust/crates/ccusage/src/utils.rs verbatim. function applyDroidTotalFallback(usage) { const known = usage.input + usage.output + usage.cacheCreation + usage.cacheRead + usage.thinking; const total = usage.totalTokens || 0; const missing = total > known ? total - known : 0; if (missing === 0) return usage; if (usage.output === 0) { return { ...usage, output: missing }; } return { ...usage, thinking: usage.thinking + missing }; } // Session id = basename minus `.settings.json`, mirroring ccusage's keying. // Stable across FACTORY_DIR / HOME / mount-point moves because Droid uses // UUID-style session ids (collision risk between projects is negligible). function droidSessionIdFromPath(filePath) { if (typeof filePath !== "string" || !filePath) return ""; const base = path.basename(filePath); if (!base.endsWith(".settings.json")) return ""; return base.slice(0, -".settings.json".length); } async function parseDroidIncremental({ settingsFiles, cursors, queuePath, onProgress, env, // `prune: true` (the production default) drops cursor entries whose session // id was not observed this run — handles `.settings.json` files removed // off disk so the cursor doesn't grow unbounded. Tests that pass an // intentionally partial `settingsFiles` list should set `prune: false` to // keep unobserved entries. prune = true, } = {}) { await ensureDir(path.dirname(queuePath)); const droidState = cursors.droid && typeof cursors.droid === "object" ? cursors.droid : {}; const sessionTotals = droidState.sessionTotals && typeof droidState.sessionTotals === "object" ? { ...droidState.sessionTotals } : {}; const files = Array.isArray(settingsFiles) ? settingsFiles : listDroidSettingsFiles(env || process.env); if (files.length === 0) { cursors.droid = { ...droidState, sessionTotals, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; // Track which session ids we observed this run so we can prune cursor // entries for files that disappeared off disk — keeps the cursor bounded // by actual session count without the false-first-sight re-emit bug that // a fixed-N cap would introduce (evicted-but-still-on-disk entries would // resurrect as zero-prev on the next sync and re-count their cumulative). const seenSessionIds = new Set(); for (let i = 0; i < files.length; i++) { const filePath = files[i]; recordsProcessed++; let mtimeMs = 0; try { mtimeMs = fssync.statSync(filePath).mtimeMs; } catch (e) { if (e && e.code === "ENOENT") continue; throw e; } // Key by session id (the UUID-style filename without `.settings.json`) // so the cursor survives FACTORY_DIR / HOME / mount-point migrations. // Mirrors ccusage's session_id derivation (parser.rs::load_settings_file). const sessionId = droidSessionIdFromPath(filePath); if (!sessionId) continue; seenSessionIds.add(sessionId); const prev = sessionTotals[sessionId] || { input: 0, output: 0, cacheCreation: 0, cacheRead: 0, thinking: 0, mtimeMs: 0, }; const isFirstSeenSession = !sessionTotals[sessionId]; if (mtimeMs && mtimeMs === prev.mtimeMs) continue; let raw; try { raw = fssync.readFileSync(filePath, "utf8"); } catch { continue; } let settings; try { settings = JSON.parse(raw); } catch { continue; } if (!settings || typeof settings !== "object") continue; const tokenUsage = settings.tokenUsage; if (!tokenUsage || typeof tokenUsage !== "object") continue; const filled = applyDroidTotalFallback({ input: Math.max(0, Number(tokenUsage.inputTokens || 0)), output: Math.max(0, Number(tokenUsage.outputTokens || 0)), cacheCreation: Math.max(0, Number(tokenUsage.cacheCreationTokens || 0)), cacheRead: Math.max(0, Number(tokenUsage.cacheReadTokens || 0)), thinking: Math.max(0, Number(tokenUsage.thinkingTokens || 0)), totalTokens: Math.max(0, Number(tokenUsage.totalTokens || 0)), }); const inputNow = filled.input; const outputNow = filled.output; const cacheCreationNow = filled.cacheCreation; const cacheReadNow = filled.cacheRead; const thinkingNow = filled.thinking; const sumNow = inputNow + outputNow + cacheCreationNow + cacheReadNow + thinkingNow; const sumPrev = prev.input + prev.output + prev.cacheCreation + prev.cacheRead + prev.thinking; // Transient empty: settings.json was observed with zero tokens (mid-write // or a brief wipe before the next turn restores totals). Do NOT clobber // the existing per-field baseline — only bump mtimeMs so we don't re-read // the same empty payload next sync. If we overwrote prev with zeros, a // later non-empty read would emit the full cumulative as a fresh delta. if (sumNow === 0) { if (sumPrev > 0) { sessionTotals[sessionId] = { ...prev, mtimeMs }; } else { sessionTotals[sessionId] = { input: 0, output: 0, cacheCreation: 0, cacheRead: 0, thinking: 0, mtimeMs, }; } continue; } // Reset only when the TOTAL shrinks — a real session reuse (Droid wiped // tokenUsage and started over). A single field dropping while the sum // grows is a schema change or cache eviction; clamping per-field deltas // to >=0 is the right behavior for those. const isReset = sumNow < sumPrev; const dInput = isReset ? inputNow : Math.max(0, inputNow - prev.input); const dOutput = isReset ? outputNow : Math.max(0, outputNow - prev.output); const dCacheCreation = isReset ? cacheCreationNow : Math.max(0, cacheCreationNow - prev.cacheCreation); const dCacheRead = isReset ? cacheReadNow : Math.max(0, cacheReadNow - prev.cacheRead); const dThinking = isReset ? thinkingNow : Math.max(0, thinkingNow - prev.thinking); if (dInput + dOutput + dCacheCreation + dCacheRead + dThinking === 0) { sessionTotals[sessionId] = { input: inputNow, output: outputNow, cacheCreation: cacheCreationNow, cacheRead: cacheReadNow, thinking: thinkingNow, mtimeMs, }; continue; } const bucketStart = toUtcHalfHourStart( new Date(mtimeMs || Date.now()).toISOString(), ); if (!bucketStart) continue; // Model resolution mirrors ccusage's chain: settings.model → sidecar // .jsonl scrape → `-unknown` derived from providerLock or // inferred from the model fragment we did find. Same fallback string set // (claude-unknown / gpt-unknown / gemini-unknown / grok-unknown) so // empty-model sessions bucket identically across both tools. let model = normalizeDroidModelName(settings.model); if (!model) model = extractDroidModelFromSidecarJsonl(filePath); if (!model) { let provider = normalizeDroidProvider(settings.providerLock); if (provider === "unknown") { provider = inferDroidProviderFromModel(settings.model || ""); } model = defaultDroidModelForProvider(provider); } // Token normalization: inputTokens already excludes cache reads (matches // Anthropic API convention), so cache columns slot in directly. Thinking // is reasoning_output_tokens — folded into cost via existing pricing path. const bucketDelta = { input_tokens: dInput, cached_input_tokens: dCacheRead, cache_creation_input_tokens: dCacheCreation, output_tokens: dOutput, reasoning_output_tokens: dThinking, total_tokens: dInput + dOutput + dCacheCreation + dCacheRead + dThinking, conversation_count: isFirstSeenSession || isReset ? 1 : 0, }; const bucket = getHourlyBucket(hourlyState, "droid", model, bucketStart); addTotals(bucket.totals, bucketDelta); touchedBuckets.add(bucketKey("droid", model, bucketStart)); sessionTotals[sessionId] = { input: inputNow, output: outputNow, cacheCreation: cacheCreationNow, cacheRead: cacheReadNow, thinking: thinkingNow, mtimeMs, }; eventsAggregated++; if (cb) { cb({ index: i + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } // Prune cursor entries for sessions that no longer appear on disk. Driven // by an explicit `prune` flag (default true) — not by the shape of // `settingsFiles` — so production callers that pass an explicit file list // still get pruning, while tests passing an intentionally partial subset // can opt out with `prune: false`. if (prune) { for (const id of Object.keys(sessionTotals)) { if (!seenSessionIds.has(id)) delete sessionTotals[id]; } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets, }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.droid = { ...droidState, sessionTotals, updatedAt, }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } async function parseKilocodeIncremental({ taskFiles, cursors, queuePath, onProgress, env, } = {}) { await ensureDir(path.dirname(queuePath)); const kilocodeState = cursors.kilocode && typeof cursors.kilocode === "object" ? cursors.kilocode : {}; const seenIds = new Set( Array.isArray(kilocodeState.seenIds) ? kilocodeState.seenIds : [], ); const fileOffsets = kilocodeState.fileOffsets && typeof kilocodeState.fileOffsets === "object" ? { ...kilocodeState.fileOffsets } : {}; const files = Array.isArray(taskFiles) ? taskFiles : resolveKilocodeTaskFiles(env || process.env); if (files.length === 0) { cursors.kilocode = { ...kilocodeState, seenIds: Array.from(seenIds), fileOffsets, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < files.length; fileIdx++) { const entry = files[fileIdx]; const { filePath, taskUuid } = entry; let stat; try { stat = fssync.statSync(filePath); } catch { continue; } const prevEntry = fileOffsets[filePath]; if ( prevEntry && Number(prevEntry.size) === stat.size && Number(prevEntry.mtimeMs) === stat.mtimeMs ) { continue; } let raw; try { raw = fssync.readFileSync(filePath, "utf8"); } catch { continue; } let data; try { data = JSON.parse(raw); } catch { continue; } if (!Array.isArray(data)) continue; for (const msg of data) { if (!msg || typeof msg !== "object") continue; // `api_req_started` is the live billing record; `api_req_deleted` keeps // the same payload when a user removes a turn from the task (Cline-style // edit-and-retry) — tokens were already consumed by the provider, so we // still count them. if (msg.say !== "api_req_started" && msg.say !== "api_req_deleted") continue; if (typeof msg.text !== "string" || !msg.text.startsWith("{")) continue; let payload; try { payload = JSON.parse(msg.text); } catch { continue; } if (!payload || typeof payload !== "object") continue; const ts = Number(msg.ts); if (!Number.isFinite(ts) || ts <= 0) continue; const dedupKey = `${taskUuid}:${ts}`; recordsProcessed++; if (seenIds.has(dedupKey)) continue; const tokensIn = toNonNegativeInt(payload.tokensIn); const tokensOut = toNonNegativeInt(payload.tokensOut); const cacheReads = toNonNegativeInt(payload.cacheReads); const cacheWrites = toNonNegativeInt(payload.cacheWrites); if (tokensIn === 0 && tokensOut === 0 && cacheReads === 0 && cacheWrites === 0) { seenIds.add(dedupKey); continue; } const tsIso = new Date(ts).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const delta = { input_tokens: tokensIn, cached_input_tokens: cacheReads, cache_creation_input_tokens: cacheWrites, output_tokens: tokensOut, reasoning_output_tokens: 0, total_tokens: tokensIn + tokensOut + cacheReads + cacheWrites, conversation_count: 1, }; const model = normalizeKilocodeProviderToModel(payload.inferenceProvider); const bucket = getHourlyBucket(hourlyState, "kilo-code", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("kilo-code", model, bucketStart)); seenIds.add(dedupKey); eventsAggregated++; } fileOffsets[filePath] = { size: stat.size, mtimeMs: stat.mtimeMs, ino: stat.ino }; if (cb) { cb({ index: fileIdx + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } // Cap seenIds to last 50k to bound cursor state size const seenArr = Array.from(seenIds); const cappedSeen = seenArr.length > 50_000 ? seenArr.slice(seenArr.length - 50_000) : seenArr; const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.kilocode = { ...kilocodeState, seenIds: cappedSeen, fileOffsets, updatedAt }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } async function parseOmpIncremental({ sessionFiles, cursors, queuePath, onProgress, env, defaultModel, } = {}) { await ensureDir(path.dirname(queuePath)); const ompState = cursors.omp && typeof cursors.omp === "object" ? cursors.omp : {}; const seenIds = new Set(Array.isArray(ompState.seenIds) ? ompState.seenIds : []); const fileOffsets = ompState.fileOffsets && typeof ompState.fileOffsets === "object" ? { ...ompState.fileOffsets } : {}; const files = Array.isArray(sessionFiles) ? sessionFiles : resolveOmpSessionFiles(env || process.env); const fallbackModel = defaultModel || resolveOmpDefaultModel(); if (files.length === 0) { cursors.omp = { ...ompState, seenIds: Array.from(seenIds), fileOffsets, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < files.length; fileIdx++) { const filePath = files[fileIdx]; let stat; try { stat = fssync.statSync(filePath); } catch { continue; } const prevEntry = fileOffsets[filePath] || {}; const prevSize = Number(prevEntry.size) || 0; const prevIno = prevEntry.ino; // Re-read from start if file shrunk (truncate/rewrite) or inode changed. const inodeChanged = typeof prevIno === "number" && prevIno !== stat.ino; const startOffset = stat.size < prevSize || inodeChanged ? 0 : prevSize; if (stat.size <= startOffset) continue; let stream; try { stream = fssync.createReadStream(filePath, { encoding: "utf8", start: startOffset, }); } catch { continue; } const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); for await (const line of rl) { if (!line || !line.trim()) continue; let entry; try { entry = JSON.parse(line); } catch { continue; } // First line of each file is type:"session" (header) — skip all // non-message records. if (!entry || entry.type !== "message") continue; // Only assistant messages carry token usage. const msg = entry.message; if (!msg || msg.role !== "assistant") continue; const usage = msg.usage; if (!usage || typeof usage !== "object") continue; // Dedup by top-level entry id (8-char string assigned by oh-my-pi). const entryId = typeof entry.id === "string" && entry.id ? entry.id : null; if (!entryId) continue; if (seenIds.has(entryId)) continue; recordsProcessed++; const input = toNonNegativeInt(usage.input); const output = toNonNegativeInt(usage.output); const cacheRead = toNonNegativeInt(usage.cacheRead); const cacheWrite = toNonNegativeInt(usage.cacheWrite); const reasoningTokens = toNonNegativeInt(usage.reasoningTokens); if (input === 0 && output === 0 && cacheRead === 0 && cacheWrite === 0) { seenIds.add(entryId); continue; } // Prefer message-level timestamp (ms epoch); fall back to entry-level // ISO string. Entries with no resolvable timestamp are skipped — they // cannot be placed in a bucket. let tsMs = null; if (Number.isFinite(Number(msg.timestamp)) && Number(msg.timestamp) > 0) { tsMs = Number(msg.timestamp); } else if (typeof entry.timestamp === "string" && entry.timestamp) { const parsed = Date.parse(entry.timestamp); if (Number.isFinite(parsed) && parsed > 0) tsMs = parsed; } if (tsMs == null) { seenIds.add(entryId); continue; } const tsIso = new Date(tsMs).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; // Use provided totalTokens when available; otherwise sum all components. const totalTokens = Number.isFinite(Number(usage.totalTokens)) && Number(usage.totalTokens) > 0 ? toNonNegativeInt(usage.totalTokens) : input + output + cacheRead + cacheWrite + reasoningTokens; const model = normalizeModelInput(msg.model) || fallbackModel; const delta = { input_tokens: input, cached_input_tokens: cacheRead, cache_creation_input_tokens: cacheWrite, output_tokens: output, reasoning_output_tokens: reasoningTokens, total_tokens: totalTokens, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "omp", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("omp", model, bucketStart)); seenIds.add(entryId); eventsAggregated++; if (cb) { cb({ index: fileIdx + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } let postStat = stat; try { postStat = fssync.statSync(filePath); } catch {} fileOffsets[filePath] = { size: postStat.size, mtimeMs: postStat.mtimeMs, ino: postStat.ino, }; } // Cap dedup set to last 10k IDs to bound cursor state size — same convention // as Kimi/CodeBuddy/Copilot so cursors.json doesn't grow unbounded. const seenArr = Array.from(seenIds); const cappedSeen = seenArr.length > 10_000 ? seenArr.slice(seenArr.length - 10_000) : seenArr; const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets, }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.omp = { ...ompState, seenIds: cappedSeen, fileOffsets, updatedAt, }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // pi (@mariozechner/pi-coding-agent) — passive JSONL reader // (~/.pi/agent/sessions/**/*.jsonl) // // Same on-disk session format as oh-my-pi (omp): one JSONL file per session, // first line type:"session" header, then a tree of message/model_change/etc. // records. Token usage lives on type:"message" entries with role:"assistant" // under message.usage. // // PI_CODING_AGENT_DIR is shared with omp (both upstream tools document it). // resolvePiAgentDir / resolveOmpAgentDir use decidePiCodingAgentDirOwner to // route the override to exactly one provider so the same sessions dir is // never scanned twice. // ───────────────────────────────────────────────────────────────────────────── function resolvePiHome(env = process.env) { const home = env.HOME || require("node:os").homedir(); return path.join(home, ".pi"); } function resolvePiAgentDir(env = process.env) { if (env.TOKENTRACKER_PI_AGENT_DIR) { return expandHomePath(env.TOKENTRACKER_PI_AGENT_DIR, env); } if (env.PI_CODING_AGENT_DIR && decidePiCodingAgentDirOwner(env) === "pi") { return expandHomePath(env.PI_CODING_AGENT_DIR, env); } return path.join(resolvePiHome(env), "agent"); } // Defense in depth for invariant 2 (no double-count). Two explicit overrides // pointing at the same path (e.g. TOKENTRACKER_OMP_AGENT_DIR === TOKENTRACKER_PI_AGENT_DIR, // or TOKENTRACKER_OMP_AGENT_DIR === PI_CODING_AGENT_DIR with ~/.pi present) bypass // the install-signal disambiguator and would otherwise have both providers scan // the same sessions directory under different `source` tags. function piAgentDirCollidesWithOmp(env = process.env) { return path.resolve(resolvePiAgentDir(env)) === path.resolve(resolveOmpAgentDir(env)); } function resolvePiSessionFiles(env = process.env) { const sessionsDir = path.join(resolvePiAgentDir(env), "sessions"); if (!fssync.existsSync(sessionsDir)) return []; const files = []; try { for (const cwdDir of fssync.readdirSync(sessionsDir)) { const cwdPath = path.join(sessionsDir, cwdDir); let stat; try { stat = fssync.statSync(cwdPath); } catch { continue; } if (!stat.isDirectory()) continue; let entries; try { entries = fssync.readdirSync(cwdPath); } catch { continue; } for (const entry of entries) { if (!entry.endsWith(".jsonl")) continue; files.push(path.join(cwdPath, entry)); } } } catch { // ignore — return what we have } files.sort((a, b) => a.localeCompare(b)); return files; } function resolvePiDefaultModel() { // pi has no global default model; model is per-message. return "pi-unknown"; } async function parsePiIncremental({ sessionFiles, cursors, queuePath, onProgress, env, defaultModel, } = {}) { await ensureDir(path.dirname(queuePath)); const piState = cursors.pi && typeof cursors.pi === "object" ? cursors.pi : {}; const seenIds = new Set(Array.isArray(piState.seenIds) ? piState.seenIds : []); const fileOffsets = piState.fileOffsets && typeof piState.fileOffsets === "object" ? { ...piState.fileOffsets } : {}; const files = Array.isArray(sessionFiles) ? sessionFiles : resolvePiSessionFiles(env || process.env); const fallbackModel = defaultModel || resolvePiDefaultModel(); if (files.length === 0) { cursors.pi = { ...piState, seenIds: Array.from(seenIds), fileOffsets, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < files.length; fileIdx++) { const filePath = files[fileIdx]; let stat; try { stat = fssync.statSync(filePath); } catch { continue; } const prevEntry = fileOffsets[filePath] || {}; const prevSize = Number(prevEntry.size) || 0; const prevIno = prevEntry.ino; const inodeChanged = typeof prevIno === "number" && prevIno !== stat.ino; const startOffset = stat.size < prevSize || inodeChanged ? 0 : prevSize; if (stat.size <= startOffset) continue; let stream; try { stream = fssync.createReadStream(filePath, { encoding: "utf8", start: startOffset, }); } catch { continue; } const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); for await (const line of rl) { if (!line || !line.trim()) continue; let entry; try { entry = JSON.parse(line); } catch { continue; } if (!entry || entry.type !== "message") continue; const msg = entry.message; if (!msg || msg.role !== "assistant") continue; const usage = msg.usage; if (!usage || typeof usage !== "object") continue; const entryId = typeof entry.id === "string" && entry.id ? entry.id : null; if (!entryId) continue; if (seenIds.has(entryId)) continue; recordsProcessed++; const input = toNonNegativeInt(usage.input); const output = toNonNegativeInt(usage.output); const cacheRead = toNonNegativeInt(usage.cacheRead); const cacheWrite = toNonNegativeInt(usage.cacheWrite); const reasoningTokens = toNonNegativeInt(usage.reasoningTokens); if (input === 0 && output === 0 && cacheRead === 0 && cacheWrite === 0) { seenIds.add(entryId); continue; } let tsMs = null; if (Number.isFinite(Number(msg.timestamp)) && Number(msg.timestamp) > 0) { tsMs = Number(msg.timestamp); } else if (typeof entry.timestamp === "string" && entry.timestamp) { const parsed = Date.parse(entry.timestamp); if (Number.isFinite(parsed) && parsed > 0) tsMs = parsed; } if (tsMs == null) { seenIds.add(entryId); continue; } const tsIso = new Date(tsMs).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const totalTokens = Number.isFinite(Number(usage.totalTokens)) && Number(usage.totalTokens) > 0 ? toNonNegativeInt(usage.totalTokens) : input + output + cacheRead + cacheWrite + reasoningTokens; const model = normalizeModelInput(msg.model) || fallbackModel; const delta = { input_tokens: input, cached_input_tokens: cacheRead, cache_creation_input_tokens: cacheWrite, output_tokens: output, reasoning_output_tokens: reasoningTokens, total_tokens: totalTokens, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "pi", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("pi", model, bucketStart)); seenIds.add(entryId); eventsAggregated++; if (cb) { cb({ index: fileIdx + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } let postStat = stat; try { postStat = fssync.statSync(filePath); } catch {} fileOffsets[filePath] = { size: postStat.size, mtimeMs: postStat.mtimeMs, ino: postStat.ino, }; } const seenArr = Array.from(seenIds); const cappedSeen = seenArr.length > 10_000 ? seenArr.slice(seenArr.length - 10_000) : seenArr; const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets, }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.pi = { ...piState, seenIds: cappedSeen, fileOffsets, updatedAt, }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // Craft Agents (lukilabs/craft-agents-oss) — passive JSONL reader // // Craft is a desktop Electron agent that wraps the Claude Agent SDK plus // multiple LLM backends (Anthropic, OpenAI, Google, GitHub Copilot, OpenRouter, // Groq, Mistral, DeepSeek, xAI, Bedrock, Vertex). It writes per-session JSONL // files with a pre-aggregated SessionTokenUsage block on the FIRST line: // // line 1: SessionHeader // { // "id": "260430-swift-river", // "model": "claude-sonnet-4-6", // "llmConnection": "anthropic-default", // "lastMessageAt": 1745003600000, // "tokenUsage": { // "inputTokens": 1234, ← pure non-cached input // "outputTokens": 567, // "totalTokens": 9876, // "cacheReadTokens": 5500, // "cacheCreationTokens": 1100 // } // } // line 2..N: StoredMessage records (we do not need them for token totals) // // Disk layout: // ~/.craft-agent/ ← config dir (override: CRAFT_CONFIG_DIR) // config.json ← workspaces[].rootPath list // workspaces//sessions//session.jsonl (default) // /sessions//session.jsonl (custom workspaces) // // Workspaces can be relocated outside ~/.craft-agent, so we MUST read // config.json to enumerate every rootPath rather than just globbing the // default directory. // // Token semantics map directly onto TokenTracker conventions — `inputTokens` // is already pure non-cached input (no Codex-style trap, see // feedback_rollout_input_semantics.md). Re-parses are idempotent: the header // is rewritten as the session grows, and we dedup by sessionId combined with // the most-recent header byte length so a growing total replaces the old // snapshot instead of double-counting. // ───────────────────────────────────────────────────────────────────────────── function resolveCraftConfigDir(env = process.env) { if (env.CRAFT_CONFIG_DIR) return env.CRAFT_CONFIG_DIR; const home = env.HOME || require("node:os").homedir(); return path.join(home, ".craft-agent"); } function resolveCraftWorkspaceRoots(env = process.env) { const configDir = resolveCraftConfigDir(env); const roots = new Set(); // Always include the default workspaces directory so a fresh install (no // config.json yet) still gets discovered. const defaultWorkspaces = path.join(configDir, "workspaces"); if (fssync.existsSync(defaultWorkspaces)) { try { for (const entry of fssync.readdirSync(defaultWorkspaces)) { const wsPath = path.join(defaultWorkspaces, entry); let stat; try { stat = fssync.statSync(wsPath); } catch { continue; } if (stat.isDirectory()) roots.add(wsPath); } } catch { // ignore } } // Layer in user-relocated workspaces from config.json. const configPath = path.join(configDir, "config.json"); if (fssync.existsSync(configPath)) { try { const raw = fssync.readFileSync(configPath, "utf8"); const cfg = JSON.parse(raw); const list = Array.isArray(cfg?.workspaces) ? cfg.workspaces : []; for (const ws of list) { const root = ws && typeof ws.rootPath === "string" ? ws.rootPath : null; if (root && fssync.existsSync(root)) roots.add(root); } } catch { // malformed config.json — fall back to default discovery only } } return Array.from(roots).sort((a, b) => a.localeCompare(b)); } function resolveCraftSessionFiles(env = process.env) { const roots = resolveCraftWorkspaceRoots(env); if (roots.length === 0) return []; const files = []; for (const root of roots) { const sessionsDir = path.join(root, "sessions"); if (!fssync.existsSync(sessionsDir)) continue; let entries; try { entries = fssync.readdirSync(sessionsDir); } catch { continue; } for (const sessionId of entries) { const sessionDir = path.join(sessionsDir, sessionId); let stat; try { stat = fssync.statSync(sessionDir); } catch { continue; } if (!stat.isDirectory()) continue; const filePath = path.join(sessionDir, "session.jsonl"); if (fssync.existsSync(filePath)) files.push(filePath); } } files.sort((a, b) => a.localeCompare(b)); return files; } function resolveCraftDefaultModel() { // Craft is a router. Per-session header carries the actual model. return "craft-unknown"; } async function parseCraftIncremental({ sessionFiles, cursors, queuePath, onProgress, env, defaultModel, } = {}) { await ensureDir(path.dirname(queuePath)); const craftState = cursors.craft && typeof cursors.craft === "object" ? cursors.craft : {}; // Per-session previous totals so each re-parse only contributes the delta // of the running token totals (the header rewrites in place as the session // grows). Shape: { [sessionId]: { input, output, cacheRead, cacheWrite, total } } const sessionTotals = craftState.sessionTotals && typeof craftState.sessionTotals === "object" ? { ...craftState.sessionTotals } : {}; const files = Array.isArray(sessionFiles) ? sessionFiles : resolveCraftSessionFiles(env || process.env); const fallbackModel = defaultModel || resolveCraftDefaultModel(); if (files.length === 0) { cursors.craft = { ...craftState, sessionTotals, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < files.length; fileIdx++) { const filePath = files[fileIdx]; let stat; try { stat = fssync.statSync(filePath); } catch { continue; } // Read only the FIRST line — the SessionHeader carries the running totals. // Streaming the whole file would be wasted work since we don't use // per-message records for token accounting. We cap at 1 MiB to bound // memory if the first line is unexpectedly huge; real headers observed // in v0.9.0 are ~1–2 KiB so this is generous. let header = null; let parseError = null; let stream; try { stream = fssync.createReadStream(filePath, { encoding: "utf8", end: 1024 * 1024 - 1, }); } catch { continue; } const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); for await (const line of rl) { if (!line || !line.trim()) continue; try { header = JSON.parse(line); } catch (e) { parseError = e; header = null; } break; } rl.close(); try { stream.destroy(); } catch {} if (!header || typeof header !== "object") { if (parseError && process.env.TOKENTRACKER_DEBUG) { process.stderr.write( `[craft] header parse failed for ${filePath}: ${parseError.message}\n`, ); } continue; } const usage = header.tokenUsage; if (!usage || typeof usage !== "object") continue; const sessionId = typeof header.id === "string" && header.id ? header.id : (typeof header.sdkSessionId === "string" && header.sdkSessionId ? header.sdkSessionId : null); if (!sessionId) continue; recordsProcessed++; const totalInput = toNonNegativeInt(usage.inputTokens); const totalOutput = toNonNegativeInt(usage.outputTokens); const totalCacheRead = toNonNegativeInt(usage.cacheReadTokens); const totalCacheWrite = toNonNegativeInt(usage.cacheCreationTokens); const totalReported = Number.isFinite(Number(usage.totalTokens)) && Number(usage.totalTokens) > 0 ? toNonNegativeInt(usage.totalTokens) : totalInput + totalOutput + totalCacheRead + totalCacheWrite; const prev = sessionTotals[sessionId] || { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0, }; // Compute the delta since the last sync. Negative deltas mean the session // was reset/truncated — clamp to 0 and replace the snapshot. const dInput = Math.max(0, totalInput - prev.input); const dOutput = Math.max(0, totalOutput - prev.output); const dCacheRead = Math.max(0, totalCacheRead - prev.cacheRead); const dCacheWrite = Math.max(0, totalCacheWrite - prev.cacheWrite); const dTotal = Math.max(0, totalReported - prev.total); const nowMs = Date.now(); if (dInput === 0 && dOutput === 0 && dCacheRead === 0 && dCacheWrite === 0) { // No new usage since last parse — but still update the snapshot in case // an earlier truncate left it stale, and refresh lastSeenAt so the // eviction policy treats the session as live. sessionTotals[sessionId] = { input: totalInput, output: totalOutput, cacheRead: totalCacheRead, cacheWrite: totalCacheWrite, total: totalReported, lastSeenAt: nowMs, }; continue; } // Bucket on lastMessageAt (preferred) or createdAt — both ms epoch. let tsMs = null; const tsCandidates = [header.lastMessageAt, header.lastUsedAt, header.createdAt]; for (const cand of tsCandidates) { if (Number.isFinite(Number(cand)) && Number(cand) > 0) { tsMs = Number(cand); break; } } if (tsMs == null) tsMs = stat.mtimeMs; if (!Number.isFinite(tsMs) || tsMs <= 0) continue; const tsIso = new Date(tsMs).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const model = normalizeModelInput(header.model) || fallbackModel; // conversation_count: 1 the first time we see a session, 0 on subsequent // syncs of the same session. NOTE: this differs from omp/Claude which // count one-per-assistant-message. Cross-provider "conversations" totals // are therefore not directly comparable — Craft's are per-session. const delta = { input_tokens: dInput, cached_input_tokens: dCacheRead, cache_creation_input_tokens: dCacheWrite, output_tokens: dOutput, reasoning_output_tokens: 0, total_tokens: dTotal > 0 ? dTotal : dInput + dOutput + dCacheRead + dCacheWrite, conversation_count: prev.total === 0 ? 1 : 0, }; const bucket = getHourlyBucket(hourlyState, "craft", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("craft", model, bucketStart)); eventsAggregated++; sessionTotals[sessionId] = { input: totalInput, output: totalOutput, cacheRead: totalCacheRead, cacheWrite: totalCacheWrite, total: totalReported, lastSeenAt: nowMs, }; if (cb) { cb({ index: fileIdx + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } // Cap session-totals map at 5k entries to bound cursor state size. Evict by // lastSeenAt (least-recently-seen first) so that long-lived sessions stay // tracked even when many newer one-shot sessions cycle through. Insertion // order would silently re-zero a long-running session and double-count its // total on the next sync. const entries = Object.entries(sessionTotals); let capped = sessionTotals; if (entries.length > 5000) { entries.sort((a, b) => (a[1]?.lastSeenAt || 0) - (b[1]?.lastSeenAt || 0)); capped = Object.fromEntries(entries.slice(entries.length - 5000)); } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets, }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.craft = { ...craftState, sessionTotals: capped, updatedAt, }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // GitHub Copilot CLI — OpenTelemetry JSONL exporter // User must opt in by setting: // COPILOT_OTEL_ENABLED=true // COPILOT_OTEL_EXPORTER_TYPE=file // COPILOT_OTEL_FILE_EXPORTER_PATH=$HOME/.copilot/otel/copilot-otel-...jsonl // We scan the default directory plus the env-overridden path. // ───────────────────────────────────────────────────────────────────────────── function resolveCopilotOtelPaths(env = process.env) { const home = env.HOME || require("node:os").homedir(); const paths = new Set(); const defaultDir = path.join(home, ".copilot", "otel"); if (fssync.existsSync(defaultDir)) { try { for (const entry of fssync.readdirSync(defaultDir)) { if (entry.endsWith(".jsonl")) paths.add(path.join(defaultDir, entry)); } } catch (_e) {} } const explicit = env.COPILOT_OTEL_FILE_EXPORTER_PATH; if (typeof explicit === "string" && explicit.trim() && fssync.existsSync(explicit)) { paths.add(explicit); } return Array.from(paths).sort(); } function isCopilotChatSpan(record) { if (!record || typeof record !== "object") return false; // Skip metric records (resource + scopeMetrics) which have no chat usage data if (record.scopeMetrics) return false; const opName = record?.attributes?.["gen_ai.operation.name"]; // Both Copilot CLI (Span shape with type:"span") and Copilot Chat extension // (OTEL JS SDK LogRecord shape with event.name:"gen_ai.client.inference.operation.details") // mark chat completions with gen_ai.operation.name === "chat". if (opName === "chat") return true; if (record.type === "span" && typeof record.name === "string" && record.name.startsWith("chat ")) { return true; } return false; } function copilotOtelTimeToMs(value) { if (!Array.isArray(value) || value.length < 2) return null; const seconds = Number(value[0]); const nanos = Number(value[1]); if (!Number.isFinite(seconds)) return null; const ns = Number.isFinite(nanos) ? nanos : 0; return Math.round(seconds * 1000 + ns / 1_000_000); } function pickCopilotModel(attrs) { const candidates = [attrs?.["gen_ai.response.model"], attrs?.["gen_ai.request.model"]]; for (const c of candidates) { if (typeof c === "string" && c.trim()) return c.trim(); } return null; } const COPILOT_PARSER_VERSION = 2; function isCopilotV1ChatSpan(record) { if (!record || record.type !== "span") return false; const opName = record?.attributes?.["gen_ai.operation.name"]; if (opName === "chat") return true; return typeof record.name === "string" && record.name.startsWith("chat "); } function copilotLineHash(line) { return crypto.createHash("sha256").update(line).digest("hex"); } function incrementMapCount(map, key, amount = 1) { map.set(key, (map.get(key) || 0) + amount); } function getCopilotDedupKey(record, attrs = record?.attributes || {}) { const traceId = record?.traceId || record?.spanContext?.traceId || ""; const spanId = record?.spanId || record?.spanContext?.spanId || ""; const responseId = typeof attrs["gen_ai.response.id"] === "string" ? attrs["gen_ai.response.id"] : ""; return traceId && spanId ? `${traceId}:${spanId}` : responseId ? `resp:${responseId}` : null; } // Migration helper: stream the bytes v1 already saw (0 -> prevSize), classify // whether the file contains old CLI spans v1 processed, and whether it also // contains v2-only chat records v1 skipped. Mixed files must be replayed, but // their old CLI lines are skipped by hash so history does not double-count. async function scanCopilotV1MigrationFile(filePath, maxBytes) { const result = { v1Processed: false, v2OnlyChat: false, v1LineHashes: new Map(), }; if (!maxBytes || maxBytes <= 0) return result; try { const stream = fssync.createReadStream(filePath, { encoding: "utf8", start: 0, end: maxBytes - 1, }); const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); try { for await (const line of rl) { if (!line || !line.trim()) continue; let record; try { record = JSON.parse(line); } catch (_e) { continue; } // Must mirror v1's isCopilotChatSpan exactly: BOTH the // gen_ai.operation.name path AND the legacy name-prefix fallback. // Missing the second path lets metric-free files of name-only CLI spans // look like "v1 skipped" -> offset reset -> re-read -> double-count. if (isCopilotV1ChatSpan(record)) { result.v1Processed = true; incrementMapCount(result.v1LineHashes, copilotLineHash(line)); } else if (isCopilotChatSpan(record)) { result.v2OnlyChat = true; } } } finally { rl.close(); stream.destroy(); } } catch (_e) {} return result; } async function parseCopilotIncremental({ otelPaths, cursors, queuePath, onProgress, env } = {}) { await ensureDir(path.dirname(queuePath)); const copilotState = cursors.copilot && typeof cursors.copilot === "object" ? cursors.copilot : {}; const seenIds = new Set(Array.isArray(copilotState.seenIds) ? copilotState.seenIds : []); const priorVersion = Number(copilotState.version) || 1; const fileOffsetsRaw = copilotState.fileOffsets && typeof copilotState.fileOffsets === "object" ? copilotState.fileOffsets : {}; const fileOffsets = { ...fileOffsetsRaw }; const migrationSkipLineHashes = new Map(); // One-shot v1->v2 migration: // - pure v2-only files: clear offset and re-read all skipped Chat records // - pure v1 CLI files: preserve offset to avoid replaying history beyond seenIds // - mixed files: clear offset, but skip old v1 CLI lines by hash during replay if (priorVersion < COPILOT_PARSER_VERSION) { for (const filePath of Object.keys(fileOffsets)) { const prevSize = Number(fileOffsets[filePath]?.size) || 0; const scan = await scanCopilotV1MigrationFile(filePath, prevSize); if (!scan.v1Processed) { delete fileOffsets[filePath]; } else if (scan.v2OnlyChat) { delete fileOffsets[filePath]; migrationSkipLineHashes.set(filePath, scan.v1LineHashes); } } } const files = Array.isArray(otelPaths) && otelPaths.length > 0 ? otelPaths : resolveCopilotOtelPaths(env || process.env); if (files.length === 0) { cursors.copilot = { ...copilotState, version: COPILOT_PARSER_VERSION, seenIds: Array.from(seenIds), fileOffsets, updatedAt: new Date().toISOString(), }; return { recordsProcessed: 0, eventsAggregated: 0, bucketsQueued: 0 }; } const hourlyState = normalizeHourlyState(cursors?.hourly); const touchedBuckets = new Set(); const cb = typeof onProgress === "function" ? onProgress : null; let recordsProcessed = 0; let eventsAggregated = 0; for (let fileIdx = 0; fileIdx < files.length; fileIdx++) { const filePath = files[fileIdx]; let stat; try { stat = fssync.statSync(filePath); } catch (_e) { continue; } const prevEntry = fileOffsets[filePath] || {}; const prevSize = Number(prevEntry.size) || 0; const prevIno = prevEntry.ino; // Re-read from start if (a) file shrunk (truncate/rewrite in place) or // (b) inode changed (rotator deleted + recreated at same path). Without // the inode check, a rotator producing a same-or-larger file would leave // the old offset stuck and skip the new file's prefix forever. const inodeChanged = typeof prevIno === "number" && prevIno !== stat.ino; const startOffset = stat.size < prevSize || inodeChanged ? 0 : prevSize; if (stat.size <= startOffset) continue; let stream; try { stream = fssync.createReadStream(filePath, { encoding: "utf8", start: startOffset }); } catch (_e) { continue; } const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); for await (const line of rl) { if (!line || !line.trim()) continue; const skipLineHashes = migrationSkipLineHashes.get(filePath); if (skipLineHashes && skipLineHashes.size > 0) { const lineHash = copilotLineHash(line); const skipCount = skipLineHashes.get(lineHash) || 0; if (skipCount > 0) { if (skipCount === 1) skipLineHashes.delete(lineHash); else skipLineHashes.set(lineHash, skipCount - 1); continue; } } let record; try { record = JSON.parse(line); } catch (_e) { continue; } recordsProcessed++; if (!isCopilotChatSpan(record)) continue; const attrs = record.attributes || {}; // Dedup: CLI puts traceId/spanId at the top level; the Chat extension // file exporter writes LogRecord-shaped entries without either, but // gen_ai.response.id is per-LLM-call unique. const dedupKey = getCopilotDedupKey(record, attrs); if (dedupKey && seenIds.has(dedupKey)) continue; const inputRaw = toNonNegativeInt(attrs["gen_ai.usage.input_tokens"]); const output = toNonNegativeInt(attrs["gen_ai.usage.output_tokens"]); const cacheRead = toNonNegativeInt(attrs["gen_ai.usage.cache_read.input_tokens"]); // Copilot CLI: cache_write.input_tokens; Copilot Chat extension: cache_creation.input_tokens const cacheWrite = toNonNegativeInt( attrs["gen_ai.usage.cache_write.input_tokens"] ?? attrs["gen_ai.usage.cache_creation.input_tokens"], ); // Copilot CLI: reasoning.output_tokens; Copilot Chat extension: reasoning_tokens const reasoning = toNonNegativeInt( attrs["gen_ai.usage.reasoning.output_tokens"] ?? attrs["gen_ai.usage.reasoning_tokens"], ); // OTEL input_tokens INCLUDES cache_read — subtract per project convention const cacheReadClamped = Math.min(cacheRead, inputRaw); const input = Math.max(0, inputRaw - cacheReadClamped); const totalInteresting = input + output + cacheReadClamped + cacheWrite + reasoning; if (totalInteresting === 0) continue; // CLI Span uses endTime/startTime; Chat extension LogRecord uses hrTime/hrTimeObserved. const tsMs = copilotOtelTimeToMs(record.endTime) || copilotOtelTimeToMs(record.startTime) || copilotOtelTimeToMs(record.hrTime) || copilotOtelTimeToMs(record.hrTimeObserved); if (!tsMs) continue; const tsIso = new Date(tsMs).toISOString(); const bucketStart = toUtcHalfHourStart(tsIso); if (!bucketStart) continue; const model = normalizeModelInput(pickCopilotModel(attrs)) || "github-copilot"; const delta = { input_tokens: input, cached_input_tokens: cacheReadClamped, cache_creation_input_tokens: cacheWrite, output_tokens: output, reasoning_output_tokens: reasoning, total_tokens: input + output + cacheReadClamped + cacheWrite + reasoning, conversation_count: 1, }; const bucket = getHourlyBucket(hourlyState, "copilot", model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("copilot", model, bucketStart)); eventsAggregated++; if (dedupKey) seenIds.add(dedupKey); if (cb) { cb({ index: fileIdx + 1, total: files.length, recordsProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } // Re-stat after readline drains: file may have been appended during the // parse loop. Without this, those new lines would be replayed next run // (dedup catches records with traceId+spanId, but spans missing either // would be double-counted). let postStat = stat; try { postStat = fssync.statSync(filePath); } catch (_e) {} fileOffsets[filePath] = { size: postStat.size, mtimeMs: postStat.mtimeMs, ino: postStat.ino }; } // Cap dedup set to last 10k IDs to bound state size const seenArr = Array.from(seenIds); const cappedSeen = seenArr.length > 10_000 ? seenArr.slice(seenArr.length - 10_000) : seenArr; const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const updatedAt = new Date().toISOString(); hourlyState.updatedAt = updatedAt; cursors.hourly = hourlyState; cursors.copilot = { ...copilotState, version: COPILOT_PARSER_VERSION, seenIds: cappedSeen, fileOffsets, updatedAt, }; return { recordsProcessed, eventsAggregated, bucketsQueued }; } // ───────────────────────────────────────────────────────────────────────────── // Grok Build (xAI) — passive reader for ~/.grok/sessions/**/updates.jsonl + signals.json // Triggered either by full scan in sync or by the SessionEnd hook writing a signal. // updates.jsonl exposes cumulative totalTokens metadata. Grok still does not // expose a stable prompt/output/cache split locally, so these rows keep the // estimated input/output split while using better local telemetry for totals. // ───────────────────────────────────────────────────────────────────────────── const GROK_ESTIMATED_INPUT_RATIO = 0.8; const GROK_CURSOR_VERSION = 3; function resolveGrokBuildHome(env = process.env) { return ( env.TOKENTRACKER_GROK_HOME || env.GROK_HOME || path.join(require("node:os").homedir(), ".grok") ); } function resolveGrokBuildSessions(env = process.env) { const home = resolveGrokBuildHome(env); const sessionsRoot = path.join(home, "sessions"); if (!fssync.existsSync(sessionsRoot)) return []; const results = []; let cwdDirs = []; try { cwdDirs = fssync.readdirSync(sessionsRoot); } catch { return []; } for (const cwdDir of cwdDirs) { const cwdPath = path.join(sessionsRoot, cwdDir); let stat; try { stat = fssync.statSync(cwdPath); } catch { continue; } if (!stat.isDirectory()) continue; let sessionIds = []; try { sessionIds = fssync.readdirSync(cwdPath); } catch { continue; } for (const sid of sessionIds) { const sessionDir = path.join(cwdPath, sid); const signalsPath = path.join(sessionDir, "signals.json"); const updatesPath = path.join(sessionDir, "updates.jsonl"); if (fssync.existsSync(signalsPath) || fssync.existsSync(updatesPath)) { results.push({ sessionDir, updatesPath, signalsPath, summaryPath: path.join(sessionDir, "summary.json"), sessionId: sid, encodedCwd: cwdDir }); } } } return results; } function normalizeGrokSessionSnapshots(grokState) { const snapshots = {}; if (grokState?.sessionSnapshots && typeof grokState.sessionSnapshots === "object") { for (const [sessionId, snapshot] of Object.entries(grokState.sessionSnapshots)) { const safeSessionId = normalizeModelInput(sessionId); if (!safeSessionId || !snapshot || typeof snapshot !== "object") continue; const totalTokens = normalizeNonNegativeNumber(snapshot.totalTokens); snapshots[safeSessionId] = { totalTokens, messageCount: normalizeNonNegativeNumber(snapshot.messageCount), model: normalizeModelInput(snapshot.model) || null, source: normalizeModelInput(snapshot.source) || null, lastEventId: normalizeModelInput(snapshot.lastEventId) || null, lastEventTimestamp: normalizeModelInput(snapshot.lastEventTimestamp) || null, updatedAt: normalizeModelInput(snapshot.updatedAt) || null, legacySeen: snapshot.legacySeen === true, }; } } if (Array.isArray(grokState?.seenSessions)) { for (const sessionId of grokState.seenSessions) { const safeSessionId = normalizeModelInput(sessionId); if (!safeSessionId || snapshots[safeSessionId]) continue; snapshots[safeSessionId] = { totalTokens: 0, messageCount: 0, model: null, updatedAt: normalizeModelInput(grokState.updatedAt) || null, legacySeen: true, }; } } return snapshots; } function capGrokSessionSnapshots(sessionSnapshots) { const entries = Object.entries(sessionSnapshots); if (entries.length <= 10_000) return sessionSnapshots; return Object.fromEntries(entries.slice(entries.length - 10_000)); } function readGrokJsonFile(filePath) { if (!filePath) return null; try { return JSON.parse(fssync.readFileSync(filePath, "utf8")); } catch { return null; } } function grokUpdatesPathForSession(sess) { if (typeof sess?.updatesPath === "string" && sess.updatesPath.trim()) return sess.updatesPath; if (typeof sess?.sessionDir === "string" && sess.sessionDir.trim()) { return path.join(sess.sessionDir, "updates.jsonl"); } return null; } function grokSessionIdFor(sess) { return ( normalizeModelInput(sess?.sessionId) || (normalizeModelInput(sess?.sessionDir) ? path.basename(sess.sessionDir) : null) ); } function grokModelFromSignals(signals) { return ( normalizeModelInput(signals?.primaryModelId) || normalizeModelInput(Array.isArray(signals?.modelsUsed) ? signals.modelsUsed[0] : null) || normalizeModelInput(signals?.model) || "grok-build" ); } function grokLastActiveFromSignals(signals, summary) { return ( normalizeModelInput(signals?.lastActiveAt) || normalizeModelInput(signals?.updatedAt) || normalizeModelInput(signals?.lastActive) || normalizeModelInput(summary?.updated_at) || normalizeModelInput(summary?.updatedAt) || new Date().toISOString() ); } function grokMessageCountFromSignals(signals) { return normalizeNonNegativeNumber( signals?.assistantMessageCount ?? signals?.turnCount ?? signals?.num_chat_messages ?? signals?.messageCount, ); } function grokEffectiveTotalFromSignals(signals) { if (!signals || typeof signals !== "object") return 0; const beforeCompaction = normalizeNonNegativeNumber(signals.totalTokensBeforeCompaction); const totalTokens = normalizeNonNegativeNumber(signals.totalTokens); if (signals.contextTokensUsed == null) { return beforeCompaction + totalTokens; } return Math.max( totalTokens, beforeCompaction + normalizeNonNegativeNumber(signals.contextTokensUsed), ); } function grokTimestampToIso(value) { if (value == null) return null; if (typeof value === "number") { if (!Number.isFinite(value) || value <= 0) return null; const millis = value < 10_000_000_000 ? value * 1000 : value; const dt = new Date(millis); return Number.isFinite(dt.getTime()) ? dt.toISOString() : null; } if (typeof value === "string") { const trimmed = value.trim(); if (!trimmed) return null; if (/^[0-9]+(?:\.[0-9]+)?$/.test(trimmed)) { return grokTimestampToIso(Number(trimmed)); } const dt = new Date(trimmed); return Number.isFinite(dt.getTime()) ? dt.toISOString() : null; } return null; } function grokTimestampFromUpdate(meta, record, fallback) { return ( grokTimestampToIso(meta?.agentTimestampMs) || grokTimestampToIso(meta?.timestampMs) || grokTimestampToIso(record?.timestamp_ms) || grokTimestampToIso(record?.timestamp) || grokTimestampToIso(record?.time) || fallback || null ); } function grokEventId(value, fallback) { if (typeof value === "string" && value.trim()) return value.trim(); if (typeof value === "number" && Number.isFinite(value)) return String(value); return fallback; } async function readGrokUpdateTokenEvents(updatesPath, fallbackTimestamp) { if (!updatesPath) return { events: [], recordsProcessed: 0 }; try { const stat = fssync.statSync(updatesPath); if (!stat.isFile()) return { events: [], recordsProcessed: 0 }; } catch { return { events: [], recordsProcessed: 0 }; } const events = []; let lineIndex = 0; const input = fssync.createReadStream(updatesPath, { encoding: "utf8" }); const rl = readline.createInterface({ input, crlfDelay: Infinity }); try { for await (const line of rl) { lineIndex++; if (!line || !line.trim()) continue; let record; try { record = JSON.parse(line); } catch { continue; } const meta = record?.params?._meta || record?._meta; if (!meta || typeof meta !== "object") continue; const totalTokens = normalizeNonNegativeNumber(meta.totalTokens); if (totalTokens <= 0) continue; const timestamp = grokTimestampFromUpdate(meta, record, fallbackTimestamp); events.push({ totalTokens, timestamp, eventId: grokEventId(meta.eventId ?? record?.eventId ?? record?.id, String(lineIndex)), }); } } catch { return { events, recordsProcessed: events.length }; } return { events, recordsProcessed: events.length }; } function estimateGrokTokenDelta(totalTokens, conversationCount, options = {}) { const total = Math.trunc(normalizeNonNegativeNumber(totalTokens)); const inputTokens = Math.round(total * GROK_ESTIMATED_INPUT_RATIO); const outputTokens = Math.max(0, total - inputTokens); const rawConversations = Math.trunc(normalizeNonNegativeNumber(conversationCount)); const conversations = options.allowZeroConversationCount ? rawConversations : Math.max(1, rawConversations); return { input_tokens: inputTokens, cached_input_tokens: 0, cache_creation_input_tokens: 0, output_tokens: outputTokens, reasoning_output_tokens: 0, total_tokens: total, billable_total_tokens: total, conversation_count: conversations, }; } async function parseGrokBuildIncremental({ sessions, cursors = {}, queuePath, onProgress, env = process.env } = {}) { if (queuePath) await ensureDir(path.dirname(queuePath)); const hourlyState = normalizeHourlyState(cursors?.hourly); const grokState = cursors.grok && typeof cursors.grok === "object" ? { ...cursors.grok } : {}; let sessionSnapshots = normalizeGrokSessionSnapshots(grokState); const touchedBuckets = new Set(); const sessionList = Array.isArray(sessions) && sessions.length > 0 ? sessions : resolveGrokBuildSessions(env); let eventsAggregated = 0; for (let index = 0; index < sessionList.length; index++) { const sess = sessionList[index]; const sessionId = grokSessionIdFor(sess); if (!sessionId) { if (onProgress) onProgress({ index: index + 1, total: sessionList.length, bucketsQueued: touchedBuckets.size }); continue; } const signals = sess?.signals && typeof sess.signals === "object" ? sess.signals : readGrokJsonFile(sess?.signalsPath); const safeSignals = signals && typeof signals === "object" ? signals : {}; const summary = sess?.summary && typeof sess.summary === "object" ? sess.summary : readGrokJsonFile(sess?.summaryPath) || {}; const previous = sessionSnapshots[sessionId] || {}; const previousTotal = normalizeNonNegativeNumber(previous.totalTokens); const previousMessageCount = normalizeNonNegativeNumber(previous.messageCount); const messageCount = grokMessageCountFromSignals(safeSignals); const model = grokModelFromSignals(safeSignals); const lastActive = grokLastActiveFromSignals(safeSignals, summary); let highWatermark = previousTotal; let observedTotal = previousTotal; let tokenDeltaForSession = 0; let finalTouchedHourStart = null; let source = previous.source || null; let lastEventId = previous.lastEventId || null; let lastEventTimestamp = previous.lastEventTimestamp || null; const pendingTokenDeltas = []; const recordTokenDelta = (deltaTokens, timestamp, deltaSource) => { const hourStartStr = toUtcHalfHourStart(timestamp) || toUtcHalfHourStart(Date.now()); if (!hourStartStr) return false; pendingTokenDeltas.push({ deltaTokens, hourStartStr }); tokenDeltaForSession += deltaTokens; finalTouchedHourStart = hourStartStr; source = deltaSource; lastEventTimestamp = timestamp || lastEventTimestamp; return true; }; const updates = await readGrokUpdateTokenEvents(grokUpdatesPathForSession(sess), lastActive); for (const event of updates.events) { observedTotal = Math.max(observedTotal, event.totalTokens); lastEventId = event.eventId || lastEventId; lastEventTimestamp = event.timestamp || lastEventTimestamp; if (event.totalTokens <= highWatermark) continue; const deltaTokens = event.totalTokens - highWatermark; highWatermark = event.totalTokens; recordTokenDelta(deltaTokens, event.timestamp || lastActive, "updates"); } const effectiveSignalTotal = grokEffectiveTotalFromSignals(safeSignals); observedTotal = Math.max(observedTotal, effectiveSignalTotal); if (effectiveSignalTotal > highWatermark) { const deltaTokens = effectiveSignalTotal - highWatermark; highWatermark = effectiveSignalTotal; recordTokenDelta(deltaTokens, lastActive, "signals"); } const finalTotal = Math.max(previousTotal, highWatermark, observedTotal); const legacyBaselineOnly = previous.legacySeen && previousTotal === 0 && finalTotal > 0; if (!legacyBaselineOnly) { for (const pending of pendingTokenDeltas) { const delta = estimateGrokTokenDelta(pending.deltaTokens, 0, { allowZeroConversationCount: true }); const bucket = getHourlyBucket(hourlyState, "grok", model, pending.hourStartStr); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey("grok", model, pending.hourStartStr)); eventsAggregated++; } } if (!legacyBaselineOnly && tokenDeltaForSession > 0 && finalTouchedHourStart) { const deltaMessageCount = messageCount > previousMessageCount ? messageCount - previousMessageCount : 1; const bucket = getHourlyBucket(hourlyState, "grok", model, finalTouchedHourStart); addTotals(bucket.totals, { conversation_count: deltaMessageCount }); touchedBuckets.add(bucketKey("grok", model, finalTouchedHourStart)); } if (finalTotal > 0 && (tokenDeltaForSession > 0 || previousTotal > 0 || legacyBaselineOnly)) { sessionSnapshots[sessionId] = { totalTokens: finalTotal, messageCount: Math.max(previousMessageCount, messageCount), model, source: source || previous.source || null, lastEventId, lastEventTimestamp, updatedAt: new Date().toISOString(), }; } if (onProgress) { onProgress({ index: index + 1, total: sessionList.length, bucketsQueued: touchedBuckets.size }); } } const bucketsQueued = queuePath ? await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }) : 0; hourlyState.updatedAt = new Date().toISOString(); cursors.hourly = hourlyState; sessionSnapshots = capGrokSessionSnapshots(sessionSnapshots); cursors.grok = { ...grokState, version: GROK_CURSOR_VERSION, sessionSnapshots, seenSessions: Object.keys(sessionSnapshots), updatedAt: new Date().toISOString() }; return { recordsProcessed: eventsAggregated, eventsAggregated, bucketsQueued }; } function resolveAntigravityBrainDirs(geminiHome) { if (!geminiHome || typeof geminiHome !== "string") return []; return [ path.join(geminiHome, "antigravity", "brain"), path.join(geminiHome, "antigravity-ide", "brain"), path.join(geminiHome, "antigravity-cli", "brain"), ]; } async function listAntigravitySessionFiles(brainDir) { const out = []; if (!brainDir || typeof brainDir !== "string") return out; const entries = await safeReadDir(brainDir).catch(() => []); for (const entry of entries) { if (!entry.isDirectory()) continue; const logsDir = path.join(brainDir, entry.name, ".system_generated", "logs"); const transcriptPath = path.join(logsDir, "transcript.jsonl"); const st = await fs.stat(transcriptPath).catch(() => null); if (st && st.isFile()) { out.push(transcriptPath); } } out.sort((a, b) => a.localeCompare(b)); return out; } async function listAntigravityTranscripts(geminiHome) { const dirs = resolveAntigravityBrainDirs(geminiHome); const lists = await Promise.all(dirs.map((dir) => listAntigravitySessionFiles(dir))); return lists.flat(); } async function parseAntigravityIncremental({ sessionFiles, cursors, queuePath, projectQueuePath, onProgress, source, publicRepoResolver, }) { await ensureDir(path.dirname(queuePath)); let filesProcessed = 0; let eventsAggregated = 0; const cb = typeof onProgress === "function" ? onProgress : null; const files = Array.isArray(sessionFiles) ? sessionFiles : []; const totalFiles = files.length; const hourlyState = normalizeHourlyState(cursors?.hourly); const projectEnabled = typeof projectQueuePath === "string" && projectQueuePath.length > 0; const projectState = projectEnabled ? normalizeProjectState(cursors?.projectHourly) : null; const projectTouchedBuckets = projectEnabled ? new Set() : null; const projectMetaCache = projectEnabled ? new Map() : null; const publicRepoCache = projectEnabled ? new Map() : null; const touchedBuckets = new Set(); const defaultSource = normalizeSourceInput(source) || "antigravity"; if (!cursors.files || typeof cursors.files !== "object") { cursors.files = {}; } for (let idx = 0; idx < files.length; idx++) { const entry = files[idx]; const filePath = typeof entry === "string" ? entry : entry?.path; if (!filePath) continue; const fileSource = typeof entry === "string" ? defaultSource : normalizeSourceInput(entry?.source) || defaultSource; const st = await fs.stat(filePath).catch(() => null); if (!st || !st.isFile()) continue; const key = filePath; const prev = cursors.files[key] || null; const inode = st.ino || 0; const size = Number.isFinite(st.size) ? st.size : 0; const mtimeMs = Number.isFinite(st.mtimeMs) ? st.mtimeMs : 0; const unchanged = prev && prev.inode === inode && prev.size === size && prev.mtimeMs === mtimeMs; if (unchanged) { filesProcessed += 1; if (cb) { cb({ index: idx + 1, total: totalFiles, filePath, filesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } continue; } const sameFile = prev && prev.inode === inode; const lastLine = sameFile ? Number(prev.lastLine || 0) : 0; const initialContextTokens = sameFile ? Number(prev.contextTokens || 0) : 0; const initialPrevContext = sameFile ? Number(prev.previousContextTokens || 0) : 0; const initialModel = sameFile && typeof prev.currentModel === "string" ? prev.currentModel : null; const projectContext = projectEnabled ? await resolveProjectContextForFile({ filePath, projectMetaCache, publicRepoCache, publicRepoResolver, projectState, }) : null; const projectRef = projectContext?.projectRef || null; const projectKey = projectContext?.projectKey || null; const result = await parseAntigravityFile({ filePath, lastLine, initialContextTokens, initialPrevContext, initialModel, hourlyState, touchedBuckets, source: fileSource, projectState, projectTouchedBuckets, projectRef, projectKey, }); cursors.files[key] = { inode, size, mtimeMs, lastLine: result.lastLine, contextTokens: result.contextTokens, previousContextTokens: result.previousContextTokens, currentModel: result.currentModel, updatedAt: new Date().toISOString(), }; filesProcessed += 1; eventsAggregated += result.eventsAggregated; if (cb) { cb({ index: idx + 1, total: totalFiles, filePath, filesProcessed, eventsAggregated, bucketsQueued: touchedBuckets.size, }); } } const bucketsQueued = await enqueueTouchedBuckets({ queuePath, hourlyState, touchedBuckets }); const projectBucketsQueued = projectEnabled ? await enqueueTouchedProjectBuckets({ projectQueuePath, projectState, projectTouchedBuckets }) : 0; hourlyState.updatedAt = new Date().toISOString(); cursors.hourly = hourlyState; if (projectState) { projectState.updatedAt = new Date().toISOString(); cursors.projectHourly = projectState; } return { filesProcessed, eventsAggregated, bucketsQueued, projectBucketsQueued }; } async function parseAntigravityFile({ filePath, lastLine, initialContextTokens, initialPrevContext, initialModel, hourlyState, touchedBuckets, source, projectState, projectTouchedBuckets, projectRef, projectKey, }) { const raw = await fs.readFile(filePath, "utf8").catch(() => ""); if (!raw.trim()) { return { lastLine: 0, eventsAggregated: 0, contextTokens: 0, previousContextTokens: 0, currentModel: null, }; } const lines = raw .split(/\r?\n/) .map((line) => line.trim()) .filter(Boolean); let eventsAggregated = 0; // Resume cached context-token total + model so historical lines (i < lastLine) // don't need to be re-tokenized on every sync. Falls back to a full re-walk // when the cached state is missing (legacy cursor) or the file rotated. const canResume = Number.isFinite(lastLine) && lastLine > 0 && lastLine <= lines.length; const cachedTokens = Number.isFinite(initialContextTokens) ? initialContextTokens : 0; const cachedPrev = Number.isFinite(initialPrevContext) ? initialPrevContext : 0; const cachedModel = typeof initialModel === "string" ? initialModel : null; const resumed = canResume && (cachedTokens > 0 || cachedModel !== null); const scanStart = resumed ? lastLine : 0; let currentModel = resumed ? cachedModel : null; let contextTokens = resumed ? cachedTokens : 0; // Snapshot of contextTokens at the last PLANNER_RESPONSE we billed for. Only // tokens accumulated AFTER that point count as new input on the next planner // call — prevents O(N²) double-counting of the full history every turn. let previousContextTokens = resumed ? cachedPrev : 0; let lastCompletedLine = Math.min(Number.isFinite(lastLine) ? lastLine : 0, lines.length); for (let i = scanStart; i < lines.length; i++) { const line = lines[i]; let parsed; try { parsed = JSON.parse(line); } catch (_e) { if (i >= lastLine) break; continue; } const isNewEvent = i >= lastLine; if (parsed.type === "USER_INPUT" || parsed.type === "USER_SETTINGS_CHANGE") { const content = typeof parsed.content === "string" ? parsed.content : ""; const model = parseAntigravityModelSelection(content); if (model) currentModel = model; } const eventContextTokens = antigravityContextTokens(parsed); if (!isNewEvent) { contextTokens += eventContextTokens; lastCompletedLine = i + 1; continue; } const timestamp = parsed.created_at; if (!timestamp) { contextTokens += eventContextTokens; lastCompletedLine = i + 1; continue; } const bucketStart = toUtcHalfHourStart(timestamp); if (!bucketStart) { contextTokens += eventContextTokens; lastCompletedLine = i + 1; continue; } let model = currentModel || "antigravity-unknown"; let delta = initTotals(); let billedPlanner = false; if (parsed.type === "PLANNER_RESPONSE") { const content = typeof parsed.content === "string" ? parsed.content : ""; const thinking = typeof parsed.thinking === "string" ? parsed.thinking : ""; const inputDelta = Math.max(0, contextTokens - previousContextTokens); const outputTokens = antigravityValueTokens(content) + antigravityValueTokens(parsed.tool_calls); const reasoningTokens = antigravityValueTokens(thinking); delta.input_tokens = inputDelta; delta.output_tokens = outputTokens; delta.reasoning_output_tokens = reasoningTokens; // Match the mainstream convention (Codebuddy / Kilocode / OMP / Hermes): // total_tokens = sum of every token column. No cache columns here. delta.total_tokens = inputDelta + outputTokens + reasoningTokens; delta.conversation_count = 1; billedPlanner = delta.total_tokens > 0; } if (!billedPlanner) { contextTokens += eventContextTokens; lastCompletedLine = i + 1; continue; } const bucket = getHourlyBucket(hourlyState, source, model, bucketStart); addTotals(bucket.totals, delta); touchedBuckets.add(bucketKey(source, model, bucketStart)); if (projectKey && projectState && projectTouchedBuckets) { const projectBucket = getProjectBucket( projectState, projectKey, source, bucketStart, projectRef, ); addTotals(projectBucket.totals, delta); projectTouchedBuckets.add(projectBucketKey(projectKey, source, bucketStart)); } eventsAggregated += 1; // Snapshot the pre-planner context first. The planner's own content+tool_calls // (eventContextTokens, added below) become part of the next turn's history, // so they MUST be billed as input on the next planner — don't fold them into // previousContextTokens or that history vanishes from the totals. previousContextTokens = contextTokens; contextTokens += eventContextTokens; lastCompletedLine = i + 1; } return { lastLine: lastCompletedLine, eventsAggregated, contextTokens, previousContextTokens, currentModel, }; } function parseAntigravityModelSelection(content) { if (typeof content !== "string" || !content) return null; const match = content.match( /changed setting `Model Selection` from .*? to ([^`\n]+?)(?:\s*\([^)]*\))?\.(?:\s+|$)/i, ); if (!match) return null; return normalizeAntigravityTranscriptModel(match[1]); } function normalizeAntigravityTranscriptModel(modelName) { if (!modelName || typeof modelName !== "string") return null; let slug = modelName .trim() .replace(/\([^)]*\)/g, " ") .replace(/\b(thinking|xhigh|high|medium|low|fast)\b/gi, " ") .toLowerCase() .replace(/[^a-z0-9.]+/g, "-") .replace(/^-+|-+$/g, "") .replace(/-{2,}/g, "-"); if (!slug) return null; for (const marker of ["gemini", "claude", "gpt"]) { const idx = slug.indexOf(marker); if (idx >= 0) { slug = slug.slice(idx); break; } } if (/^(gemini|claude|gpt)-/.test(slug)) return slug; return `antigravity-${slug}`; } function antigravityContextTokens(event) { if (!event || typeof event !== "object") return 0; let tokens = antigravityValueTokens(event.content); if (event.type === "PLANNER_RESPONSE" && event.tool_calls) { tokens += antigravityValueTokens(event.tool_calls); } return tokens; } function antigravityValueTokens(value) { if (typeof value === "string") return estimateAntigravityTokens(value); if (value == null) return 0; try { return estimateAntigravityTokens(JSON.stringify(value)); } catch (_e) { return 0; } } function estimateAntigravityTokens(text) { if (typeof text !== "string" || text.length === 0) return 0; let cjk = 0; let other = 0; for (const ch of text) { if (isCjkCodePoint(ch.codePointAt(0))) { cjk += 1; } else { other += 1; } } return cjk + Math.ceil(other / 4); } function isCjkCodePoint(code) { return ( (code >= 0x3400 && code <= 0x4dbf) || (code >= 0x4e00 && code <= 0x9fff) || (code >= 0x3040 && code <= 0x30ff) ); } module.exports = { listRolloutFiles, listClaudeProjectFiles, listGeminiSessionFiles, listOpencodeMessageFiles, readOpencodeDbMessages, resolveKiroDbPath, resolveKiroJsonlPath, resolveHermesPath, resolveHermesDbPath, resolveCopilotOtelPaths, parseRolloutIncremental, parseClaudeIncremental, parseGeminiIncremental, parseOpencodeIncremental, parseOpencodeDbIncremental, parseOpenclawIncremental, parseCursorApiIncremental, parseKiroIncremental, parseHermesIncremental, parseCopilotIncremental, resolveKimiWireFiles, resolveKimiDefaultModel, parseKimiIncremental, resolveKimiCodeHome, resolveKimiCodeWireFiles, resolveKimiCodeDefaultModel, parseKimiCodeIncremental, resolveCodebuddyHome, resolveCodebuddyProjectFiles, resolveCodebuddyDefaultModel, parseCodebuddyIncremental, resolveKiroCliSessionFiles, resolveKiroCliDbPath, parseKiroCliIncremental, resolveOmpHome, resolveOmpAgentDir, resolveOmpSessionFiles, resolveOmpDefaultModel, parseOmpIncremental, resolveKilocodeRoots, resolveKilocodeTaskFiles, normalizeKilocodeProviderToModel, parseKilocodeIncremental, resolveRoocodeTaskFiles, readRoocodeTaskModel, normalizeRoocodeModel, parseRoocodeIncremental, resolveZedDbPath, decodeZedThreadBlob, extractZedTotals, sumZedRequestUsage, readZedUsage, parseZedIncremental, resolveGooseDbPath, parseGooseModelName, parseGooseCreatedAt, parseGooseIncremental, resolveDroidSessionsDir, resolveDroidSessionsDirs, listDroidSettingsFiles, normalizeDroidModelName, normalizeDroidProvider, inferDroidProviderFromModel, defaultDroidModelForProvider, droidSessionIdFromPath, extractDroidModelFromSidecarJsonl, applyDroidTotalFallback, parseDroidIncremental, resolvePiHome, resolvePiAgentDir, resolvePiSessionFiles, resolvePiDefaultModel, parsePiIncremental, piAgentDirCollidesWithOmp, resolveCraftConfigDir, resolveCraftWorkspaceRoots, resolveCraftSessionFiles, resolveCraftDefaultModel, parseCraftIncremental, // Exposed for regression tests covering cache-token accounting. normalizeGeminiTokens, normalizeOpencodeTokens, sameGeminiTotals, diffGeminiTotals, // Exposed so the queue-repair migration can mutate cursors state in the // same key format sync uses elsewhere. bucketKey, totalsKey, claudeMessageDedupKey, groupBucketKey, // Exposed for regression tests covering nested-group remote URLs. canonicalizeProjectRef, deriveProjectKeyFromRef, // Grok Build (xAI) — SessionEnd hook + passive updates.jsonl/signals.json reader resolveGrokBuildHome, resolveGrokBuildSessions, parseGrokBuildIncremental, // Antigravity (Google Gemini) - Session logs parser resolveAntigravityBrainDirs, listAntigravitySessionFiles, listAntigravityTranscripts, parseAntigravityIncremental, estimateAntigravityTokens, isCjkCodePoint, };