{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Input Postgresql"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This component pulls data from a PostgreSQL database using a given SQL statement and writes the result as CSV. Parameters like\n",
    "host, database, user, password and sql need to be set. Please note that data is processed in-memory (pandas) and can't spill to disk (spark) yet. Therefore, the queried data must fit into main memory (of the pod when running within a Kubeflow context)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install psycopg2-binary==2.9.1 pandas==1.3.1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import pandas as pd\n",
    "import psycopg2\n",
    "import re\n",
    "import sys"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# path and file name for output\n",
    "output_data_csv = os.environ.get('output_data_csv', 'data.csv')\n",
    "\n",
    "# hostname of database server\n",
    "host = os.environ.get('host')\n",
    "\n",
    "# database name\n",
    "database = os.environ.get('database')\n",
    "\n",
    "# db user\n",
    "user = os.environ.get('user')\n",
    "\n",
    "# db password\n",
    "password = os.environ.get('password')\n",
    "\n",
    "# db port\n",
    "port = int(os.environ.get('port', 5432))\n",
    "\n",
    "# sql query statement to be executed\n",
    "sql = os.environ.get('sql')\n",
    "\n",
    "# temporal data storage for local execution\n",
    "data_dir = os.environ.get('data_dir', '../../data/')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# override parameters received from a potential call using %run magic\n",
    "# Every command line argument of the form name=value overrides the parameter\n",
    "# with that name; the value is always taken as a string.\n",
    "# Only the first '=' separates name from value, so values such as SQL\n",
    "# statements may themselves contain '=' or quote characters.\n",
    "for argument in sys.argv:\n",
    "    name, separator, value = argument.partition('=')\n",
    "    # skip arguments without '=' and names that are not plain identifiers\n",
    "    # (for example kernel options like --ip=127.0.0.1)\n",
    "    if separator and name.isidentifier():\n",
    "        # plain assignment instead of exec(): the value is never run as code\n",
    "        globals()[name] = value\n",
    "\n",
    "# cast parameters to appropriate type\n",
    "port = int(port)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print('Logging configuration parameters...')\n",
    "print(output_data_csv)\n",
    "print(host)\n",
    "print(database)\n",
    "print(user)\n",
    "# never write the password itself to the log; only show whether it is set\n",
    "print('***' if password else password)\n",
    "print(port)\n",
    "print(sql)\n",
    "print(data_dir)\n",
    "print('...done')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "conn = psycopg2.connect(\n",
    "    host=host,\n",
    "    database=database,\n",
    "    user=user,\n",
    "    password=password,\n",
    "    port=port\n",
    ")\n",
    "print('Connection successfull')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# run the query and load the complete result set into memory;\n",
    "# the connection is released in any case, even if the query fails\n",
    "try:\n",
    "    d = pd.read_sql_query(sql, conn)\n",
    "finally:\n",
    "    conn.close()\n",
    "print('Query successfull')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# os.path.join also works when data_dir has no trailing path separator\n",
    "d.to_csv(os.path.join(data_dir, output_data_csv), index=False)\n",
    "print('Data written successfully')"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}