import datetime
import logging
import os

import pendulum

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator

logger = logging.getLogger(__name__)


@dag(
    dag_id="process-employees-new",
    schedule_interval="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
    """Daily ETL pipeline for the employees table.

    Creates the target and staging tables (in parallel), bulk-loads a CSV
    into the staging table, then upserts the staged rows into ``employees``.
    All SQL runs against the ``tutorial_pg_conn`` Postgres connection.
    """
    # Idempotent create of the permanent target table.
    create_employees_table = PostgresOperator(
        task_id="create_employees_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS employees (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    # Staging table is dropped and recreated each run so it only ever holds
    # the current load.
    create_employees_temp_table = PostgresOperator(
        task_id="create_employees_temp_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            DROP TABLE IF EXISTS employees_temp;
            CREATE TABLE employees_temp (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    @task
    def get_data():
        """Bulk-load the employees CSV into ``employees_temp`` via COPY.

        Side-effect only (no return value). Raises if the file is missing
        or the COPY fails, failing the task.
        """
        # NOTE: configure this as appropriate for your airflow environment
        data_path = "/opt/airflow/dags/files/employees.csv"
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        try:
            # Context managers close the cursor and file even if COPY
            # raises (the original leaked both the cursor and connection).
            with conn.cursor() as cur, open(data_path, "r") as file:
                cur.copy_expert(
                    "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                    file,
                )
            conn.commit()
        finally:
            conn.close()

    @task
    def merge_data():
        """Upsert staged rows into ``employees``.

        Returns:
            0 on success, 1 on failure (contract preserved from the
            original; failures are now logged instead of silently dropped).
        """
        # BUGFIX: the original ON CONFLICT clause set "Serial Number" to
        # itself, so rows already present never received updated values.
        # Update the non-key columns from the incoming (excluded) row.
        query = """
            INSERT INTO employees
            SELECT *
            FROM (
                SELECT DISTINCT *
                FROM employees_temp
            ) t
            ON CONFLICT ("Serial Number") DO UPDATE
            SET
                "Company Name" = excluded."Company Name",
                "Employee Markme" = excluded."Employee Markme",
                "Description" = excluded."Description",
                "Leave" = excluded."Leave";
        """
        try:
            postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
            conn = postgres_hook.get_conn()
            try:
                with conn.cursor() as cur:
                    cur.execute(query)
                conn.commit()
            finally:
                # Always release the connection (original leaked it).
                conn.close()
            return 0
        except Exception:
            # Original did `except Exception as e: return 1`, discarding the
            # error entirely. Keep the 0/1 return contract but surface the
            # failure in the task log.
            logger.exception("merge_data failed")
            return 1

    # Both table-creation tasks must finish before loading; load before merge.
    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()


dag = ProcessEmployees()