/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ const lazy = {}; ChromeUtils.defineESModuleGetters(lazy, { Sqlite: "resource://gre/modules/Sqlite.sys.mjs", }); ChromeUtils.defineLazyGetter(lazy, "log", function () { return console.createInstance({ prefix: "ChatStore", maxLogLevelPref: "browser.aiwindow.chatStore.loglevel", }); }); import { CONVERSATION_TABLE, CONVERSATION_UPDATED_DATE_INDEX, CONVERSATION_INSERT, MESSAGE_TABLE, MESSAGE_ORDINAL_INDEX, MESSAGE_URL_INDEX, MESSAGE_CREATED_DATE_INDEX, MESSAGE_CONV_ID_INDEX, MESSAGE_INSERT, CONVERSATIONS_MOST_RECENT, CONVERSATION_BY_ID, CONVERSATIONS_BY_DATE, CONVERSATIONS_BY_URL, CONVERSATIONS_CONTENT_SEARCH, CONVERSATIONS_CONTENT_SEARCH_BY_ROLE, CONVERSATIONS_HISTORY_SEARCH, MESSAGES_BY_DATE, MESSAGES_BY_DATE_AND_ROLE, DELETE_CONVERSATION_BY_ID, CONVERSATIONS_OLDEST, CONVERSATION_HISTORY, ESCAPE_CHAR, getConversationMessagesSql, } from "./ChatSql.sys.mjs"; import { ChatMinimal } from "./ChatMessage.sys.mjs"; export { ChatConversation } from "./ChatConversation.sys.mjs"; export { ChatMessage, ChatMinimal } from "./ChatMessage.sys.mjs"; export { CONVERSATION_STATUS, MESSAGE_ROLE, INSIGHTS_FLAG_SOURCE, } from "./ChatConstants.sys.mjs"; import { CURRENT_SCHEMA_VERSION, DB_FOLDER_PATH, DB_FILE_NAME, PREF_BRANCH, CONVERSATION_STATUS, } from "./ChatConstants.sys.mjs"; import { parseConversationRow, parseMessageRows, parseChatHistoryViewRows, toJSONOrNull, } from "./ChatUtils.sys.mjs"; // NOTE: Reference to migrations file, migrations.mjs has an example // migration function set up for a migration, and the eslint-disable-next-line // should be removed once we create the first migration. // // eslint-disable-next-line no-unused-vars import { migrations } from "./ChatMigrations.sys.mjs"; const MAX_DB_SIZE_BYTES = 75 * 1024 * 1024; const SORTS = ["ASC", "DESC"]; /** * Simple interface to store and retrieve chat conversations and messages. * * @todo Bug 2005409 * Move this documentation to Firefox source docs * * See: https://docs.google.com/document/d/1VlwmGbMhPIe-tmeKWinHuPh50VC9QrWEeQQ5V-UvEso/edit?tab=t.klqqibndv3zk * * @example * let { ChatStore, ChatConversation, ChatMessage, MESSAGE_ROLE } = * ChromeUtils.importESModule("resource:///modules/aiwindow/ui/modules/ChatStore.sys.mjs"); * const chatStore = new ChatStore(); * const conversation = new ChatConversation({ * title: "title", * description: "description", * pageUrl: new URL("https://mozilla.com/"), * pageMeta: { one: 1, two: 2 }, * }); * const msg1 = new ChatMessage({ * ordinal: 0, * role: MESSAGE_ROLE.USER, * modelId: "test", * params: { one: "one" }, * usage: { two: "two", content: "some content" }, * }); * const msg2 = new ChatMessage({ * ordinal: 1, * role: MESSAGE_ROLE.ASSISTANT, * modelId: "test", * params: { one: "one" }, * usage: { two: "two", content: "some content 2" }, * }); * conversation.messages = [msg1, msg2]; * await chatStore.updateConversation(conversation); * // Or findConversationsByDate, findConversationsByURL. * const foundConversation = * await chatStore.findConversationById(conversation.id); * * @typedef {object} ChatStore * * @property {*} x ? */ export class ChatStore { #asyncShutdownBlocker; #conn; #promiseConn; constructor() { this.#asyncShutdownBlocker = async () => { await this.#closeConnection(); }; } /** * Updates a conversation's saved state in the SQLite db * * @param {ChatConversation} conversation */ async updateConversation(conversation) { await this.#ensureDatabase().catch(e => { lazy.log.error("Could not ensure a database connection."); throw e; }); const pageUrl = URL.parse(conversation.pageUrl); await this.#conn .executeTransaction(async () => { await this.#conn.executeCached(CONVERSATION_INSERT, { conv_id: conversation.id, title: conversation.title, description: conversation.description, page_url: pageUrl?.href ?? null, page_meta: toJSONOrNull(conversation.pageMeta), created_date: conversation.createdDate, updated_date: conversation.updatedDate, status: conversation.status, active_branch_tip_message_id: conversation.activeBranchTipMessageId, }); const messages = conversation.messages.map(m => ({ message_id: m.id, conv_id: conversation.id, created_date: m.createdDate, parent_message_id: m.parentMessageId, revision_root_message_id: m.revisionRootMessageId, ordinal: m.ordinal, is_active_branch: m.isActiveBranch ? 1 : 0, role: m.role, model_id: m.modelId, params: toJSONOrNull(m.params), content: toJSONOrNull(m.content), usage: toJSONOrNull(m.usage), page_url: m.pageUrl?.href || "", turn_index: m.turnIndex, insights_enabled: m.insightsEnabled, insights_flag_source: m.insightsFlagSource, insights_applied_jsonb: toJSONOrNull(m.insightsApplied), web_search_queries_jsonb: toJSONOrNull(m.webSearchQueries), })); await this.#conn.executeCached(MESSAGE_INSERT, messages); }) .catch(e => { lazy.log.error("Transaction failed to execute"); throw e; }); } /** * Gets a list of oldest conversations * * @param {number} numberOfConversations - How many conversations to retrieve * @returns {Array} - List of ChatMinimal items */ async findOldestConversations(numberOfConversations) { await this.#ensureDatabase().catch(e => { lazy.log.error("Could not ensure a database connection."); throw e; }); const rows = await this.#conn .executeCached(CONVERSATIONS_OLDEST, { limit: numberOfConversations, }) .catch(e => { lazy.log.error("Could not retrieve oldest conversations."); throw e; }); return rows.map(row => { return new ChatMinimal({ convId: row.getResultByName("conv_id"), title: row.getResultByName("title"), }); }); } /** * Gets a list of most recent conversations * * @param {number} numberOfConversations - How many conversations to retrieve * @returns {Array} - List of ChatMinimal items */ async findRecentConversations(numberOfConversations) { await this.#ensureDatabase().catch(e => { lazy.log.error("Could not ensure a database connection."); throw e; }); const rows = await this.#conn .executeCached(CONVERSATIONS_MOST_RECENT, { limit: numberOfConversations, }) .catch(e => { lazy.log.error("Could not retrieve most recent conversations."); throw e; }); return rows.map(row => { return new ChatMinimal({ convId: row.getResultByName("conv_id"), title: row.getResultByName("title"), }); }); } /** * Gets a Conversation using it's id * * @param {string} conversationId - The ID of the conversation to retrieve * * @returns {ChatConversation} - The conversation and its messages */ async findConversationById(conversationId) { const conversations = await this.#findConversationsWithMessages( CONVERSATION_BY_ID, { conv_id: conversationId, } ); return conversations[0] ?? null; } /** * Finds conversations between a specified start and end date * * @param {number} startDate - Start time epoch format * @param {number} endDate - End time epoch format * * @returns {Array} - The conversations and their messages */ async findConversationsByDate(startDate, endDate) { return this.#findConversationsWithMessages(CONVERSATIONS_BY_DATE, { start_date: startDate, end_date: endDate, }); } /** * Finds conversations between a specified start and end date * * @param {URL} pageUrl - The URL to find conversations for * * @returns {Array} - The conversations and their messages */ async findConversationsByURL(pageUrl) { return this.#findConversationsWithMessages(CONVERSATIONS_BY_URL, { page_url: pageUrl.href, }); } /** * Search for messages that happened between the specified start * and end dates, optionally, filter the messages by a specific * message role type. * * @param {Date} startDate - The start date, inclusive * @param {Date} [endDate=new Date()] - The end date, inclusive * @param {MessageRole} [role=-1] - The message role type to filter by, one of 0|1|2|3 * as defined by the constant MESSAGE_ROLE * @param {number} [limit=-1] - The max number of messages to retrieve * @param {number} [offset=-1] - The number or messages to skip from the result set * * @returns {Array} - An array of ChatMessage entries */ async findMessagesByDate( startDate, endDate = new Date(), role = -1, limit = -1, offset = -1 ) { const params = { start_date: startDate.getTime(), end_date: endDate.getTime(), limit, offset, }; let sql = MESSAGES_BY_DATE; if (role > -1) { sql = MESSAGES_BY_DATE_AND_ROLE; params.role = role; } let rows = await this.#conn.executeCached(sql, params); return parseMessageRows(rows); } #escapeForLike(searchString) { return searchString .replaceAll(ESCAPE_CHAR, `${ESCAPE_CHAR}${ESCAPE_CHAR}`) .replaceAll("%", `${ESCAPE_CHAR}%`) .replaceAll("_", `${ESCAPE_CHAR}_`); } /** * Searches through the message.content JSON object to find a particular * object path that contains a partial string match of a value. * * @param {string} keyChain - The object key chain to look through, * like obj.field1.field2 * @param {MessageRole} [role=-1] - A message role to search for * * @returns {Array} - An array of conversations with messages * that contain a message that matches the search string at the given content * object path */ async searchContent(keyChain, role = -1) { const path = `$.${keyChain}`; const query = role > -1 ? CONVERSATIONS_CONTENT_SEARCH_BY_ROLE : CONVERSATIONS_CONTENT_SEARCH; const params = { path }; if (role > -1) { params.role = role; } const rows = await this.#conn.executeCached(query, params); if (!rows.length) { return []; } const conversations = rows.map(parseConversationRow); return await this.#getMessagesForConversations(conversations); } /** * Searches for conversations where the conversation title, or the conversation * contains a user message where the search string contains a partial match * in the message.content.body field * * @param {string} searchString - The string to search with for conversations * * @returns {Array} - An array of conversations with messages * that contain a message that matches the search string in the conversation * titles */ async search(searchString) { const path = `$.body`; const pattern = `%${this.#escapeForLike(searchString)}%`; const rows = await this.#conn.executeCached(CONVERSATIONS_HISTORY_SEARCH, { path, pattern, }); if (!rows.length) { return []; } const conversations = rows.map(parseConversationRow); return await this.#getMessagesForConversations(conversations); } /** * Gets a list of chat history items to display in Chat History view. * * @param {number} [pageNumber=1] - The page number to get, 1 based indexing * @param {number} [pageSize=20] - Number of items to get per page * @param {string} [sort="desc"] - desc|asc The sorting order based on updated_date for conversations */ async chatHistoryView(pageNumber = 1, pageSize = 20, sort = "desc") { const sorting = SORTS.find(item => item === sort.toUpperCase()) ?? "DESC"; const offset = pageSize * (pageNumber - 1); const limit = pageSize; const params = { limit, offset, }; const rows = await this.#conn.executeCached( CONVERSATION_HISTORY.replace("{sort}", sorting), params ); return parseChatHistoryViewRows(rows); } /** * Prunes the database of old conversations in order to get the * database file size to the specified maximum size. * * @todo Bug 2005411 * Review the requirements for db pruning and set up invocation schedule, and refactor * to use dbstat * * @param {number} [reduceByPercentage=0.05] - Percentage to reduce db file size by * @param {number} [maxDbSizeBytes=MAX_DB_SIZE_BYTES] - Db max file size */ async pruneDatabase( reduceByPercentage = 0.05, maxDbSizeBytes = MAX_DB_SIZE_BYTES ) { if (!IOUtils.exists(this.databaseFilePath)) { return; } const DELETE_BATCH_SIZE = 50; const getPragmaInt = async name => { const result = await this.#conn.execute(`PRAGMA ${name}`); return result[0].getInt32(0); }; // compute the logical DB size in bytes using SQLite's page_size, // page_count, and freelist_count const getLogicalDbSizeBytes = async () => { const pageSize = await getPragmaInt("page_size"); const pageCount = await getPragmaInt("page_count"); const freelistCount = await getPragmaInt("freelist_count"); // Logical used pages = total pages - free pages const usedPages = pageCount - freelistCount; const lSize = usedPages * pageSize; return lSize; }; let logicalSize = await getLogicalDbSizeBytes(); if (logicalSize < maxDbSizeBytes) { return; } const targetLogicalSize = Math.max( 0, logicalSize * (1 - reduceByPercentage) ); const MAX_ITERATIONS = 100; // how many "no file size change" batches we tolerate const MAX_STAGNANT = 5; let iterations = 0; let stagnantIterations = 0; while ( logicalSize > targetLogicalSize && iterations < MAX_ITERATIONS && stagnantIterations < MAX_STAGNANT ) { iterations++; const recentChats = await this.findOldestConversations(DELETE_BATCH_SIZE); if (!recentChats.length) { break; } for (const chat of recentChats) { await this.deleteConversationById(chat.id); } const newLogicalSize = await getLogicalDbSizeBytes(); if (newLogicalSize >= logicalSize) { stagnantIterations++; } else { stagnantIterations = 0; } logicalSize = newLogicalSize; } // Actually reclaim disk space. await this.#conn.execute("PRAGMA incremental_vacuum;"); } /** * Returns the file size of the database. * Establishes a connection first to make sure the * database exists. * * @returns {number} - The file size in bytes */ async getDatabaseSize() { await this.#ensureDatabase(); const stats = await IOUtils.stat(this.databaseFilePath); return stats.size; } /** * Deletes a particular conversation using it's id * * @param {string} id - The conv_id of a conversation row to delete */ async deleteConversationById(id) { await this.#ensureDatabase(); await this.#conn.execute(DELETE_CONVERSATION_BY_ID, { conv_id: id, }); } /** * This method is meant to only be used for testing cleanup */ async destroyDatabase() { await this.#removeDatabaseFiles(); } /** * Gets the version of the schema currently set in the database. * * @returns {number} */ async getDatabaseSchemaVersion() { if (!this.#conn) { await this.#ensureDatabase(); } return this.#conn.getSchemaVersion(); } async #getMessagesForConversations(conversations) { const convs = conversations.reduce((convMap, conv) => { convMap[conv.id] = conv; return convMap; }, {}); // Find all the messages for all the conversations. const rows = await this.#conn .executeCached( getConversationMessagesSql(conversations.length), conversations.map(c => c.id) ) .catch(e => { lazy.log.error("Could not retrieve messages for conversatons"); lazy.log.error(`${e.message}\n${e.stack}`); return []; }); // TODO: retrieve TTL content. parseMessageRows(rows).forEach(message => { const conversation = convs[message.convId]; if (conversation) { conversation.messages.push(message); } }); return conversations; } async #openConnection() { lazy.log.debug("Opening new connection"); try { const confConfig = { path: this.databaseFilePath }; this.#conn = await lazy.Sqlite.openConnection(confConfig); } catch (e) { lazy.log.error("openConnection() could not open db:", e.message); throw e; } lazy.Sqlite.shutdown.addBlocker( "ChatStore: Shutdown", this.#asyncShutdownBlocker ); try { // TODO: remove this after switching pruneDatabase() to use dbstat await this.#conn.execute("PRAGMA page_size = 4096;"); // Setup WAL journaling, as it is generally faster. await this.#conn.execute("PRAGMA journal_mode = WAL;"); await this.#conn.execute("PRAGMA wal_autocheckpoint = 16;"); // Store VACUUM information to be used by the VacuumManager. await this.#conn.execute("PRAGMA auto_vacuum = INCREMENTAL;"); await this.#conn.execute("PRAGMA foreign_keys = ON;"); } catch (e) { lazy.log.warn("Configuring SQLite PRAGMA settings: ", e.message); } } async #closeConnection() { if (!this.#conn) { return; } lazy.log.debug("Closing connection"); lazy.Sqlite.shutdown.removeBlocker(this.#asyncShutdownBlocker); try { await this.#conn.close(); } catch (e) { lazy.log.warn(`Error closing connection: ${e.message}`); } this.#conn = null; } /** * @todo Bug 2005412 * Discuss implications of multiple instances of ChatStore * and the potential issues with migrations/schemas. */ async #ensureDatabase() { if (this.#promiseConn) { return this.#promiseConn; } let deferred = Promise.withResolvers(); this.#promiseConn = deferred.promise; if (this.#removeDatabaseOnStartup) { lazy.log.debug("Removing database on startup"); try { await this.#removeDatabaseFiles(); } catch (e) { deferred.reject(new Error("Could not remove the database files")); return deferred.promise; } } try { await this.#openConnection(); } catch (e) { if ( e.result == Cr.NS_ERROR_FILE_CORRUPTED || e.errors?.some(error => error.result == Ci.mozIStorageError.NOTADB) ) { lazy.log.warn("Invalid database detected, removing it.", e); await this.#removeDatabaseFiles(); } } if (!this.#conn) { try { await this.#openConnection(); } catch (e) { lazy.log.error("Could not open the database connection.", e); deferred.reject(new Error("Could not open the database connection")); return deferred.promise; } } try { await this.#initializeSchema(); } catch (e) { lazy.log.warn( "Failed to initialize the database schema, recreating the database.", e ); // If the schema cannot be initialized try to create a new database file. await this.#removeDatabaseFiles(); } deferred.resolve(this.#conn); return this.#promiseConn; } async setSchemaVersion(version) { await this.#conn.setSchemaVersion(version); } async #initializeSchema() { const version = await this.getDatabaseSchemaVersion(); if (version == this.CURRENT_SCHEMA_VERSION) { return; } if (version > this.CURRENT_SCHEMA_VERSION) { await this.setSchemaVersion(this.CURRENT_SCHEMA_VERSION); return; } // Must migrate the schema. await this.#conn.executeTransaction(async () => { if (version == 0) { // This is a newly created database, just create the entities. await this.#createDatabaseEntities(); await this.#conn.setSchemaVersion(this.CURRENT_SCHEMA_VERSION); // eslint-disable-next-line no-useless-return return; } await this.applyMigrations(); await this.setSchemaVersion(this.CURRENT_SCHEMA_VERSION); }); } async applyMigrations() { for (const migration of migrations) { if (typeof migration !== "function") { continue; } await migration(this.#conn, this.CURRENT_SCHEMA_VERSION); } } async #removeDatabaseFiles() { lazy.log.debug("Removing database files"); await this.#closeConnection(); try { for (let file of [ this.databaseFilePath, PathUtils.join(DB_FOLDER_PATH, this.databaseFileName + "-wal"), PathUtils.join(DB_FOLDER_PATH, this.databaseFileName + "-shm"), ]) { lazy.log.debug(`Removing ${file}`); await IOUtils.remove(file, { retryReadonly: true, recursive: true, ignoreAbsent: true, }); } this.#removeDatabaseOnStartup = false; } catch (e) { lazy.log.warn("Failed to remove database files", e); // Try to clear on next startup. this.#removeDatabaseOnStartup = true; // Re-throw the exception for the caller. throw e; } } async #findConversationsWithMessages(sql, queryParams) { await this.#ensureDatabase().catch(e => { lazy.log.error("Could not ensure a database connection."); lazy.log.error(`${e.message}\n${e.stack}`); return []; }); // @todo Bug 2005414 // Check summary first, find the one with the largest end_ordinal. // If not found retrieve all messages. // If found compare end_ordinal of the summary with active branch ordinal // to determine if extra messages must be retrieved. let rows = await this.#conn.executeCached(sql, queryParams); const conversations = rows.map(parseConversationRow); return await this.#getMessagesForConversations(conversations); } async #createDatabaseEntities() { await this.#conn.execute(CONVERSATION_TABLE); await this.#conn.execute(CONVERSATION_UPDATED_DATE_INDEX); await this.#conn.execute(MESSAGE_TABLE); await this.#conn.execute(MESSAGE_ORDINAL_INDEX); await this.#conn.execute(MESSAGE_URL_INDEX); await this.#conn.execute(MESSAGE_CREATED_DATE_INDEX); await this.#conn.execute(MESSAGE_CONV_ID_INDEX); } get #removeDatabaseOnStartup() { return Services.prefs.getBoolPref( `${PREF_BRANCH}.removeDatabaseOnStartup`, false ); } set #removeDatabaseOnStartup(value) { lazy.log.debug(`Setting removeDatabaseOnStartup to ${value}`); Services.prefs.setBoolPref(`${PREF_BRANCH}.removeDatabaseOnStartup`, value); } static get CONVERSATION_STATUS() { return CONVERSATION_STATUS; } get CURRENT_SCHEMA_VERSION() { return CURRENT_SCHEMA_VERSION; } get connection() { return this.#conn; } get databaseFileName() { return DB_FILE_NAME; } get databaseFilePath() { return PathUtils.join(PathUtils.profileDir, this.databaseFileName); } }