# Batch Processing: ETL pipeline, data modelling and warehousing of Sales data
## Table of Contents
1. [Introduction](#1-introduction)
   - [Technologies used](#technologies-used)
2. [Implementation Overview](#2-implementation-overview)
3. [Design](#3-design)
4. [Project structure](#4-project-structure)
5. [Settings](#5-settings)
   - [Prerequisites](#prerequisites)
   - [Important note: You must specify AWS credentials for each of the following](#important-note)
   - [AWS Infrastructure](#aws-infrastructure)
   - [Docker](#docker)
   - [Running](#running)
6. [Implementation](#6-implementation)
   - [Load Sales Data into PostgreSQL Database](#61-load-sales-data-into-postgresql-database)
   - [Load Data from PostgreSQL to Amazon Redshift](#62-load-data-from-postgresql-to-amazon-redshift)
7. [Visualize Result](#7-visualize-result)
## 1. Introduction
The data was collected from an e-commerce company and covers its sales in 2022. The company's analytics team wants to understand how the business performed over the last year. We build ETL pipelines that transform the raw data into actionable insights and store them in an OLTP database (PostgreSQL) and an OLAP database (Amazon Redshift) for enhanced data analytics capabilities.
The data consists of 4 CSV files: Sales, Products, Shipments, Customers.
### Technologies used
- Python
- PostgreSQL
- Airflow
- Terraform (Infrastructure provisioning tool)
- AWS services : S3, Redshift (data warehouse)
- Docker
## 2. Implementation overview
Design data models for the OLTP database (PostgreSQL) and the data warehouse (Amazon Redshift). Build an ETL pipeline that transforms raw data into actionable insights in PostgreSQL and also stores them in S3 for staging. Then implement a second ETL pipeline that processes the data from S3 and loads it into Amazon Redshift for enhanced data analytics. Airflow orchestrates the pipeline workflow, Terraform sets up the AWS Redshift cluster, and Docker containerizes the project, which allows for fast building, testing, and deployment.
## 3. Design
Data model for Postgres
Data model (star schema) for Redshift
Airflow workflow
## 4. Project Structure
```bash
Batch-Processing/
├── airflow/
│   ├── dags/
│   │   ├── dags_setup.py
│   │   ├── ETL_psql/
│   │   │   ├── Extract/
│   │   │   │   └── Extract.py
│   │   │   ├── Load/
│   │   │   │   └── Load_psql.py
│   │   │   └── Transform/
│   │   │       ├── Rename_col_df.py
│   │   │       ├── Transform.py
│   │   │       ├── Transform_customers.py
│   │   │       ├── Transform_locations.py
│   │   │       ├── Transform_products.py
│   │   │       ├── Transform_shipments.py
│   │   │       └── Transfrom_sales.py
│   │   └── ETL_redshift/
│   │       ├── ETL_psql_s3.py
│   │       └── Load_s3_to_redshift.py
│   └── logs/
├── postgreSQL_setup/
│   └── create_pgsql_schema.sql
├── redshift_setup/
│   └── create_redshift_schema.sql
├── docker/
│   ├── Dockerfile
│   └── requirements.txt
├── docker-compose.yaml
├── Implementation detail.md
├── assets/
│   └── Many images.png
├── Input_data/
├── Transformed_data/
├── Makefile
├── terraform/
│   ├── main.tf
│   ├── terraform.tfvars
│   └── variables.tf
└── readme.md
```
## 5. Settings
### Prerequisites
- AWS account
- Terraform
- Docker
### Important note
You must specify AWS credentials for each of the following:
- S3 access: Create an IAM user "S3-admin" with the AmazonS3FullAccess policy, create access credentials, and add them to [Extract.py](/airflow/dags/ETL_psql/Extract/Extract.py) and [ETL_psql_s3.py](/airflow/dags/ETL_redshift/ETL_psql_s3.py).
- Redshift access: Create an IAM user "Redshift-admin" with the AmazonRedshiftFullAccess policy, create access credentials, and add them to [Load_s3_to_redshift.py](/airflow/dags/ETL_redshift/Load_s3_to_redshift.py).
- Administrator access: Create an IAM user "Admin" with the AdministratorAccess policy, create access credentials, and add them to [terraform.tfvars](/terraform/terraform.tfvars) and [Makefile](/Makefile). This IAM user is responsible for provisioning the Redshift cluster and adding the connection between Airflow and the Redshift cluster.
### AWS Infrastructure
- Two dc2.large nodes for the Redshift cluster.
- Redshift cluster type: multi-node.
- The Redshift cluster is placed inside a VPC (10.10.0.0/16). The Redshift subnet group consists of 2 subnets, "Subnet for redshift az 1" (10.10.0.0/24) and "Subnet for redshift az 2" (10.10.1.0/24), each in its own Availability Zone.
- These two subnets are associated with a public route table (outbound traffic goes to the public internet through the Internet Gateway).
- The Redshift security group allows all inbound traffic on port 5439.
- An IAM role with full S3 access is created for Redshift.
- Finally, create the Redshift cluster:
```hcl
resource "aws_redshift_cluster" "sale_redshift_cluster" {
  cluster_identifier        = var.redshift_cluster_identifier
  database_name             = var.redshift_database_name
  master_username           = var.redshift_master_username
  master_password           = var.redshift_master_password
  node_type                 = var.redshift_node_type
  cluster_type              = var.redshift_cluster_type
  number_of_nodes           = var.redshift_number_of_nodes
  iam_roles                 = [aws_iam_role.redshift_iam_role.arn]
  cluster_subnet_group_name = aws_redshift_subnet_group.redshift_subnet_group.id
  skip_final_snapshot       = true
  tags = {
    Name = "vupham_redshift_cluster"
  }
}
```
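The subnet layout described in the AWS Infrastructure list can be sanity-checked with Python's standard `ipaddress` module before running Terraform. This is a minimal sketch, not part of the project: it uses only the CIDR blocks listed above plus the AWS rule that 5 addresses in every subnet are reserved, and the helper name `check_subnet_layout` is hypothetical. It verifies that both subnets sit inside the VPC and do not overlap, then reports the usable addresses per subnet.

```python
import ipaddress

# CIDR blocks from the AWS Infrastructure list
VPC = ipaddress.ip_network("10.10.0.0/16")
SUBNETS = {
    "Subnet for redshift az 1": ipaddress.ip_network("10.10.0.0/24"),
    "Subnet for redshift az 2": ipaddress.ip_network("10.10.1.0/24"),
}
AWS_RESERVED_PER_SUBNET = 5  # AWS reserves 5 addresses in every subnet


def check_subnet_layout(vpc, subnets):
    """Raise ValueError if a subnet is outside the VPC or two subnets overlap.

    Returns the number of usable addresses in each subnet.
    """
    for name, net in subnets.items():
        if not net.subnet_of(vpc):
            raise ValueError(f"{name} ({net}) is outside VPC {vpc}")
    items = list(subnets.items())
    for i, (name_a, a) in enumerate(items):
        for name_b, b in items[i + 1:]:
            if a.overlaps(b):
                raise ValueError(f"{name_a} ({a}) overlaps {name_b} ({b})")
    return {name: net.num_addresses - AWS_RESERVED_PER_SUBNET
            for name, net in subnets.items()}


print(check_subnet_layout(VPC, SUBNETS))
# {'Subnet for redshift az 1': 251, 'Subnet for redshift az 2': 251}
```

Running the check is cheaper than discovering a CIDR mistake during `terraform apply`, and the same helper can be reused if more subnets are added later.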
### Docker
```dockerfile
# ./docker/Dockerfile
FROM apache/airflow:2.5.1
COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt
```
The [Dockerfile](/docker/Dockerfile) builds a custom image on top of `apache/airflow:2.5.1` and installs the libraries listed in `requirements.txt`.
```text
# ./docker/requirements.txt
redshift_connector
pandas
apache-airflow-providers-amazon==8.1.0
apache-airflow-providers-postgres==5.4.0
boto3==1.26.148
psycopg2-binary==2.9.6
```
[docker-compose.yaml](/docker-compose.yaml) builds the containers that run our application.
### Running
Please refer to the [Makefile](/Makefile) for more details.
```bash
# Clone and cd into the project directory
git clone https://github.com/anhvuphamtan/Batch-Processing.git
cd Batch-Processing
# Start docker containers on your local computer
make up
# Add airflow connections: postgres-redshift-aws connections
make connections
# Set up cloud infrastructure
make infra-init # Only needed on the first run
# Note: configure your own AWS access & secret keys in terraform.tfvars and /airflow/dags/ETL_redshift/Load_s3_to_redshift.py in order to create the Redshift cluster and connect to it
make infra-up # Build cloud infrastructure
```
## 6. Implementation
Refer to [Implementation detail.md](/Implementation%20detail.md) for more details on the implementation.
### 6.1 Load sales data into PostgreSQL database
Airflow tasks
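```python
# ./airflow/dags/dags_setup.py  (Airflow DAG; the DAG object and the rest of the file are omitted here)
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Extract_from_source, Transform_products, Transform_shipments, ... and Load_schema are
# callables from the ETL_psql package (imports omitted). Each task variable reuses its
# callable's name: the right-hand side is evaluated first, so python_callable still
# receives the function before the name is rebound to the operator.

# -------------- Create schema task ------------- #
Create_psql_schema = PostgresOperator(
    task_id = 'Create_psql_schema',
    postgres_conn_id = 'postgres_sale_db',
    sql = 'create_pgsql_schema.sql'  # rendered as a template file, e.g. found via the DAG's template_searchpath
)
# ---------------------------------------------- #

# ---------------- Extract task ---------------- #
Extract_from_source = PythonOperator(
    task_id = 'Extract_from_source',
    python_callable = Extract_from_source
)
# ---------------------------------------------- #

# ---------------- Transform task ---------------- #
Transform_products = PythonOperator(
    task_id = "Transform_product_df",
    python_callable = Transform_products,
    op_kwargs = {"Name" : "products", "filePath" : "products.csv"}  # passed to the callable as keyword arguments
)

# ... (Transform tasks for the remaining topics are elided)

Transform_shipments = PythonOperator(
    task_id = "Transform_shipment_df",
    python_callable = Transform_shipments,
    op_kwargs = {"Name" : "shipments", "filePath" : "shipments.csv"}
)
# ----------------------------------------------- #

# ----------------- Load task ----------------- #
Load_psql = PythonOperator(
    task_id = "Load_to_psql",
    python_callable = Load_schema
)
# -------------------------------------------- #
```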
1. Create_psql_schema: Create the PostgreSQL schema and its tables according to our data model design.
2. Extract_from_source: Extract the raw data from an S3 bucket and store it in the Input_data folder.
3. Perform transformation: This part is split into 5 smaller tasks, each handling the data transformation for a specific topic.
   There are 6 Python files: Transform.py and Transform_name.py, where name corresponds to a topic in ['sales', 'products', 'customers', 'shipments', 'locations'].
   Each Transform_name.py is responsible for cleaning and transforming the data of its topic and integrating it into the corresponding OLTP table. Each one is implemented as a Python class, and they all inherit from the parent class in Transform.py.
4. Load_to_psql: Load all transformed data into the PostgreSQL database.
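The parent/child class structure described in step 3 can be sketched as follows. This is a hypothetical illustration, not the code in Transform.py: the class names `Transform`/`TransformProducts`, the `extract`/`clean`/`transform`/`run` methods, the generic cleaning steps and the positive-price rule are all assumptions made for the example. Only the `name`/`file_path` constructor arguments mirror the `Name`/`filePath` values passed through `op_kwargs`. It uses pandas, which is listed in `requirements.txt`.

```python
import io

import pandas as pd


class Transform:
    """Hypothetical parent class holding the steps shared by every topic."""

    def __init__(self, name: str, file_path: str):
        self.name = name            # topic, e.g. "products" (op_kwargs "Name")
        self.file_path = file_path  # raw CSV, e.g. "products.csv" (op_kwargs "filePath")
        self.df = pd.DataFrame()

    def extract(self, source=None) -> "Transform":
        # Read the raw CSV from self.file_path, or from any path/file-like object given
        self.df = pd.read_csv(self.file_path if source is None else source)
        return self

    def clean(self) -> "Transform":
        # Generic cleaning: snake_case column names, drop duplicate and fully empty rows
        self.df.columns = [c.strip().lower().replace(" ", "_") for c in self.df.columns]
        self.df = self.df.drop_duplicates().dropna(how="all").reset_index(drop=True)
        return self

    def transform(self) -> "Transform":
        # Topic-specific rules are implemented by the subclasses
        return self

    def run(self, source=None) -> pd.DataFrame:
        # Template method: the order of steps is fixed in the parent class
        return self.extract(source).clean().transform().df


class TransformProducts(Transform):
    """Hypothetical subclass for the products topic."""

    def transform(self) -> "Transform":
        # Example rule (assumption): keep only products with a positive numeric price
        self.df["price"] = pd.to_numeric(self.df["price"], errors="coerce")
        self.df = self.df[self.df["price"] > 0].reset_index(drop=True)
        return self


raw_csv = io.StringIO("Product ID,Brand,Price\n1,Acme,10.5\n1,Acme,10.5\n2,Beta,-3\n")
products = TransformProducts("products", "products.csv").run(raw_csv)
print(products)
```

`PythonOperator` passes `op_kwargs` to its `python_callable` as keyword arguments, so a task callable with the signature `Transform_products(Name, filePath)` receives `"products"` and `"products.csv"` and could delegate to a subclass like the one above. Keeping the step order in the parent class means each topic only overrides the logic that is specific to it.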
### 6.2 Load data from PostgreSQL to Amazon Redshift
Airflow tasks
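```python
# ./airflow/dags/dags_setup.py  (Airflow DAG; the DAG object and the rest of the file are omitted here)
from airflow.operators.python import PythonOperator

# ETL_s3, Create_redshift_schema and Load_s3_to_redshift are callables defined elsewhere
# in the project (imports omitted).
ETL_s3 = PythonOperator(
    task_id = "ETL_s3",
    python_callable = ETL_s3
)

Create_redshift_schema = PythonOperator(
    task_id = "Create_redshift_schema",
    python_callable = Create_redshift_schema,
    op_kwargs = {"root_dir" : "/opt/airflow/redshift_setup"}  # folder holding create_redshift_schema.sql
)

Load_s3_redshift = PythonOperator(
    task_id = "Load_s3_redshift",
    python_callable = Load_s3_to_redshift
)
```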
1. ETL_s3: Extract data from the PostgreSQL database, transform it, and load it into an S3 bucket.
2. Create_redshift_schema: Create the Redshift schema.
3. Load_s3_redshift: Load the data from the S3 bucket into Redshift.
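Steps 1 and 3 can be sketched as below. This is a minimal illustration under stated assumptions, not the code in ETL_psql_s3.py or Load_s3_to_redshift.py: it assumes each table is staged in S3 as a single CSV object named `<table>.csv` (a hypothetical layout) and loaded with Redshift's `COPY` command using the IAM role created for the cluster. The function names are hypothetical. The S3 client and the database connection are passed in, so the code relies only on boto3's `put_object` and the standard DB-API `cursor()`/`execute()`/`commit()` calls.

```python
import io
from typing import Iterable

import pandas as pd


def upload_df_to_s3(s3_client, df: pd.DataFrame, bucket: str, key: str) -> None:
    """Serialise a DataFrame to CSV in memory and upload it as one S3 object."""
    buffer = io.StringIO()
    df.to_csv(buffer, index=False)  # header row included; COPY skips it with IGNOREHEADER 1
    s3_client.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue().encode("utf-8"))


def build_copy_sql(table: str, bucket: str, key: str, iam_role_arn: str, region: str) -> str:
    """Build a Redshift COPY statement that loads one CSV object from S3."""
    return (
        f"COPY {table} FROM 's3://{bucket}/{key}' "
        f"IAM_ROLE '{iam_role_arn}' REGION '{region}' "
        "FORMAT AS CSV IGNOREHEADER 1;"
    )


def load_s3_to_redshift(conn, tables: Iterable[str], bucket: str,
                        iam_role_arn: str, region: str) -> None:
    """Run one COPY per table on an open DB-API connection, then commit once."""
    cursor = conn.cursor()
    for table in tables:
        # Hypothetical key layout: one "<table>.csv" object per table
        cursor.execute(build_copy_sql(table, bucket, f"{table}.csv", iam_role_arn, region))
    conn.commit()
```

In a DAG, `s3_client` could be `boto3.client("s3", aws_access_key_id=..., aws_secret_access_key=...)` and `conn` a connection from `redshift_connector.connect(host=..., database=..., user=..., password=...)`; bucket, table and role values come from your own setup. Because table and bucket names are interpolated directly into the SQL string, they should come from trusted configuration rather than user input.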
## 7. Visualize result
Connect Redshift to Metabase and visualize the results.
Connect to metabase
### Results
Revenue by month in 2022
Brand popularity
Profit by state
Shipping orders by company