/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; import { HPKEConfigManager } from "resource://gre/modules/HPKEConfigManager.sys.mjs"; let lazy = {}; ChromeUtils.defineLazyGetter(lazy, "logConsole", function () { return console.createInstance({ prefix: "DAPSender", maxLogLevelPref: "toolkit.telemetry.dap.logLevel", }); }); ChromeUtils.defineESModuleGetters(lazy, { setTimeout: "resource://gre/modules/Timer.sys.mjs", ObliviousHTTP: "resource://gre/modules/ObliviousHTTP.sys.mjs", }); XPCOMUtils.defineLazyPreferenceGetter( lazy, "gDapEndpoint", "toolkit.telemetry.dap.leader.url" ); XPCOMUtils.defineLazyPreferenceGetter( lazy, "gLeaderHpke", "toolkit.telemetry.dap.leader.hpke" ); XPCOMUtils.defineLazyPreferenceGetter( lazy, "gHelperHpke", "toolkit.telemetry.dap.helper.hpke" ); /** * The purpose of this singleton is to handle the core DAP (Distributed Aggregation Protocol) functionality. * The current DAP draft standard is available here: * https://github.com/ietf-wg-ppm/draft-ietf-ppm-dap * * This module provides the low-level DAP report generation and sending capabilities, * independent of telemetry preferences. It can be used by any feature that needs * privacy-preserving aggregation. * * NOTE: Do not, under any circumstances, use this mechanism directly * to send telemetry. Use DAPTelemetrySender for that instead. */ export const DAPSender = new (class { /** * @typedef { 'sum' | 'sumvec' | 'histogram' } VDAF */ /** * Task configuration must match a configured task on the DAP server. * * @typedef {object} Task * @property {string} id - The task ID in urlsafe_base64 encoding. * @property {VDAF} vdaf - The VDAF used by the task. * @property {number} [bits] - The bit-width of integers in sum/sumvec measurements. * @property {number} [length] - The number of vector/histogram elements. * @property {number} time_precision - The rounding granularity in seconds * that is applied to timestamps attached * to the report. */ /** * Internal testing function to verify the DAP aggregator keys match current * values advertised by servers. */ async checkHpkeKeys() { async function check_key(url, expected) { let response = await fetch(url + "/hpke_config"); let body = await response.arrayBuffer(); let actual = ChromeUtils.base64URLEncode(body, { pad: false }); if (actual != expected) { throw new Error(`HPKE for ${url} does not match`); } } await Promise.allSettled([ await check_key( Services.prefs.getStringPref("toolkit.telemetry.dap.leader.url"), Services.prefs.getStringPref("toolkit.telemetry.dap.leader.hpke") ), await check_key( Services.prefs.getStringPref("toolkit.telemetry.dap.helper.url"), Services.prefs.getStringPref("toolkit.telemetry.dap.helper.hpke") ), ]); } /** * Sends test reports with randomly generated measurements for testing the DAP infrastructure. * * @param {Array} tasks * Array of task definitions to send test reports for. * @param {object} options * Options to pass to sendDAPMeasurement. */ async sendTestReports(tasks, options = {}) { for (let task of tasks) { let measurement; if (task.vdaf == "sum") { measurement = 3; } else if (task.vdaf == "sumvec") { measurement = new Array(20).fill(0); let r = Math.floor(Math.random() * 10); measurement[r] += 1; measurement[19] += 1; } else if (task.vdaf == "histogram") { measurement = Math.floor(Math.random() * 15); } else { throw new Error(`Unknown VDAF ${task.vdaf}`); } await this.sendDAPMeasurement(task, measurement, options); } } /** * Periodically sends test reports on a timer. * * @param {Array} tasks * Array of task definitions to send test reports for. */ async timedSendTestReports(tasks) { lazy.logConsole.debug("Sending on timer."); await this.sendTestReports(tasks); lazy.setTimeout( () => this.timedSendTestReports(tasks), this.timeout_value() ); } /** * @returns {number} A random timeout value between 9-11 minutes in milliseconds. */ timeout_value() { const MINUTE = 60 * 1000; return MINUTE * (9 + Math.random() * 2); // 9 - 11 minutes } /** * Creates a DAP report for a specific task from a measurement and sends it. * * @param {Task} task * Definition of the task for which the measurement was taken. * @param {number|Array} measurement * The measured value for which a report is generated. * @param {object} options * @param {number} options.timeout * The timeout for request in milliseconds. Defaults to 30s. * @param {string} options.reason * A string to indicate the reason for triggering a submission. This is * currently ignored and not recorded. * @param {string} options.ohttp_relay * @param {Uint8Array} options.ohttp_hpke * If an OHTTP relay is specified, the reports are uploaded over OHTTP. */ async sendDAPMeasurement(task, measurement, options = {}) { try { const controller = new AbortController(); lazy.setTimeout(() => controller.abort(), options.timeout ?? 30_000); let keys = { leader_hpke: HPKEConfigManager.decodeKey(lazy.gLeaderHpke), helper_hpke: HPKEConfigManager.decodeKey(lazy.gHelperHpke), }; let report = this.generateReport(task, measurement, keys); await this.sendReport( lazy.gDapEndpoint, task.id, report, controller.signal, options ); } catch (e) { if (e.name === "AbortError") { lazy.logConsole.error("Aborted DAP report generation: ", e); } else { lazy.logConsole.error("DAP report generation failed: " + e); } throw e; } } /** * @typedef {object} AggregatorKeys * @property {Uint8Array} leader_hpke - The leader's DAP HPKE key. * @property {Uint8Array} helper_hpke - The helper's DAP HPKE key. */ /** * Generates the encrypted DAP report. * * @param {Task} task * Definition of the task for which the measurement was taken. * @param {number|Array} measurement * The measured value for which a report is generated. * @param {AggregatorKeys} keys * The DAP encryption keys for each aggregator. * * @returns {ArrayBuffer} The generated binary report data. */ generateReport(task, measurement, keys) { let task_id = new Uint8Array( ChromeUtils.base64URLDecode(task.id, { padding: "ignore" }) ); let reportOut = {}; if (task.vdaf === "sum") { Services.DAPTelemetry.GetReportPrioSum( keys.leader_hpke, keys.helper_hpke, measurement, task_id, task.bits, task.time_precision, reportOut ); } else if (task.vdaf === "sumvec") { if (measurement.length != task.length) { throw new Error( "Measurement vector length doesn't match task configuration" ); } Services.DAPTelemetry.GetReportPrioSumVec( keys.leader_hpke, keys.helper_hpke, measurement, task_id, task.bits, task.time_precision, reportOut ); } else if (task.vdaf === "histogram") { Services.DAPTelemetry.GetReportPrioHistogram( keys.leader_hpke, keys.helper_hpke, measurement, task_id, task.length, task.time_precision, reportOut ); } else { throw new Error( `Unknown measurement type for task ${task.id}: ${task.vdaf} ${task.bits}` ); } return new Uint8Array(reportOut.value).buffer; } /** * Sends a report to the leader. * * @param {string} leader_endpoint * The URL for the leader. * @param {string} task_id * Base64 encoded task_id as it appears in the upload path. * @param {ArrayBuffer} report * Raw bytes of the TLS encoded report. * @param {AbortSignal} abortSignal * Can be used to cancel network requests. Does not cancel computation. * @param {object} options * @param {string} options.ohttp_relay * @param {Uint8Array} options.ohttp_hpke * If an OHTTP relay is specified, the reports are uploaded over OHTTP. In * this case, the OHTTP and DAP keys must be provided and this code will not * attempt to fetch them. * * @returns {Promise} Once the attempt to send the report completes, whether or not it was successful. */ async sendReport(leader_endpoint, task_id, report, abortSignal, options) { const upload_path = leader_endpoint + "/tasks/" + task_id + "/reports"; try { let requestOptions = { method: "PUT", headers: { "Content-Type": "application/dap-report" }, body: report, signal: abortSignal, }; let response; if (options.ohttp_relay) { response = await lazy.ObliviousHTTP.ohttpRequest( options.ohttp_relay, options.ohttp_hpke, upload_path, requestOptions ); } else { response = await fetch(upload_path, requestOptions); } if (response.status != 200) { const content_type = response.headers.get("content-type"); if (content_type && content_type === "application/json") { let error = await response.json(); throw new Error( `Sending failed. HTTP response: ${response.status} ${response.statusText}. Error: ${error.type} ${error.title}` ); } else { let error = await response.text(); throw new Error( `Sending failed. HTTP response: ${response.status} ${response.statusText}. Error: ${error}` ); } } else { lazy.logConsole.debug("DAP report sent"); } } catch (err) { if (err.name === "AbortError") { lazy.logConsole.error("Aborted DAP report sending: ", err); } else { lazy.logConsole.error("Failed to send report: ", err); } throw err; } } })();