# Input PostgreSQL

This component executes a given SQL statement against a PostgreSQL database and writes the result as a CSV file. The parameters
host, database, user, password and sql need to be set. Please note that data is processed in memory (pandas) and cannot spill to disk (spark) yet. Therefore, the queried data must fit into main memory (of the pod, when running within a Kubeflow context).

In [None]:
!pip install psycopg2-binary==2.9.1 pandas==1.3.1

In [None]:
import os
import pandas as pd
import psycopg2
import re
import sys

In [None]:
# Component parameters, read from environment variables. Every value is a
# string (or None when the variable is unset) except the port, which is
# converted to int with 5432 as the fallback.

output_data_csv = os.getenv('output_data_csv', 'data.csv')  # output file name, may include a path
host = os.getenv('host')                                    # hostname of the database server
database = os.getenv('database')                            # name of the database
user = os.getenv('user')                                    # database user
password = os.getenv('password')                            # password of that user
port = int(os.getenv('port', 5432))                         # server port
sql = os.getenv('sql')                                      # SQL statement to execute
data_dir = os.getenv('data_dir', '../../data/')             # directory for temporary data in local runs

In [None]:
# override parameters received from a potential call using %run magic,
# e.g. `%run <notebook> host=db.example port=5433`.
# Each `name=value` argument becomes a module-level *string* variable; the
# value is taken verbatim (only the FIRST '=' separates name from value, so
# SQL such as `sql=SELECT * FROM t WHERE a=1` survives intact).
# Arguments are assigned directly instead of being exec()'d: exec on
# command-line input allows arbitrary code execution and broke on values
# containing '=', quotes or backslashes.
for argument in sys.argv:
    name, separator, value = argument.partition('=')
    # Skip anything that is not a plain `identifier=value` pair, e.g. kernel
    # flags such as `--f=...`, which previously made exec() raise SyntaxError.
    if separator and name.isidentifier():
        globals()[name] = value

# cast parameters to appropriate type
port = int(port)

In [None]:
# Echo the effective configuration so runs can be debugged from the logs.
print('Logging configuration parameters...')
print(output_data_csv)
print(host)
print(database)
print(user)
# Never write the secret itself to the logs; only show whether one is set
# (None / empty are printed as-is since they reveal nothing).
print('********' if password else password)
print(port)
print(sql)
print(data_dir)
print('...done')

In [None]:
# Open a connection to the PostgreSQL server using the configured parameters.
conn = psycopg2.connect(
    host=host,
    database=database,
    user=user,
    password=password,
    port=port,
)
print('Connection successful')

In [None]:
# Execute the SQL statement and load the complete result set into a DataFrame.
try:
    d = pd.read_sql_query(sql, conn)
except BaseException:
    # On failure (bad SQL, lost connection, interrupt, ...) close the
    # connection before propagating the error, since a later close may never run.
    conn.close()
    raise
print('Query successful')

In [None]:
# Release the database connection; the query result is already held in `d`.
conn.close()

In [None]:
# Write the query result as CSV (without the DataFrame index) into data_dir.
# os.path.join adds the path separator when data_dir has no trailing slash,
# which plain string concatenation would silently omit.
d.to_csv(os.path.join(data_dir, output_data_csv), index=False)
print('Data written successfully')