{"cells": [{"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["# 使用PolarDB-PG作为OpenAI嵌入向量数据库\n", "\n", "本笔记将逐步指导您如何将PolarDB-PG用作OpenAI嵌入向量数据库。\n", "\n", "本笔记介绍了以下端到端的过程:\n", "1. 使用OpenAI API创建的预先计算的嵌入向量。\n", "2. 将嵌入向量存储在PolarDB-PG的云实例中。\n", "3. 将原始文本查询转换为嵌入向量,使用OpenAI API。\n", "4. 使用PolarDB-PG在创建的集合中执行最近邻搜索。\n", "\n", "### 什么是PolarDB-PG\n", "\n", "[PolarDB-PG](https://www.alibabacloud.com/help/en/polardb/latest/what-is-polardb-2) 是一种高性能向量数据库,采用读写分离架构。它是由阿里云管理的云原生数据库,与PostgreSQL 100%兼容,与Oracle语法高度兼容。它支持处理大规模向量数据存储和查询,并通过优化底层执行算法大大提高向量计算的效率,为用户提供快速、弹性、高性能、大规模存储、安全可靠的向量数据库服务。此外,PolarDB-PG还支持多维和多模态时空信息引擎和地理信息引擎。同时,PolarDB-PG配备完整的OLAP功能和服务级别协议,得到许多用户的认可和使用;\n", "\n", "### 部署选项\n", "\n", "- 使用[PolarDB-PG云向量数据库](https://www.alibabacloud.com/product/polardb-for-postgresql)。[点击这里](https://www.alibabacloud.com/product/polardb-for-postgresql?spm=a3c0i.147400.6791778070.243.9f204881g5cjpP)快速部署。\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["## 先决条件\n", "\n", "为了完成这个练习,我们需要准备一些东西:\n", "\n", "1. PolarDB-PG 云服务器实例。\n", "2. 'psycopg2' 库用于与向量数据库交互。任何其他的PostgreSQL客户端库也可以。\n", "3. 一个[OpenAI API密钥](https://beta.openai.com/account/api-keys)。\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["我们可以通过运行一个简单的curl命令来验证服务器是否成功启动:\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["### 安装所需软件包\n", "\n", "这个笔记本显然需要`openai`和`psycopg2`软件包,但我们还会使用一些其他附加库。以下命令会安装它们全部:\n"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["! pip install openai psycopg2 pandas wget\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["准备您的OpenAI API密钥\n", "OpenAI API密钥用于对文档和查询进行向量化。\n", "\n", "如果您还没有OpenAI API密钥,可以从https://beta.openai.com/account/api-keys 获取一个。\n", "\n", "获取密钥后,请将其添加到您的环境变量中,命名为OPENAI_API_KEY。\n", "\n", "如果您对通过环境变量设置API密钥有任何疑问,请参考[API密钥安全最佳实践](https://help.openai.com/en/articles/5112595-best-practices-for-api-key-safety)。\n"]}, {"cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["OPENAI_API_KEY is ready\n"]}], "source": ["# 测试您的 OpenAI API 密钥是否已正确设置为环境变量。\n", "# 注意:如果您在本地运行此笔记本,您需要重新加载终端和笔记本,以使环境变量生效。\n", "\n", "if os.getenv(\"OPENAI_API_KEY\") is not None:\n", " print(\"OPENAI_API_KEY is ready\")\n", "else:\n", " print(\"OPENAI_API_KEY environment variable not found\")\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["## 连接到PolarDB\n", "首先将其添加到您的环境变量中。或者您可以直接更改下面的\"psycopg2.connect\"参数。\n", "\n", "使用官方的Python库连接到正在运行的PolarDB服务器实例非常简单:\n"]}, {"cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": ["import os\n", "import psycopg2\n", "\n", "# 注意:或者,您也可以像这样设置一个临时的环境变量:\n", "# os.environ[\"PGHOST\"] = \"your_host\"\n", "# os.environ[\"PGPORT\"] \"5432\"),\n", "# os.environ[\"PGDATABASE\"] \"postgres\"),\n", "# os.environ[\"PGUSER\"] \"user\"),\n", "# os.environ[\"PGPASSWORD\"] \"password\"),\n", "\n", "connection = psycopg2.connect(\n", " host=os.environ.get(\"PGHOST\", \"localhost\"),\n", " port=os.environ.get(\"PGPORT\", \"5432\"),\n", " database=os.environ.get(\"PGDATABASE\", \"postgres\"),\n", " user=os.environ.get(\"PGUSER\", \"user\"),\n", " password=os.environ.get(\"PGPASSWORD\", \"password\")\n", ")\n", "\n", "# 创建一个新的游标对象\n", "cursor = connection.cursor()\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["我们可以通过运行任何可用的方法来测试连接:\n"]}, {"cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["Connection successful!\n"]}], "source": ["# 执行一个简单的查询以测试连接\n", "cursor.execute(\"SELECT 1;\")\n", "result = cursor.fetchone()\n", "\n", "# 检查查询结果\n", "if result == (1,):\n", " print(\"Connection successful!\")\n", "else:\n", " print(\"Connection failed.\")\n"]}, {"cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [{"data": {"text/plain": ["'vector_database_wikipedia_articles_embedded.zip'"]}, "execution_count": 7, "metadata": {}, "output_type": "execute_result"}], "source": ["import wget\n", "\n", "embeddings_url = \"https://cdn.openai.com/API/examples/data/vector_database_wikipedia_articles_embedded.zip\"\n", "\n", "# 文件大小约为700MB,因此需要一些时间来完成。\n", "wget.download(embeddings_url)\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["下载的文件必须被解压缩:\n"]}, {"cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["The file vector_database_wikipedia_articles_embedded.csv exists in the data directory.\n"]}], "source": ["import zipfile\n", "import os\n", "import re\n", "import tempfile\n", "\n", "current_directory = os.getcwd()\n", "zip_file_path = os.path.join(current_directory, \"vector_database_wikipedia_articles_embedded.zip\")\n", "output_directory = os.path.join(current_directory, \"../../data\")\n", "\n", "with zipfile.ZipFile(zip_file_path, \"r\") as zip_ref:\n", " zip_ref.extractall(output_directory)\n", "\n", "\n", "# 检查CSV文件是否存在\n", "file_name = \"vector_database_wikipedia_articles_embedded.csv\"\n", "data_directory = os.path.join(current_directory, \"../../data\")\n", "file_path = os.path.join(data_directory, file_name)\n", "\n", "\n", "if os.path.exists(file_path):\n", " print(f\"The file {file_name} exists in the data directory.\")\n", "else:\n", " print(f\"The file {file_name} does not exist in the data directory.\")\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["## 索引数据\n", "\n", "PolarDB将数据存储在__关系__中,其中每个对象至少由一个向量描述。我们的关系将被称为**articles**,每个对象将由**title**和**content**向量描述。\n", "\n", "我们将从创建一个关系开始,在**title**和**content**上创建一个向量索引,然后我们将用预先计算的嵌入填充它。\n"]}, {"cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": ["create_table_sql = '''\n", "CREATE TABLE IF NOT EXISTS public.articles (\n", " id INTEGER NOT NULL,\n", " url TEXT,\n", " title TEXT,\n", " content TEXT,\n", " title_vector vector(1536),\n", " content_vector vector(1536),\n", " vector_id INTEGER\n", ");\n", "\n", "ALTER TABLE public.articles ADD PRIMARY KEY (id);\n", "'''\n", "\n", "# 创建索引的SQL语句\n", "create_indexes_sql = '''\n", "在public.articles表上使用ivfflat方法为content_vector字段创建索引,设置lists参数为1000;\n", "\n", "在public.articles表上使用ivfflat方法为title_vector字段创建索引,设置lists参数为1000。\n", "'''\n", "\n", "# 执行SQL语句\n", "cursor.execute(create_table_sql)\n", "cursor.execute(create_indexes_sql)\n", "\n", "# 提交更改\n", "connection.commit()\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["## 加载数据\n", "\n", "在本节中,我们将加载在本次会话之前准备好的数据,这样您就不必使用自己的学分重新计算维基百科文章的嵌入。\n"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["import io\n", "\n", "# 本地CSV文件的路径\n", "csv_file_path = '../../data/vector_database_wikipedia_articles_embedded.csv'\n", "\n", "# 定义一个生成器函数,逐行处理文件\n", "def process_file(file_path):\n", " with open(file_path, 'r') as file:\n", " for line in file:\n", " yield line\n", "\n", "# 创建一个 StringIO 对象以存储修改后的行\n", "modified_lines = io.StringIO(''.join(list(process_file(csv_file_path))))\n", "\n", "# 创建用于 copy_expert 方法的 COPY 命令\n", "copy_command = '''\n", "复制 public.articles 表中的数据(包括 id、url、title、content、title_vector、content_vector、vector_id 字段)\n", "从标准输入读取,使用 CSV 格式,包含表头,分隔符为逗号。\n", "'''\n", "\n", "# 使用copy_expert方法执行COPY命令\n", "cursor.copy_expert(copy_command, modified_lines)\n", "\n", "# 提交更改\n", "connection.commit()\n"]}, {"cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["Count:25000\n"]}], "source": ["# 检查集合大小,确保所有点都已存储。\n", "count_sql = \"\"\"从 public.articles 表中选择计数(*);\"\"\"\n", "cursor.execute(count_sql)\n", "result = cursor.fetchone()\n", "print(f\"Count:{result[0]}\")\n"]}, {"attachments": {}, "cell_type": "markdown", "metadata": {}, "source": ["## 搜索数据\n", "\n", "一旦数据被放入Qdrant中,我们将开始查询集合中最接近的向量。我们可以提供一个额外的参数`vector_name`,以从基于标题的搜索切换到基于内容的搜索。由于预先计算的嵌入是使用`text-embedding-3-small` OpenAI模型创建的,因此在搜索过程中我们也必须使用它。\n"]}, {"cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": ["def query_polardb(query, collection_name, vector_name=\"title_vector\", top_k=20):\n", "\n", " # 从用户查询生成嵌入向量\n", " embedded_query = openai.Embedding.create(\n", " input=query,\n", " model=\"text-embedding-3-small\",\n", " )[\"data\"][0][\"embedding\"]\n", "\n", " # 将嵌入式查询转换为与PostgreSQL兼容的格式\n", " embedded_query_pg = \"[\" + \",\".join(map(str, embedded_query)) + \"]\"\n", "\n", " # 创建SQL查询\n", " query_sql = f\"\"\"\n", " SELECT id, url, title, l2_distance({vector_name},'{embedded_query_pg}'::VECTOR(1536)) AS similarity\n", " FROM {collection_name}\n", " ORDER BY {vector_name} <-> '{embedded_query_pg}'::VECTOR(1536)\n", " LIMIT {top_k};\n", " \"\"\"\n", " # 执行查询\n", " cursor.execute(query_sql)\n", " results = cursor.fetchall()\n", "\n", " return results\n"]}, {"cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["1. Museum of Modern Art (Score: 0.5)\n", "2. Western Europe (Score: 0.485)\n", "3. Renaissance art (Score: 0.479)\n", "4. Pop art (Score: 0.472)\n", "5. Northern Europe (Score: 0.461)\n", "6. Hellenistic art (Score: 0.457)\n", "7. Modernist literature (Score: 0.447)\n", "8. Art film (Score: 0.44)\n", "9. Central Europe (Score: 0.439)\n", "10. European (Score: 0.437)\n", "11. Art (Score: 0.437)\n", "12. Byzantine art (Score: 0.436)\n", "13. Postmodernism (Score: 0.434)\n", "14. Eastern Europe (Score: 0.433)\n", "15. Europe (Score: 0.433)\n", "16. Cubism (Score: 0.432)\n", "17. Impressionism (Score: 0.432)\n", "18. Bauhaus (Score: 0.431)\n", "19. Surrealism (Score: 0.429)\n", "20. Expressionism (Score: 0.429)\n"]}], "source": ["import openai\n", "\n", "query_results = query_polardb(\"modern art in Europe\", \"Articles\")\n", "for i, result in enumerate(query_results):\n", " print(f\"{i + 1}. {result[2]} (Score: {round(1 - result[3], 3)})\")\n"]}, {"cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["1. Battle of Bannockburn (Score: 0.489)\n", "2. Wars of Scottish Independence (Score: 0.474)\n", "3. 1651 (Score: 0.457)\n", "4. First War of Scottish Independence (Score: 0.452)\n", "5. Robert I of Scotland (Score: 0.445)\n", "6. 841 (Score: 0.441)\n", "7. 1716 (Score: 0.441)\n", "8. 1314 (Score: 0.429)\n", "9. 1263 (Score: 0.428)\n", "10. William Wallace (Score: 0.426)\n", "11. Stirling (Score: 0.419)\n", "12. 1306 (Score: 0.419)\n", "13. 1746 (Score: 0.418)\n", "14. 1040s (Score: 0.414)\n", "15. 1106 (Score: 0.412)\n", "16. 1304 (Score: 0.411)\n", "17. David II of Scotland (Score: 0.408)\n", "18. Braveheart (Score: 0.407)\n", "19. 1124 (Score: 0.406)\n", "20. July 27 (Score: 0.405)\n"]}], "source": ["# 这次我们将使用内容向量进行查询。\n", "query_results = query_polardb(\"Famous battles in Scottish history\", \"Articles\", \"content_vector\")\n", "for i, result in enumerate(query_results):\n", " print(f\"{i + 1}. {result[2]} (Score: {round(1 - result[3], 3)})\")\n"]}], "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.16"}, "orig_nbformat": 4}, "nbformat": 4, "nbformat_minor": 2}