{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# \"Data Engineering - Week 2\"\n", "> \"Week 2 - Data Engineering Zoomcamp course: Data ingestion\"\n", "\n", "- toc: True\n", "- branch: master\n", "- badges: true\n", "- comments: true\n", "- categories: [data engineering, mlops]\n", "- image: images/some_folder/your_image.png\n", "- hide: false\n", "- search_exclude: true" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note**: The content of this post is based on the course videos, my own understanding and research, and the reference documentation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "> youtube: https://youtu.be/W3Zm6rjOq70\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Lake\n", "\n", "![](images/data-engineering-w2/1.png)\n", "\n", "A data lake is a collection of technologies that enables querying of data contained in files or blob objects. When used effectively, they enable massive scale and cost-effective analysis of structured and unstructured data assets [[source](https://lakefs.io/data-lakes/)].\n", "\n", "Data lakes are composed of four primary components: storage, format, compute, and metadata layers [[source](https://lakefs.io/data-lakes/)].\n", "\n", "![](images/data-engineering-w2/2.png)\n", "\n", "\n", "A data lake is a centralized repository for large amounts of data from a variety of sources. The data can be structured, semi-structured, or unstructured.\n", "The goal is to rapidly ingest data and make it accessible to other team members such as data scientists, analysts, and engineers.\n", "The data lake is widely used for machine learning and analytical solutions.\n", "Generally, when you store data in a data lake, you associate it with some form of metadata to facilitate access. A data lake solution must also be secure and scalable.\n", "Additionally, the hardware should be affordable. The reason is that you want to store as much data as possible, quickly.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Lake vs Data Warehouse\n", "\n", "![](images/data-engineering-w2/3.png)\n", "\n", "\n", "Generally, a data lake holds unstructured data, and the target users are data scientists or data analysts. It stores huge amounts of data, sometimes on the order of terabytes or petabytes. The use cases covered by a data lake are mainly stream processing, machine learning, and real-time analytics.\n", "On the data warehouse side, the data is generally structured. 
The users are business analysts, the data size is generally small, and the use case consists of batch processing or BI reporting.\n", "\n", "To read more, please check [here](https://lakefs.io/data-lakes/) and [here](https://luminousmen.com/post/data-lake-vs-data-warehouse).\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# ETL vs ELT\n", "- Extract Transform and Load vs Extract Load and Transform\n", "- ETL is mainly used for a small amount of data whereas ELT is used for large amounts of data\n", "- ELT provides data lake support (Schema on read)\n", "- ETL provides data warehouse solutions\n", "\n", "![](images/data-engineering-w2/4.png)\n", "*[source](https://www.guru99.com/etl-vs-elt.html#:~:text=ETL%20stands%20for%20Extract%2C%20Transform,directly%20into%20the%20target%20system.&text=ETL%2C%20ETL%20is%20mainly%20used,for%20large%20amounts%20of%20data.)*\n", "\n", "![](images/data-engineering-w2/5.png)\n", "*[source](https://www.guru99.com/etl-vs-elt.html#:~:text=ETL%20stands%20for%20Extract%2C%20Transform,directly%20into%20the%20target%20system.&text=ETL%2C%20ETL%20is%20mainly%20used,for%20large%20amounts%20of%20data.)*\n", "\n", "Data lake solutions provided by main cloud providers are as follows:\n", "\n", "- GCP - cloud storage\n", "- AWS - S3\n", "- AZURE - AZURE BLOB\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Workflow Orchestration\n", "\n", "> youtube: https://youtu.be/0yK7LXwYeD0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We saw a simple data pipeline in week 1. One of the problems in that data pipeline was that we did several important jobs in the same place: downloading data and doing small processing and putting it into postgres. What if after downloading data, some error happens in the code or with the internet? We will lose the downloaded data and should do everything from scratch. That's why we need to do those steps separately. \n", "\n", "A data pipeline is a series of steps for data processing. If the data has not yet been loaded into the data platform, it is ingested at the pipeline's start. Then there is a series of steps, each of which produces an output that serves as the input for the subsequent step. This procedure is repeated until the pipeline is completed. In some instances, independent steps may be performed concurrently. [[source](https://hazelcast.com/glossary/data-pipeline/)].\n", "\n", "\n", "A data pipeline is composed of three critical components: a source, a processing step or series of processing steps, and a destination. The destination may be referred to as a sink in some data pipelines. Data pipelines, for example, enable the flow of data from an application to a data warehouse, from a data lake to an analytics database, or to a payment processing system. Additionally, data pipelines can share the same source and sink, allowing the pipeline to focus entirely on data modification. When data is processed between points A and B (or B, C, and D), there is a data pipeline between those points [[source](https://hazelcast.com/glossary/data-pipeline/)].\n", "\n", "![](images/data-engineering-w2/6.png)\n", "*[source](https://hazelcast.com/glossary/data-pipeline/)*\n", "\n", "In our example, the data pipeline we had in the previous week can be as follows:\n", "\n", "![](images/data-engineering-w2/7.png)\n", "\n", "We separated downloading dataset using `wget` and then ingesting it into postgres. 
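As a rough illustration (plain Python with made-up names, before bringing in any orchestration tool), the idea is simply to keep each step as a separate unit connected by an intermediate file, so that a failed step can be retried without redoing the previous ones:\n", "\n", "```python\n", "# Illustrative sketch only: two independent steps joined by a file on disk.\n", "# If ingest() fails, the downloaded CSV is still there and only this step needs a retry.\n", "import subprocess\n", "\n", "import pandas as pd\n", "from sqlalchemy import create_engine\n", "\n", "\n", "def download(url: str, out_path: str) -> None:\n", "    subprocess.run([\"wget\", url, \"-O\", out_path], check=True)\n", "\n", "\n", "def ingest(csv_path: str, table: str, conn_uri: str) -> None:\n", "    # e.g. conn_uri = \"postgresql://root:root@localhost:5432/ny_taxi\" from week 1\n", "    engine = create_engine(conn_uri)\n", "    df = pd.read_csv(csv_path)\n", "    df.to_sql(table, engine, if_exists=\"replace\")\n", "```\n", "\n", "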
We could even add one more processing step (for example, converting the string columns to datetime in the downloaded dataset) before loading the data into the database." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But this week, we will do something more complex. Let's have a look at the data workflow.\n", "\n", "![](images/data-engineering-w2/8.png)\n", "\n", "The above figure is called a DAG (Directed Acyclic Graph). We need to be sure that all steps are done in the right order and that we can retry a failed step if something goes wrong before moving on to the next one. There are tools called workflow engines that allow us to define these DAGs and do the data workflow orchestration:\n", "\n", "- LUIGI\n", "- APACHE AIRFLOW (we will go for this)\n", "- PREFECT\n", "- Google Cloud Dataflow\n", "\n", "Let's get more familiar with two of them, Airflow and Google Cloud Dataflow:\n", "\n", "### Airflow\n", "Airflow is a platform to programmatically author, schedule and monitor workflows.\n", "Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.\n", "When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative [[Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/index.html)].\n", "\n", "\n", "![](images/data-engineering-w2/airflow.gif)\n", "*[Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/index.html)*\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Google Cloud Dataflow\n", "Real-time data is generated by websites, mobile applications, IoT devices, and other workloads. All businesses make data collection, processing, and analysis a priority. However, data from these systems is frequently not in a format suitable for analysis or effective use by downstream systems. That is where Dataflow enters the picture! Dataflow is used to process and enrich batch or stream data for analysis, machine learning, and data warehousing applications.\n", "\n", "Dataflow is a serverless, high-performance, and cost-effective service for stream and batch processing. It enables portability for processing jobs written in the open source Apache Beam libraries and alleviates operational burden on your data engineering teams by automating infrastructure provisioning and cluster management [[Google cloud docs](https://cloud.google.com/blog/topics/developers-practitioners/dataflow-backbone-data-analytics)]. 
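\n", "\n", "For a flavour of what a Dataflow job looks like in code, here is a minimal Apache Beam pipeline in Python (a toy sketch of my own, not from the course). With the default DirectRunner it runs locally; the same code can be submitted to Dataflow by passing the `DataflowRunner` plus the usual GCP pipeline options (project, region, temp location):\n", "\n", "```python\n", "# Toy word-count-style pipeline using the Apache Beam Python SDK.\n", "import apache_beam as beam\n", "\n", "with beam.Pipeline() as p:  # DirectRunner by default, i.e. runs on the local machine\n", "    (\n", "        p\n", "        | \"Create\" >> beam.Create([\"yellow taxi\", \"green taxi\", \"yellow taxi\"])\n", "        | \"Pair\" >> beam.Map(lambda trip: (trip, 1))\n", "        | \"Count\" >> beam.CombinePerKey(sum)\n", "        | \"Print\" >> beam.Map(print)\n", "    )\n", "```\n", "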
\n", "\n", "\n", "![](images/data-engineering-w2/9.jpeg)\n", "*[Google cloud docs](https://cloud.google.com/blog/topics/developers-practitioners/dataflow-backbone-data-analytics)*\n", "\n", "\n", "[Here](https://stackshare.io/stackups/airflow-vs-google-cloud-dataflow) is a comparison between Airflow and Google cloud dataflow.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Google Cloud Composer\n", "\n", "Google also provides a fully managed service for workflow orchestration built on the Apache Airflow called [Google Cloud Composer](https://cloud.google.com/composer?utm_source=google&utm_medium=cpc&utm_campaign=emea-emea-all-en-dr-skws-all-all-trial-e-gcp-1011340&utm_content=text-ad-none-any-DEV_c-CRE_556004442652-ADGP_Hybrid%20%7C%20SKWS%20-%20EXA%20%7C%20Txt%20~%20Data%20Analytics%20~%20Cloud%20Composer-KWID_43700067144729219-aud-606988878374%3Akwd-427702736595-userloc_1005582&utm_term=KW_cloud%20composer-NET_g-PLAC_&gclid=Cj0KCQiAgP6PBhDmARIsAPWMq6ms9Wx1uZ8P_OXQdS-Sj-4hgI9XZxmrrqbaq2hDBOaxxWLfhpFawIQaAmPJEALw_wcB&gclsrc=aw.ds#section-4). \n", "\n", "Cloud Composer is a fully managed workflow orchestration service, enabling you to create, schedule, monitor, and manage workflows that span across clouds and on-premises data centers.\n", "Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.\n", "By using Cloud Composer instead of a local instance of Apache Airflow, you can benefit from the best of Airflow with no installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows and not your infrastructure [[gcp docs](https://cloud.google.com/composer/docs/concepts/overview)]. \n", "\n", "[Here](https://stackoverflow.com/a/62771505) is a good comparison between Google Cloud Composer and Airflow on Docker and on-premise: \n", "\n", "\n", "> Cloud Composer is a GCP managed service for Airflow. Composer runs in something known as a Composer environment, which runs on Google Kubernetes Engine cluster. It also makes use of various other GCP services such as:\n", ">\n", "> - Cloud SQL - stores the metadata associated with Airflow,\n", ">\n", "> - App Engine Flex - Airflow web server runs as an App Engine Flex application, which is protected using an Identity-Aware Proxy,\n", ">\n", "> - GCS bucket - in order to submit a pipeline to be scheduled and run on Composer, all that we need to do is to copy out Python code into a GCS bucket. Within that, it'll have a folder called DAGs. Any Python code uploaded into that folder is automatically going to be picked up and processed by Composer.\n", ">\n", "> How Cloud Composer benefits?\n", ">\n", "> - Focus on your workflows, and let Composer manage the infrastructure (creating the workers, setting up the web server, the message brokers),\n", ">\n", "> - One-click to create a new Airflow environment,\n", ">\n", "> - Easy and controlled access to the Airflow Web UI,\n", ">\n", "> - Provide logging and monitoring metrics, and alert when your workflow is not running,\n", ">\n", "> - Integrate with all of Google Cloud services: Big Data, Machine Learning and so on. Run jobs elsewhere, i.e. 
other cloud provider (Amazon).\n", ">\n", "> Of course you have to pay for the hosting service, but the cost is low compare to if you have to host a production airflow server on your own.\n", ">\n", "> Airflow on-premise\n", ">\n", "> - DevOps work that need to be done: create a new server, manage Airflow installation, takes care of dependency and package management, check server health, scaling and security.\n", ">\n", "> - pull an Airflow image from a registry and creating the container\n", ">\n", "> - creating a volume that maps the directory on local machine where DAGs are held, and the locations where Airflow reads them on the container,\n", ">\n", "> - whenever you want to submit a DAG that needs to access GCP service, you need to take care of setting up credentials. Application's service account should be created and downloaded as a JSON file that contains the credentials. This JSON file must be linked into your docker container and the GOOGLE_APPLICATION_CREDENTIALS environment variable must contain the path to the JSON file inside the container.\n", ">\n", "> To sum up, if you don’t want to deal with all of those DevOps problem, and instead just want to focus on your workflow, then Google Cloud composer is a great solution for you." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To compare Google Dataflow and Google Cloud Composer, I found [this](https://stackoverflow.com/a/54155104) stackoverflow answer very interesting:\n", "\n", ">For the basics of your described task, Cloud Dataflow is a good choice. Big data that can be processed in parallel is a good choice for Cloud Dataflow.\n", ">\n", ">The real world of processing big data is usually messy. Data is usually somewhat to very dirty, arrives constantly or in big batches and needs to be processed in time sensitive ways. Usually it takes the coordination of more than one task / system to extract desired data. Think of load, transform, merge, extract and store types of tasks. Big data processing is often glued together using using shell scripts and / or Python programs. This makes automation, management, scheduling and control processes difficult.\n", ">\n", ">Google Cloud Composer is a big step up from Cloud Dataflow. Cloud Composer is a cross platform orchestration tool that supports AWS, Azure and GCP (and more) with management, scheduling and processing abilities.\n", ">\n", ">Cloud Dataflow handles tasks. Cloud Composer manages entire processes coordinating tasks that may involve BigQuery, Dataflow, Dataproc, Storage, on-premises, etc.\n", ">\n", ">If you need / require more management, control, scheduling, etc. of your big data tasks, then Cloud Composer adds significant value. If you are just running a simple Cloud Dataflow task on demand once in a while, Cloud Composer might be overkill.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this post, we just review Airflow and how to use it on Docker. Let's get started." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Airflow \n", "\n", "> youtube: https://youtu.be/lqDMzReAtrw\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "![](images/data-engineering-w2/10.png)\n", "*[Airflow architecture](https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html)*\n", "\n", "\n", "Let's review the Airflow architecture. An Airflow installation generally consists of the following components:\n", "\n", "- **Web server**: GUI to inspect, trigger and debug the behaviour of DAGs and tasks. 
Available at http://localhost:8080.\n", "\n", "- **Scheduler**: Responsible for scheduling jobs. Handles both triggering & scheduled workflows, submits Tasks to the executor to run, monitors all tasks and DAGs, and then triggers the task instances once their dependencies are complete.\n", "\n", "- **Worker**: This component executes the tasks given by the scheduler.\n", "\n", "- **Metadata database (postgres)**: Backend to the Airflow environment. Used by the scheduler, executor and webserver to store state.\n", "\n", "Other components (seen in docker-compose services):\n", "\n", "- *redis*: Message broker that forwards messages from scheduler to worker.\n", "- *flower*: The flower app for monitoring the environment. It is available at http://localhost:5555.\n", "- *airflow-init*: initialization service (customized as per this design)\n", "\n", "Please read more about Airflow architecture [here](https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html#architecture-overview) before continuing the blog post." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's install airflow environment using docker. \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You may need Python version 3.7+.\n", "\n", "You may also need to upgrade your docker-compose version to v2.x+ (as suggested in the course - however airflow documentation suggests v1.29.1 or newer).\n", "\n", "Default amount of memory available for Docker on MacOS is often not enough to get Airflow up and running. If enough memory is not allocated, it might lead to airflow webserver continuously restarting. You should at least allocate 4GB memory for the Docker Engine (ideally 8GB). You can check and change the amount of memory in Resources\n", "\n", "You can also check if you have enough memory by running this command [[Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html)]:\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "docker run --rm \"debian:buster-slim\" bash -c 'numfmt --to iec $(echo $(($(getconf _PHYS_PAGES) * $(getconf PAGE_SIZE))))'\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For me, this is 16 GB:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Unable to find image 'debian:buster-slim' locally\n", "buster-slim: Pulling from library/debian\n", "6552179c3509: Pull complete \n", "Digest: sha256:f6e5cbc7eaaa232ae1db675d83eabfffdabeb9054515c15c2fb510da6bc618a7\n", "Status: Downloaded newer image for debian:buster-slim\n", "16G" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "If enough memory is not allocated, it might lead to airflow-webserver continuously restarting. I used [this](https://stackoverflow.com/questions/49839028/how-to-upgrade-docker-compose-to-latest-version#:~:text=First%2C%20remove%20the%20old%20version%3A) answer to update mine. 
For limiting memory, it is easy to do it in mac and windows like [here](https://stackoverflow.com/questions/44533319/how-to-assign-more-memory-to-docker-container) and for linux you can check [here](https://phoenixnap.com/kb/docker-memory-and-cpu-limit).\n", "\n", "To deploy Airflow on Docker Compose, you should fetch `docker-compose.yaml`.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.3/docker-compose.yaml'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This file contains several service definitions:\n", "\n", "- airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete. Behind the scenes, the scheduler spins up a subprocess, which monitors and stays in sync with all DAGs in the specified DAG directory. Once per minute, by default, the scheduler collects DAG parsing results and checks whether any active tasks can be triggered [[ref](https://airflow.apache.org/docs/apache-airflow/stable/concepts/scheduler.html)].\n", "- airflow-webserver - The webserver is available at http://localhost:8080.\n", "- airflow-worker - The worker that executes the tasks given by the scheduler.\n", "- airflow-init - The initialization service.\n", "- flower - The [flower](https://flower.readthedocs.io/en/latest/) app is a web based tool for monitoring the environment. It is available at http://localhost:5555.\n", "- postgres - The database.\n", "- redis - The [redis](https://redis.io/) - broker that forwards messages from scheduler to worker.\n", "\n", "All these services allow you to run Airflow with [CeleryExecutor](https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html). For more information, see [Architecture Overview](https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html).\n", "\n", "Some directories in the container are mounted, which means that their contents are synchronized between your computer and the container.\n", "\n", "- ./dags - you can put your DAG files here.\n", "- ./logs - contains logs from task execution and scheduler.\n", "- ./plugins - you can put your custom [plugins](https://airflow.apache.org/docs/apache-airflow/stable/plugins.html) here. 
Airflow has a simple plugin manager built-in that can integrate external features to its core by simply dropping files in your $AIRFLOW_HOME/plugins folder.\n", "\n", "Here is the architecture of `docker-compose.yaml` file:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "version: '3'\n", "x-airflow-common:\n", " &airflow-common\n", " # In order to add custom dependencies or upgrade provider packages you can use your extended image.\n", " # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml\n", " # and uncomment the \"build\" line below, Then run `docker-compose build` to build the images.\n", " image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}\n", " # build: .\n", " environment:\n", " &airflow-common-env\n", " AIRFLOW__CORE__EXECUTOR: CeleryExecutor\n", " AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow\n", " AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow\n", " AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0\n", " AIRFLOW__CORE__FERNET_KEY: ''\n", " AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'\n", " AIRFLOW__CORE__LOAD_EXAMPLES: 'true'\n", " AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'\n", " _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}\n", " volumes:\n", " - ./dags:/opt/airflow/dags\n", " - ./logs:/opt/airflow/logs\n", " - ./plugins:/opt/airflow/plugins\n", " user: \"${AIRFLOW_UID:-50000}:0\"\n", " depends_on:\n", " &airflow-common-depends-on\n", " redis:\n", " condition: service_healthy\n", " postgres:\n", " condition: service_healthy\n", "\n", "services:\n", " postgres:\n", " ...\n", "\n", " redis:\n", " ...\n", "\n", " airflow-webserver:\n", " ...\n", "\n", " airflow-scheduler:\n", " ...\n", "\n", " airflow-worker:\n", " ...\n", "\n", " airflow-triggerer:\n", " ...\n", "\n", " airflow-init:\n", " ...\n", "\n", " airflow-cli:\n", " ...\n", " \n", " flower:\n", " ...\n", "\n", "volumes:\n", " postgres-db-volume:\n", " ```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The above file uses the latest Airflow image ([apache/airflow](https://hub.docker.com/r/apache/airflow)). If you need to install a new Python library or system library, you can build your image.\n", "\n", "When running Airflow locally, you may wish to use an extended image that includes some additional dependencies - for example, you may wish to add new Python packages or upgrade the airflow providers to a newer version. This is accomplished by including a custom Dockerfile alongside your `docker-compose.yaml` file. Then, using the `docker-compose build` command, you can create your image (you need to do it only once). Additionally, you can add the `--build` flag to your `docker-compose` commands to automatically rebuild the images when other `docker-compose` commands are run. To learn more and see additional examples, visit [here](https://airflow.apache.org/docs/docker-stack/build.html) [[Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html)]." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Ingest Data to GCS and BigQuery using Airflow \n", "\n", "First, there are some pre-requisites. 
For the sake of standardization across this tutorial's config, rename your gcp-service-accounts-credentials file to `google_credentials.json` and store it in your `$HOME` directory:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "cd ~ && mkdir -p ~/.google/credentials/\n", "mv .json ~/.google/credentials/google_credentials.json\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In order to use airflow with GCP, we have changed the `docker-compose.yaml` file in this course as follows:\n", "\n", "- instead of using the official airflow image as the base image, we use a custom docker file to build and start from.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", " build:\n", " context: .\n", " dockerfile: ./Dockerfile\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- disable loading the DAG examples that ship with Airflow. It’s good to get started, but you probably want to set this to False in a production environment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "AIRFLOW__CORE__LOAD_EXAMPLES: 'false'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- add GCP environment variables (you need to use your own gcp project id and the gcs bucket you created in the previous week)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "GOOGLE_APPLICATION_CREDENTIALS: /.google/credentials/google_credentials.json\n", "AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://?extra__google_cloud_platform__key_path=/.google/credentials/google_credentials.json'\n", "GCP_PROJECT_ID: 'pivotal-surfer-336713'\n", "GCP_GCS_BUCKET: \"dtc_data_lake_pivotal-surfer-336713\"\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- add the folder we created at the beginning of the post for google credentials." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "- ~/.google/credentials/:/.google/credentials:ro" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is the beginning of the file after our modifications:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", " build:\n", " context: .\n", " dockerfile: ./Dockerfile\n", " environment:\n", " &airflow-common-env\n", " AIRFLOW__CORE__EXECUTOR: CeleryExecutor\n", " AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow\n", " AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow\n", " AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0\n", " AIRFLOW__CORE__FERNET_KEY: ''\n", " AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'\n", " AIRFLOW__CORE__LOAD_EXAMPLES: 'false'\n", " AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'\n", " _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}\n", " GOOGLE_APPLICATION_CREDENTIALS: /.google/credentials/google_credentials.json\n", " AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://?extra__google_cloud_platform__key_path=/.google/credentials/google_credentials.json'\n", " GCP_PROJECT_ID: 'pivotal-surfer-336713'\n", " GCP_GCS_BUCKET: \"dtc_data_lake_pivotal-surfer-336713\"\n", "\n", " volumes:\n", " - ./dags:/opt/airflow/dags\n", " - ./logs:/opt/airflow/logs\n", " - ./plugins:/opt/airflow/plugins\n", " - ~/.google/credentials/:/.google/credentials:ro\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following is the custom Dockerfile, which is placed inside the `airflow` folder.\n", "The Dockerfile lists the custom packages to be installed. The one we'll need the most is `gcloud`, to connect with the GCS bucket/Data Lake."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "# First-time build can take upto 10 mins.\n", "\n", "FROM apache/airflow:2.2.3\n", "\n", "ENV AIRFLOW_HOME=/opt/airflow\n", "\n", "USER root\n", "RUN apt-get update -qq && apt-get install vim -qqq\n", "# git gcc g++ -qqq\n", "\n", "COPY requirements.txt .\n", "RUN pip install --no-cache-dir -r requirements.txt\n", "\n", "\n", "# Ref: https://airflow.apache.org/docs/docker-stack/recipes.html\n", "\n", "SHELL [\"/bin/bash\", \"-o\", \"pipefail\", \"-e\", \"-u\", \"-x\", \"-c\"]\n", "\n", "ARG CLOUD_SDK_VERSION=322.0.0\n", "ENV GCLOUD_HOME=/home/google-cloud-sdk\n", "\n", "ENV PATH=\"${GCLOUD_HOME}/bin/:${PATH}\"\n", "\n", "RUN DOWNLOAD_URL=\"https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz\" \\\n", " && TMP_DIR=\"$(mktemp -d)\" \\\n", " && curl -fL \"${DOWNLOAD_URL}\" --output \"${TMP_DIR}/google-cloud-sdk.tar.gz\" \\\n", " && mkdir -p \"${GCLOUD_HOME}\" \\\n", " && tar xzf \"${TMP_DIR}/google-cloud-sdk.tar.gz\" -C \"${GCLOUD_HOME}\" --strip-components=1 \\\n", " && \"${GCLOUD_HOME}/install.sh\" \\\n", " --bash-completion=false \\\n", " --path-update=false \\\n", " --usage-reporting=false \\\n", " --quiet \\\n", " && rm -rf \"${TMP_DIR}\" \\\n", " && gcloud --version\n", "\n", "WORKDIR $AIRFLOW_HOME\n", "\n", "USER $AIRFLOW_UID\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `requirements.txt` file in the Dockerfile which contains the required pyton packages is as follows:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "apache-airflow-providers-google\n", "pyarrow\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In case you don't want to see so many services as it is done in the above `docker-compose.yaml` file, you can use the following one which is placed in the `week_2_data_ingestion/airflow/extras` folder in the course github repo:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "version: '3.7'\n", "services:\n", " webserver:\n", " container_name: airflow\n", " build:\n", " context: ..\n", " dockerfile: ../Dockerfile\n", " environment:\n", " - PYTHONPATH=/home/airflow\n", " # airflow connection with SQLAlchemy container\n", " - AIRFLOW__CORE__SQL_ALCHEMY_CONN=sqlite:///$AIRFLOW_HOME/airflow.db\n", " - AIRFLOW__CORE__EXECUTOR=LocalExecutor\n", " # disable example loading\n", " - AIRFLOW__CORE__LOAD_EXAMPLES=FALSE\n", "\n", " volumes:\n", " - ./dags:/home/airflow/dags\n", " # user: \"${AIRFLOW_UID:-50000}:0\"\n", " ports:\n", " - \"8080:8080\"\n", " command: > # airflow db upgrade;\n", " bash -c \"\n", " airflow scheduler -D;\n", " rm /home/airflow/airflow-scheduler.*;\n", " airflow webserver\"\n", " healthcheck:\n", " test: [ \"CMD-SHELL\", \"[ -f /home/airflow/airflow-webserver.pid ]\" ]\n", " interval: 30s\n", " timeout: 30s\n", " retries: 3\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will not use this file to avoid any confusion." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There is also another lightweight and less memory-intensive docker-compose file in the [github repo](https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/week_2_data_ingestion/airflow) which can be used. \n", "\n", "There is another one from one of the students [here](https://gist.github.com/nervuzz/d1afe81116cbfa3c834634ebce7f11c5) which sounds interesting too." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before starting Airflow for the first time, you need to prepare your environment, i.e. create the necessary files and directories, and initialize the database.\n", "\n", "\n", "On Linux, the quick-start needs to know your host user id and needs to have the group id set to `0`. Otherwise the files created in `dags`, `logs` and `plugins` will be created with the `root` user. You have to make sure to configure them for docker-compose (run the following inside the `airflow` folder where the `docker-compose.yaml` file is placed) [[Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html)]:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "mkdir -p ./dags ./logs ./plugins\n", "echo -e \"AIRFLOW_UID=$(id -u)\" > .env\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For other operating systems, you will get a warning that `AIRFLOW_UID` is not set, but you can ignore it. You can also manually create the `.env` file in the same folder where your `docker-compose.yaml` is placed, with this content, to get rid of the warning:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "AIRFLOW_UID=1000" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read more about environment variables for compose [here](https://docs.docker.com/compose/environment-variables/). It seems that when we run `docker-compose up`, it looks for a `.env` file in the same directory and uses the variables in that file." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then we need to initialize the database. On all operating systems, you need to run database migrations and create the first user account. To do so, run:\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "docker-compose build\n", "docker-compose up airflow-init\n", "docker-compose up\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You may also see some errors, but you can ignore them, as they are related to services in the official docker-compose file that we do not use.\n", "\n", "You can check which services are up using:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "docker-compose ps\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For me, the output is as follows:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "airflow-airflow-scheduler-1 \"/usr/bin/dumb-init …\" airflow-scheduler running (healthy) 8080/tcp\n", "airflow-airflow-triggerer-1 \"/usr/bin/dumb-init …\" airflow-triggerer running (healthy) 8080/tcp\n", "airflow-airflow-webserver-1 \"/usr/bin/dumb-init …\" airflow-webserver running (healthy) 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp\n", "airflow-airflow-worker-1 \"/usr/bin/dumb-init …\" airflow-worker running (healthy) 8080/tcp\n", "airflow-flower-1 \"/usr/bin/dumb-init …\" flower running (healthy) 0.0.0.0:5555->5555/tcp, :::5555->5555/tcp\n", "airflow-postgres-1 \"docker-entrypoint.s…\" postgres running (healthy) 5432/tcp\n", "airflow-redis-1 \"docker-entrypoint.s…\" redis running (healthy) 6379/tcp" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There are several ways to interact with Airflow:\n", "\n", "- by running [CLI commands](https://airflow.apache.org/docs/apache-airflow/stable/usage-cli.html).\n", "- via a browser using the [web interface](https://airflow.apache.org/docs/apache-airflow/stable/ui.html).\n", "- using the [REST 
API](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html).\n", "\n", "For the web interface, you can go to this address: `http://0.0.0.0:8080/`\n", "\n", "The airflow UI will be like this:\n", "\n", "![](images/data-engineering-w2/11.png)\n", "\n", "The account created has the login `airflow` and the password `airflow`. After log in you will see two generated dags from the `week_2_data_ingestion/airflow/dags` folder.\n", "\n", "> youtube: https://youtu.be/9ksX9REfL8w" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A Workflow has the following components: \n", "\n", "- `DAG`: Directed acyclic graph, specifies the dependencies between a set of tasks with explicit execution order, and has a beginning as well as an end. (Hence, “acyclic”)\n", "\n", " - `DAG Structure`: DAG Definition, Tasks (eg. Operators), Task Dependencies (control flow: `>>` or `<<` )\n", "\n", "- `Task`: a defined unit of work (aka, operators in Airflow). The Tasks themselves describe what to do, be it fetching data, running analysis, triggering other systems, or more.\n", "\n", " - Common Types: Operators (used in this workshop), Sensors, TaskFlow decorators\n", " - Sub-classes of Airflow's BaseOperator\n", "\n", "- `DAG Run`: individual execution/run of a DAG\n", "\n", " - scheduled or triggered\n", "\n", "- `Task Instance`: an individual run of a single task. Task instances also have an indicative state, which could be “running”, “success”, “failed”, “skipped”, “up for retry”, etc.\n", "\n", " - Ideally, a task should flow from `none`, to `scheduled`, to `queued`, to `running`, and finally to `success`.\n", "\n", "\n", "Let's look at how to use Airflow to ingest data into GCP. To do so, we'll need to create a DAG object. One thing to remember is that this Airflow Python script is really just a configuration file that specifies the DAG's structure as code. The tasks defined here will be executed in a context distinct from that of this script. This script cannot be used to cross-communicate between tasks because different tasks run on different workers at different times. Note that we have a more advanced feature called [XComs](https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html) that can be used for this purpose [[Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html)].\n", "\n", "People mistakenly believe that the DAG definition file is where they can do actual data processing - this is not the case! The goal of the script is to create a DAG object. 
It must evaluate quickly (seconds, not minutes) because the scheduler will run it on a regular basis to reflect any changes [[Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html)].\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The structure of a DAG file is as follows:\n", "\n", "```python\n", "# Imports\n", "from airflow import DAG\n", "...\n", "\n", "default_args = {\n", " ...\n", "}\n", "\n", "with DAG(\n", " 'tutorial',\n", " default_args=default_args,\n", " description='A simple tutorial DAG',\n", " schedule_interval=timedelta(days=1),\n", " start_date=datetime(2021, 1, 1),\n", " catchup=False,\n", " tags=['example'],\n", ") as dag:\n", "\n", " # t1, t2 and t3 are examples of tasks created by instantiating operators\n", " t1 = BashOperator(\n", " task_id='print_date',\n", " bash_command='date',\n", " )\n", "\n", " t2 = ...\n", "\n", " t3 = ...\n", "\n", " ## defining task dependencies\n", " t1 >> [t2, t3]\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- An Airflow pipeline is just a Python script that happens to define an Airflow DAG object.\n", "- We have the choice to explicitly pass a set of arguments to each task’s constructor (which would become redundant), or (better!) we can define a dictionary of default parameters that we can use when creating tasks.\n", "- We’ll need a DAG object to nest our tasks into. Here we pass a string that defines the dag_id, which serves as a unique identifier for your DAG. We also pass the default argument dictionary that we just defined and define a schedule_interval of 1 day for the DAG.\n", "- Tasks are generated when instantiating operator objects. An object instantiated from an operator is called a task. The first argument task_id acts as a unique identifier for the task.\n", "- Then we need to define dependencies between tasks.\n", "\n", "You can check more tutorials and examples [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html).\n", "\n", "Now let's see our own DAG file for ingesting the `yellow_tripdata_2021-01.csv` dataset into GCP, which is placed in `week_2_data_ingestion/airflow/dags/data_ingestion_gcs_dag.py`. First let's check the structure:\n", "\n", "- importing Python libraries\n", " - built-in Airflow operators like `BashOperator` and `PythonOperator`. There is also a `DockerOperator` to run Docker inside Docker!\n", " - `storage` from the Google Cloud library to interact with GCS.\n", " - the BigQuery operator from the Google Airflow provider to interact with BigQuery and create an external table.\n", " - the `pyarrow` library for converting the dataset to `parquet` before uploading it to GCS. `parquet` is used more in production, is faster to upload, and uses less space on GCS.\n", "- setting some variables\n", " - `GCP_PROJECT_ID`, `GCP_GCS_BUCKET`, which we set in the `docker-compose.yaml` file under the `x-airflow-common` section.\n", " - info about the dataset URL\n", " - the local Airflow folder path\n", " - the name of the desired parquet file\n", " - the BigQuery dataset, which can be found in `variables.tf` in the terraform folder of week 1 (in `week_1_basics_n_setup/1_terraform_gcp/terraform/variables.tf`). I think the name was `BQ_DATASET` there, but the value is the same: `trips_data_all`.\n", "- Some Python functions which will be attached to `PythonOperator`s, like `format_to_parquet()` and `upload_to_gcs()`. Their names describe their functionality.\n", "- Default arguments which will be used in the DAG definition. 
\n", "- Then the DAG declaration with tasks and their dependencies:\n", " - `download_dataset_task` to download the dataset using a bash command.\n", " - `format_to_parquet_task`, which calls the `format_to_parquet()` function.\n", " - `local_to_gcs_task`, which calls the `upload_to_gcs()` function.\n", " - `bigquery_external_table_task` to extract the schema and create a BigQuery table from the file uploaded to GCS. You can easily run SQL queries on this table.\n", "- Then the workflow defining the order of the tasks: `download_dataset_task >> format_to_parquet_task >> local_to_gcs_task >> bigquery_external_table_task`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "import os\n", "import logging\n", "\n", "from airflow import DAG\n", "from airflow.utils.dates import days_ago\n", "from airflow.operators.bash import BashOperator\n", "from airflow.operators.python import PythonOperator\n", "\n", "from google.cloud import storage\n", "from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator\n", "import pyarrow.csv as pv\n", "import pyarrow.parquet as pq\n", "\n", "PROJECT_ID = os.environ.get(\"GCP_PROJECT_ID\")\n", "BUCKET = os.environ.get(\"GCP_GCS_BUCKET\")\n", "\n", "dataset_file = \"yellow_tripdata_2021-01.csv\"\n", "dataset_url = f\"https://s3.amazonaws.com/nyc-tlc/trip+data/{dataset_file}\"\n", "path_to_local_home = os.environ.get(\"AIRFLOW_HOME\", \"/opt/airflow/\")\n", "parquet_file = dataset_file.replace('.csv', '.parquet')\n", "BIGQUERY_DATASET = os.environ.get(\"BIGQUERY_DATASET\", 'trips_data_all')\n", "\n", "\n", "def format_to_parquet(src_file):\n", " if not src_file.endswith('.csv'):\n", " logging.error(\"Can only accept source files in CSV format, for the moment\")\n", " return\n", " table = pv.read_csv(src_file)\n", " pq.write_table(table, src_file.replace('.csv', '.parquet'))\n", "\n", "\n", "# NOTE: takes 20 mins, at an upload speed of 800kbps. 
Faster if your internet has a better upload speed\n", "def upload_to_gcs(bucket, object_name, local_file):\n", " \"\"\"\n", " Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python\n", " :param bucket: GCS bucket name\n", " :param object_name: target path & file-name\n", " :param local_file: source path & file-name\n", " :return:\n", " \"\"\"\n", " # WORKAROUND to prevent timeout for files > 6 MB on 800 kbps upload speed.\n", " # (Ref: https://github.com/googleapis/python-storage/issues/74)\n", " storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024 # 5 MB\n", " storage.blob._DEFAULT_CHUNKSIZE = 5 * 1024 * 1024 # 5 MB\n", " # End of Workaround\n", "\n", " client = storage.Client()\n", " bucket = client.bucket(bucket)\n", "\n", " blob = bucket.blob(object_name)\n", " blob.upload_from_filename(local_file)\n", "\n", "\n", "default_args = {\n", " \"owner\": \"airflow\",\n", " \"start_date\": days_ago(1),\n", " \"depends_on_past\": False,\n", " \"retries\": 1,\n", "}\n", "\n", "# NOTE: DAG declaration - using a Context Manager (an implicit way)\n", "with DAG(\n", " dag_id=\"data_ingestion_gcs_dag\",\n", " schedule_interval=\"@daily\",\n", " default_args=default_args,\n", " catchup=False,\n", " max_active_runs=1,\n", " tags=['dtc-de'],\n", ") as dag:\n", "\n", " download_dataset_task = BashOperator(\n", " task_id=\"download_dataset_task\",\n", " bash_command=f\"curl -sS {dataset_url} > {path_to_local_home}/{dataset_file}\"\n", " )\n", "\n", " format_to_parquet_task = PythonOperator(\n", " task_id=\"format_to_parquet_task\",\n", " python_callable=format_to_parquet,\n", " op_kwargs={\n", " \"src_file\": f\"{path_to_local_home}/{dataset_file}\",\n", " },\n", " )\n", "\n", " # TODO: Homework - research and try XCOM to communicate output values between 2 tasks/operators\n", " local_to_gcs_task = PythonOperator(\n", " task_id=\"local_to_gcs_task\",\n", " python_callable=upload_to_gcs,\n", " op_kwargs={\n", " \"bucket\": BUCKET,\n", " \"object_name\": f\"raw/{parquet_file}\",\n", " \"local_file\": f\"{path_to_local_home}/{parquet_file}\",\n", " },\n", " )\n", "\n", " bigquery_external_table_task = BigQueryCreateExternalTableOperator(\n", " task_id=\"bigquery_external_table_task\",\n", " table_resource={\n", " \"tableReference\": {\n", " \"projectId\": PROJECT_ID,\n", " \"datasetId\": BIGQUERY_DATASET,\n", " \"tableId\": \"external_table\",\n", " },\n", " \"externalDataConfiguration\": {\n", " \"sourceFormat\": \"PARQUET\",\n", " \"sourceUris\": [f\"gs://{BUCKET}/raw/{parquet_file}\"],\n", " },\n", " },\n", " )\n", "\n", " download_dataset_task >> format_to_parquet_task >> local_to_gcs_task >> bigquery_external_table_task\n", "\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's run it. First go to `localhost:8080` and use `airflow` and `airflow` as username and password to log in. Then switch on the `data_ingestion_gcs_dag` and click on that to open and be able to see the tree. You can also swith to graph using the toolbar on top of the page.\n", "\n", "![](images/data-engineering-w2/12.png)\n", "\n", "![](images/data-engineering-w2/13.png)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then click on the play button on top-right part of the page and select `Trigger DAG`. 
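\n", "\n", "If you prefer not to use the UI, the same DAG can also be triggered from outside the container through Airflow's stable REST API, which is enabled in this setup via the `basic_auth` backend set in `AIRFLOW__API__AUTH_BACKEND`. A minimal sketch using the default `airflow`/`airflow` credentials:\n", "\n", "```python\n", "# Minimal sketch: trigger a DAG run via the Airflow 2.x stable REST API.\n", "# Assumes the webserver from the docker-compose setup above is reachable on localhost:8080.\n", "import requests\n", "\n", "resp = requests.post(\n", "    \"http://localhost:8080/api/v1/dags/data_ingestion_gcs_dag/dagRuns\",\n", "    auth=(\"airflow\", \"airflow\"),\n", "    json={\"conf\": {}},  # optional run-level configuration\n", ")\n", "print(resp.status_code, resp.json())\n", "```\n", "\n", "Also, as a starting point for the XCom homework flagged in the `TODO` comment of the DAG above, here is a minimal, self-contained sketch (the DAG and task names here are made up, not part of the course code) showing how one task can push a small value and a downstream task can pull it:\n", "\n", "```python\n", "# Minimal XCom sketch: the return value of a PythonOperator callable is pushed\n", "# to XCom automatically, and a downstream task pulls it via the task instance (ti).\n", "from airflow import DAG\n", "from airflow.operators.python import PythonOperator\n", "from airflow.utils.dates import days_ago\n", "\n", "\n", "def push_filename():\n", "    return \"yellow_tripdata_2021-01.parquet\"  # pushed to XCom as 'return_value'\n", "\n", "\n", "def pull_filename(ti):\n", "    filename = ti.xcom_pull(task_ids=\"push_task\")\n", "    print(f\"received from XCom: {filename}\")\n", "\n", "\n", "with DAG(\"xcom_demo_dag\", start_date=days_ago(1), schedule_interval=None) as dag:\n", "    push_task = PythonOperator(task_id=\"push_task\", python_callable=push_filename)\n", "    pull_task = PythonOperator(task_id=\"pull_task\", python_callable=pull_filename)\n", "    push_task >> pull_task\n", "```\n", "\n", "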
Note that after running `docker-compose ps`, every service should be in a `healthy` state.\n", "\n", "In case any of the tasks fails, you can check the logs as follows:\n", "\n", "![](images/data-engineering-w2/2.gif)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If the tasks complete successfully, you will see the uploaded `parquet` file in the GCS bucket and also the table in BigQuery.\n", "\n", "**Note**: All the `PythonOperator` code (the functions it calls) is executed in the airflow-worker container, and files (datasets) are saved there and not on your local machine. If you use `DockerOperator`, you are actually running Docker inside another Docker container (the airflow-worker).\n", "\n", "When you are finished, or to shut down the containers:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "docker-compose down\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To stop and delete containers, delete volumes with database data, and remove downloaded images, run:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "docker-compose down --volumes --rmi all\n", "or\n", "docker-compose down --volumes --remove-orphans\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Ingest Data to Postgres using Airflow \n", "To see another step-by-step tutorial on ingesting data into a local postgres using Airflow, you can check the following video.\n", "\n", "> youtube: https://youtu.be/s2U8MWJH5xA" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before, we uploaded data to GCS using the `upload_to_gcs` function. But here, as we want to ingest data into postgres (which we will run using another docker-compose), we need to use another function. 
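\n", "\n", "One detail worth noting before reading the two files below: the DAG uses Airflow's Jinja templating of `execution_date` to build the download URL, the output file name, and the table name for each monthly run. Roughly, the templates resolve like this (a plain Python illustration, not Airflow code):\n", "\n", "```python\n", "# Illustration only: what a template such as\n", "# yellow_tripdata_{{ execution_date.strftime('%Y-%m') }}.csv\n", "# resolves to for the DAG run covering January 2021.\n", "from datetime import datetime\n", "\n", "execution_date = datetime(2021, 1, 1)\n", "url = (\n", "    \"https://s3.amazonaws.com/nyc-tlc/trip+data/\"\n", "    f\"yellow_tripdata_{execution_date.strftime('%Y-%m')}.csv\"\n", ")\n", "print(url)  # ...yellow_tripdata_2021-01.csv\n", "```\n", "\n", "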
All the steps are the same (check the above video) and we just check the DAG file and ingestion script here which is a modified version of what we used in week 1.\n", "\n", "Let's see the `week_2_data_ingestion/airflow/dags_local/data_ingestion_local.py` file first:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "import os\n", "\n", "from datetime import datetime\n", "\n", "from airflow import DAG\n", "\n", "from airflow.operators.bash import BashOperator\n", "from airflow.operators.python import PythonOperator\n", "\n", "from ingest_script import ingest_callable\n", "\n", "\n", "AIRFLOW_HOME = os.environ.get(\"AIRFLOW_HOME\", \"/opt/airflow/\")\n", "\n", "\n", "PG_HOST = os.getenv('PG_HOST')\n", "PG_USER = os.getenv('PG_USER')\n", "PG_PASSWORD = os.getenv('PG_PASSWORD')\n", "PG_PORT = os.getenv('PG_PORT')\n", "PG_DATABASE = os.getenv('PG_DATABASE')\n", "\n", "\n", "local_workflow = DAG(\n", " \"LocalIngestionDag\",\n", " schedule_interval=\"0 6 2 * *\",\n", " start_date=datetime(2021, 1, 1)\n", ")\n", "\n", "\n", "URL_PREFIX = 'https://s3.amazonaws.com/nyc-tlc/trip+data' \n", "URL_TEMPLATE = URL_PREFIX + '/yellow_tripdata_{{ execution_date.strftime(\\'%Y-%m\\') }}.csv'\n", "OUTPUT_FILE_TEMPLATE = AIRFLOW_HOME + '/output_{{ execution_date.strftime(\\'%Y-%m\\') }}.csv'\n", "TABLE_NAME_TEMPLATE = 'yellow_taxi_{{ execution_date.strftime(\\'%Y_%m\\') }}'\n", "\n", "with local_workflow:\n", " wget_task = BashOperator(\n", " task_id='wget',\n", " bash_command=f'curl -sSL {URL_TEMPLATE} > {OUTPUT_FILE_TEMPLATE}'\n", " )\n", "\n", " ingest_task = PythonOperator(\n", " task_id=\"ingest\",\n", " python_callable=ingest_callable,\n", " op_kwargs=dict(\n", " user=PG_USER,\n", " password=PG_PASSWORD,\n", " host=PG_HOST,\n", " port=PG_PORT,\n", " db=PG_DATABASE,\n", " table_name=TABLE_NAME_TEMPLATE,\n", " csv_file=OUTPUT_FILE_TEMPLATE\n", " ),\n", " )\n", "\n", " wget_task >> ingest_task\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And the ingestion function which is imported in the above script is as follows:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "import os\n", "\n", "from time import time\n", "\n", "import pandas as pd\n", "from sqlalchemy import create_engine\n", "\n", "\n", "def ingest_callable(user, password, host, port, db, table_name, csv_file, execution_date):\n", " print(table_name, csv_file, execution_date)\n", "\n", " engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')\n", " engine.connect()\n", "\n", " print('connection established successfully, instering data...')\n", "\n", " t_start = time()\n", " df_iter = pd.read_csv(csv_file, iterator=True, chunksize=100000)\n", "\n", " df = next(df_iter)\n", "\n", " df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)\n", " df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)\n", "\n", " df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')\n", "\n", " df.to_sql(name=table_name, con=engine, if_exists='append')\n", "\n", " t_end = time()\n", " print('inserted the first chunk, took %.3f second' % (t_end - t_start))\n", "\n", " while True: \n", " t_start = time()\n", "\n", " try:\n", " df = next(df_iter)\n", " except StopIteration:\n", " print(\"completed\")\n", " break\n", "\n", " df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)\n", " df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)\n", "\n", " df.to_sql(name=table_name, con=engine, if_exists='append')\n", 
"\n", " t_end = time()\n", "\n", " print('inserted another chunk, took %.3f second' % (t_end - t_start))\n", "\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we have two `docker-compose.yaml` files: one for this week which runs airflow stuff and one for week 1 for postgres and pgadmin. Let's see how we can connect them. \n", "\n", "When we run the `docker-compose.yaml` file for airflow, it creates a network called `airflow_default` which we will use as an external network to connect the docker-compose for postgres and pgadmin to [[ref](https://stackoverflow.com/questions/38088279/communication-between-multiple-docker-compose-projects)]." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "# docker-compose.yaml of week 1\n", "services:\n", " pgdatabase:\n", " image: postgres:13\n", " environment:\n", " - POSTGRES_USER=root\n", " - POSTGRES_PASSWORD=root\n", " - POSTGRES_DB=ny_taxi\n", " volumes:\n", " - \"./ny_taxi_postgres_data:/var/lib/postgresql/data:rw\"\n", " ports:\n", " - \"5432:5432\"\n", " networks:\n", " - airflow\n", " pgadmin:\n", " image: dpage/pgadmin4\n", " environment:\n", " - PGADMIN_DEFAULT_EMAIL=admin@admin.com\n", " - PGADMIN_DEFAULT_PASSWORD=root\n", " ports:\n", " - \"8080:80\"\n", "\n", "networks:\n", " airflow:\n", " external:\n", " name: airflow_default\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will have two postgres databases, one for airflow that stores its own metadata and one for uplading our dataset into.\n", "\n", "First run `docker-compose up` in week 1 folder, and then test if you can connect using `pgcli -h localhost -p 5432 -U root -d ny_taxi` command.\n", "\n", "Then we can go to the airflow worker container and see if we can connect to the postgres database that we ran above:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "docker exec -it bash\n", "# then type python to open python \n", ">> from sqlalchemy import create_engine\n", ">> engine = create_engine('postgresql://root:root@pgdatabase:5432/ny_taxi')\n", ">> engine.connect()\n", "# no error here\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you don't see any error by following the above procedure, it shows that you can connect to the postgres from airflow worker container. So calling the ingestion script from the `PythonOperator` in the DAG file should work. Now you can run both docker-compose files and go to airflow web interface to see the DAGs running.\n", "\n", "On finishing your run or to shut down the container/s:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "docker-compose down\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To stop and delete containers, delete volumes with database data, and download images, run:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```bash\n", "docker-compose down --volumes --rmi all\n", "or\n", "docker-compose down --volumes --remove-orphans\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Transfer Service in GCP\n", "\n", "Until now we have used airflow DAGs to download dataset and then push it into GCS. We can also use a service from GCP called transfer service to do this task directly from other cloud providers or local storages. [Storage transfer service](https://cloud.google.com/storage-transfer/docs) is a secure, low-cost services for transferring data from cloud, like AWS or Azure, or on-premises sources. 
If you search for the transfer service in GCP, there are two separate services, one for cloud and one for on-premises.\n", "\n", "To activate a job, you can use terraform or the UI in GCP. Check the following video to learn how to do it via GCP.\n", "\n", "\n", "> youtube: https://youtu.be/rFOFTfD1uGk" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To learn how to do it via terraform, please check the following video:\n", "\n", "> youtube: https://youtu.be/VhmmbqpIzeI" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Conclusion\n", "\n", "The reviewed procedure in this post for Airflow is not really suitable for production. Usually a combination of Airflow & Kubernetes & Git can be used. You can check the following resources to learn more.\n", "\n", "[Deploying Apache Airflow to Google Kubernetes Engine](https://medium.com/@olivertosky/deploying-apache-airflow-to-google-kubernetes-engine-a72c7db912ee)\n", "\n", "> youtube: https://youtu.be/rSkIa0lREUc\n", "\n", "> youtube: https://youtu.be/3VDeKmxHWYA\n", "\n", "> youtube: https://youtu.be/QgzkB1hcq5s\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can also use the Google Cloud Composer service to have a fully managed Airflow. This would be easier with a bit more cost.\n", "\n", "A Cloud Composer environment is a wrapper around Apache Airflow. Cloud Composer creates the following components for each environment: [[gcp docs](https://cloud.google.com/composer/docs/concepts/features)]\n", "\n", "- GKE cluster: The Airflow schedulers, workers, and Redis Queue run as GKE workloads on a single cluster, and are responsible for processing and executing DAGs. The cluster also hosts other Cloud Composer components like Composer Agent and Airflow Monitoring, which help manage the Cloud Composer environment, gather logs to store in Cloud Logging, and gather metrics to upload to Cloud Monitoring.\n", "- Web server: The web server runs the Apache Airflow web interface, and Identity-Aware Proxy protects the interface. For more information, see Airflow Web Interface.\n", "- Database: The database holds the Apache Airflow metadata.\n", "- Cloud Storage bucket: Cloud Composer associates a Cloud Storage bucket with the environment. The associated bucket stores the DAGs, logs, custom plugins, and data for the environment. For more information about the storage bucket for Cloud Composer, see Data Stored in Cloud Storage.\n", "\n", "To access and manage your Airflow environments, you can use the following Airflow-native tools:\n", "\n", "- Web interface: You can access the Airflow web interface from the Google Cloud Console or by direct URL with the appropriate permissions. For information, see Airflow Web Interface.\n", "- Command line tools: After you install the Google Cloud CLI, you can run gcloud composer environments commands to issue Airflow command-line commands to Cloud Composer environments. For information, see [Airflow Command-line Interface][cc-access-airflow-cli].\n", "\n", "In addition to native tools, the Cloud Composer REST and RPC APIs provide programmatic access to your Airflow environments." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" } }, "nbformat": 4, "nbformat_minor": 4 }