/** * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; import { openAIEngine, renderPrompt, checkMajorVersion, } from "moz-src:///browser/components/aiwindow/models/Utils.sys.mjs"; const lazy = XPCOMUtils.declareLazy({ RemoteSettings: "resource://services-settings/remote-settings.sys.mjs", setTimeout: "resource://gre/modules/Timer.sys.mjs", ChatStore: "moz-src:///browser/components/aiwindow/ui/modules/ChatStore.sys.mjs", }); ChromeUtils.defineLazyGetter(lazy, "console", function () { return console.createInstance({ prefix: "LLMTelemetry", maxLogLevelPref: "browser.smartwindow.telemetryLogLevel", }); }); export const TELEMETRY_MAJOR_VERSIONS = Object.freeze({ wasSuccessful: 1, conversationCategory: 1, }); export const TRIGGER_MAJOR_VERSIONS = Object.freeze({ uniform_sample: 1, uniform_sample_at_turn: 1, min_turns: 1, }); // Registry of named check strategies export const TRIGGER_CHECK_STRATEGIES = { uniform_sample: (conversation, _params) => conversation.currentTurnIndex?.() == 0, uniform_sample_at_turn: (conversation, { turn }) => conversation._telemetryUniformSample === true && conversation.currentTurnIndex?.() == turn, min_turns: (conversation, { minTurns }) => (conversation.currentTurnIndex?.() ?? 0) >= minTurns, }; // Probability that a turn-1 conversation is flagged for uniform sampling. export const UNIFORM_SAMPLING_NAME = "uniform_sample"; const UNKNOWN = "unknown"; const RS_TELEMETRY_PROMPTS_COLLECTION = "ai-window-telemetry-prompts"; const RS_TELEMETRY_TRIGGERS_COLLECTION = "ai-window-telemetry-triggers"; // ============================================================ // Trigger // ============================================================ /** * Represents a single telemetry trigger. * * A trigger is evaluated against the conversation after each agent response. * If it fires and passes the sampling check, the trigger is included in the * set passed to runTelemetry. */ export class Trigger { /** * @param {string} name - Unique identifier, must match the "triggers" field in RS records. * @param {function(object): boolean} checkFn - Returns true if the trigger should fire. * @param {number} samplingProbability - Fraction [0, 1] of fires that proceed to evaluation. * @param {string} description - Description of trigger */ constructor(name, checkFn, samplingProbability = 1.0, description = "") { this.name = name; this.check = checkFn; this.samplingProbability = samplingProbability; this.description = description; } } // ============================================================ // telemetryPromptEngine // ============================================================ /** * Remote Settings records in RS_TELEMETRY_COLLECTION are expected to have: * - telemetry_name {string} Unique identifier for the evaluation prompt * - triggers {string[]} Trigger names this prompt applies to * - prompts {string} System prompt content * - model {string} Model ID to use for inference */ export class TelemetryPromptEngine { /** @type {object} */ #promptRecord; /** @type {object} */ #engineInstance; /** * Builds a telemetryPromptEngine for the given RS record. * * @param {object} promptRecord * @returns {Promise} */ static async build(promptRecord) { const engine = new TelemetryPromptEngine(); engine.#promptRecord = promptRecord; const extraHeadersPref = Services.prefs.getStringPref( "browser.smartwindow.extraHeaders", "{}" ); let extraHeaders = {}; try { extraHeaders = JSON.parse(extraHeadersPref); } catch (e) { lazy.console.error( "Failed to parse extra headers for telemetry engine:", e ); } engine.#engineInstance = await openAIEngine._createEngine({ apiKey: "", backend: "openai", baseURL: openAIEngine.endpoint, featureId: "llm-telemetry", flowId: null, modelId: promptRecord.model ?? null, modelRevision: "main", taskName: "text-generation", serviceType: promptRecord.service_type ?? "ai", purpose: promptRecord.purpose ?? "telemetry", extraHeaders, }); return engine; } verifyResult(result) { const schema = this.#promptRecord.output_schema; const finalMapping = {}; let metrics; try { metrics = JSON.parse(result.finalOutput); } catch { return Object.fromEntries(Object.keys(schema).map(key => [key, UNKNOWN])); } for (const key of Object.keys(schema)) { if (!schema[key]?.includes(metrics[key])) { finalMapping[key] = UNKNOWN; } else { finalMapping[key] = metrics[key]; } } return finalMapping; } /** * Runs the evaluation prompt against the conversation. * * @param {object} conversation * @returns {Promise} Raw LLM response */ async run(conversation) { const fxAccountToken = await openAIEngine.getFxAccountToken(); const messages = conversation.getMessagesInOpenAiFormat(); const prompt = renderPrompt(this.#promptRecord.prompt, { chatConversation: JSON.stringify(messages), fields: JSON.stringify(Object.keys(this.#promptRecord.output_schema)), }); const result = await this.#engineInstance.run({ fxAccountToken, args: [{ role: "system", content: prompt }], }); lazy.console.debug("Returning: ", result); return this.verifyResult(result); } } // ============================================================ // TelemetryEngine // ============================================================ /** * Orchestrates conversation telemetry evaluation. Loads trigger definitions * from Remote Settings, checks which triggers fire for a given conversation, * and runs LLM-based prompt evaluations for each matched trigger. * */ export class TelemetryEngine { _triggers = null; /** * * @param {object} conversation * @returns {Trigger[]} */ async _fetchRecords(collection, fallback = []) { try { const client = lazy.RemoteSettings(collection); const records = await client.get(); return records; } catch (e) { lazy.console.error("Telemetry: failed to fetch records:", e); return fallback; } } async getTriggerDefinitions() { if (this._triggers) { return; } const triggerRecords = await this._fetchRecords( RS_TELEMETRY_TRIGGERS_COLLECTION ); this._triggers = triggerRecords .filter(def => TRIGGER_CHECK_STRATEGIES[def.check]) .filter(def => checkMajorVersion(def.version, TRIGGER_MAJOR_VERSIONS[def.check]) ) .map( def => new Trigger( def.name, conversation => TRIGGER_CHECK_STRATEGIES[def.check]( conversation, def.params ?? {} ), def.sampling_probability ?? 1.0, def.description ?? "" ) ); } _getRandom() { return Math.random(); } async getTriggers(conversation) { await this.getTriggerDefinitions(); if (!conversation._checkedTelemetryTriggers) { conversation._checkedTelemetryTriggers = new Set(); } const fired = []; for (const trigger of this._triggers) { if (conversation._checkedTelemetryTriggers.has(trigger.name)) { lazy.console.debug( "checking:", trigger.name, "-> skip (already checked)" ); continue; } lazy.console.debug( "checking:", trigger.name, "-> evaluating", conversation._telemetryUniformSample, conversation.currentTurnIndex?.() ); if (trigger.check(conversation)) { conversation._checkedTelemetryTriggers.add(trigger.name); if (this._getRandom() < trigger.samplingProbability) { lazy.console.debug("checking:", trigger.name, "-> fired"); fired.push(trigger); if (trigger.name == UNIFORM_SAMPLING_NAME) { conversation._telemetryUniformSample = true; conversation._telemetryUniformProbability = trigger.samplingProbability; } } } } return fired; } /** * Runs LLM-based evaluations for all prompt records that match any of the * fired triggers. Prompts that apply to multiple triggers are deduplicated * and run only once. * * @param {Trigger[]} triggers * @param {object} conversation * @returns {Promise>} */ async runTelemetry(triggers, conversation) { if (!triggers.length) { return []; } const allRecords = await this._fetchRecords( RS_TELEMETRY_PROMPTS_COLLECTION ); const triggerByName = new Map(triggers.map(t => [t.name, t])); const seen = new Set(); const promptsToRun = []; const samplingProbabilities = new Map(); for (const record of allRecords) { const recordTriggers = record.triggers ?? []; if ( !seen.has(record.telemetry_name) && // sometimes multiple triggers will use the same telemetry checkMajorVersion( record.version, TELEMETRY_MAJOR_VERSIONS[record.telemetry_name] ) ) { const matchingTriggerName = recordTriggers.find(t => triggerByName.has(t) ); if (matchingTriggerName) { seen.add(record.telemetry_name); promptsToRun.push(record); samplingProbabilities.set( record.telemetry_name, triggerByName.get(matchingTriggerName).samplingProbability ); } } } const results = await this._runPrompts(promptsToRun, conversation); return results.map(r => ({ ...r, samplingProbability: samplingProbabilities.get(r.telemetry_name), })); } /** * Runs LLM-based evaluations for all telemetry prompts passed by name. * Prompt records that have run_terminal flag set to False will not be run * * @param {string[]} promptNames * @param {object} conversation * @returns {Promise>} */ async runTelemetryByName(promptNames, conversation) { if (!promptNames.length) { return []; } const allRecords = await this._fetchRecords( RS_TELEMETRY_PROMPTS_COLLECTION ); const nameSet = new Set(promptNames); const promptsToRun = allRecords .filter(record => nameSet.has(record.telemetry_name)) .filter(record => checkMajorVersion( record.version, TELEMETRY_MAJOR_VERSIONS[record.telemetry_name] ) ) .filter(record => record.run_terminal); return this._runPrompts(promptsToRun, conversation); } async _runPrompts(promptsToRun, conversation) { const results = []; for (const record of promptsToRun) { let lastError; for (let attempt = 0; attempt < 3; attempt++) { if (attempt > 0) { await new Promise(resolve => lazy.setTimeout(resolve, 2000 * attempt) ); } try { const engine = await TelemetryPromptEngine.build(record); const result = await engine.run(conversation); results.push({ telemetry_name: record.telemetry_name, result, telemetry_version: record.version, }); lastError = null; break; } catch (e) { if (e.message?.includes("429")) { // trying to catch 429 / rate-limiting errors; backoff and retry lastError = e; } else { lazy.console.error( `Telemetry: evaluation failed for ${record.telemetry_name}:`, e ); // other errors fail break; } } } if (lastError) { lazy.console.error( `Encountered error running batch telemetry job: ${lastError}` ); } } lazy.console.debug("Returning!", results); return results; } } export function normalizeMetadata(metadata = {}) { const { chat_version = "", record_type = "", uniform_sampling_probability = 0, trigger_sampling_probability = 0, triggers = [], } = metadata; return { chat_version, record_type, uniform_sampling_probability: Math.round( uniform_sampling_probability * 1000 ), trigger_sampling_probability: Math.round( trigger_sampling_probability * 1000 ), triggers: Array.isArray(triggers) ? JSON.stringify(triggers) : (triggers ?? "[]"), }; } export function submitTelemetryResult( telemetryResults, conversation, modelId, metadata ) { for (const resultObject of telemetryResults ?? []) { const normalized_metadata = normalizeMetadata({ ...metadata, trigger_sampling_probability: resultObject?.samplingProbability ?? 0.0, }); const result = resultObject?.result ?? {}; for (const [attributeName, attributeValue] of Object.entries(result)) { Glean.smartWindow.llmajBasedTelemetry.record({ ...normalized_metadata, chat_id: conversation.id, model: modelId, turn_number: conversation.currentTurnIndex(), telemetry_name: resultObject?.telemetry_name ?? "", attribute_name: attributeName, attribute_value: String(attributeValue ?? UNKNOWN), uniform_sampled: (metadata.triggers ?? []).includes( UNIFORM_SAMPLING_NAME ), trigger_sampled: (metadata.triggers ?? []).some( t => t !== UNIFORM_SAMPLING_NAME ), telemetry_version: resultObject?.telemetry_version ?? "", }); } } } /** * Marks conversations as unprocessed (since new turns have been added) and * runs LLM-as-judge-based telemetry * * @param {ChatConversation} conversation * @param {openAIEngine} engineInstance */ export async function runLLMaJTelemetry(conversation, engineInstance) { const turnIndex = conversation.currentTurnIndex(); await lazy.ChatStore.markLLMTelemetryUnprocessed(conversation.id).catch(e => console.error("Failed to mark telemetry unprocessed:", e) ); const telemetryEngine = new TelemetryEngine(); const triggers = await telemetryEngine.getTriggers(conversation); const telemetryStart = ChromeUtils.now(); telemetryEngine .runTelemetry(triggers, conversation) .then(results => { if (!results.length) { return; } submitTelemetryResult(results, conversation, engineInstance.model, { record_type: "midChat", uniform_sampling_probability: conversation._telemetryUniformProbability, triggers: triggers.map(t => t.name), chat_version: engineInstance.getConfig(engineInstance.feature)?.version, }); const prompts = Object.fromEntries( results.map(r => [r.telemetry_name, turnIndex]) ); const probabilities = Object.fromEntries( results.map(r => [r.telemetry_name, r.samplingProbability]) ); lazy.ChatStore.updateLLMTelemetryRecord( conversation.id, prompts, probabilities, conversation._telemetryUniformProbability ).catch(e => console.error("Failed to update telemetry record:", e)); }) .catch(e => console.error("Telemetry run failed:", e)) .finally(() => { ChromeUtils.addProfilerMarker( "SmartWindow", { startTime: telemetryStart }, "LLMaJTelemetry" ); }); }