AIRFLOW WORKFLOWS AND THE CLOUD

“Airflow is a platform to programmatically author, schedule, and monitor workflows”

Airflow allows you to author workflows by creating tasks in a Directed Acyclic Graph (DAG). The Airflow scheduler then executes the tasks in these DAGs on a configured array of workers (executors).

At Core Compete, we use Airflow to orchestrate ETL jobs on cloud platforms like GCP and AWS. Jobs ranging from querying several tables in Google BigQuery to generate reports, to submitting multiple steps to EMR clusters, are all orchestrated using Airflow. It is therefore important to be able to run a large number of tasks in parallel to improve the efficiency of the ETL pipeline. We distribute the Airflow tasks over multiple servers to scale out the pipelines, thereby reducing the overall runtime of our ETL processes.

This article briefly explains the Airflow architecture we use at Core Compete to achieve high scalability, followed by an example DAG that shows how to configure the relevant Airflow parameters.

ARCHITECTURE

Airflow provides the option to use the CeleryExecutor to execute tasks in a distributed fashion. In this mode, we can run several worker servers, each running multiple worker processes, to execute the tasks. This mode uses Celery together with a message queuing service, RabbitMQ.

To keep this post simple, we will demonstrate the architecture with a single-node master server and a single-node worker server. The diagram below shows the interaction between the component services, i.e. Airflow, Celery, and RabbitMQ.

Airflow celery executor

In this configuration, the Airflow executor distributes tasks over multiple Celery workers, which can run on different machines, via the message queuing service. This is the most scalable option since it is not limited by the resources available on the master node: we can add worker nodes as needed and have them execute tasks in a distributed manner.
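To make this concrete, the sketch below (not part of the original setup; the DAG id and queue name are hypothetical) shows how the CeleryExecutor routes work through queues: each operator can name the Celery queue its task is sent to, and only workers subscribed to that queue (for example, started with airflow worker --queues reporting_queue) will pick it up.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='queue_routing_demo',    # hypothetical DAG id
    start_date=datetime(2019, 1, 1),
    schedule_interval=None)

heavy_task = BashOperator(
    task_id='heavy_task',
    bash_command='echo running on a dedicated worker',
    queue='reporting_queue',        # consumed only by workers listening on this queue
    dag=dag)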

Celery

Celery is an asynchronous task queue. It uses a message broker to distribute tasks from the main application onto multiple Celery workers.
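As a standalone illustration, independent of Airflow and assuming a local RabbitMQ broker with the default guest account, a minimal Celery application looks like the sketch below: the broker URL tells Celery where to queue work, and any process running celery worker against the same broker can execute the task.

from celery import Celery

# 'tasks' is an arbitrary application name; the broker URL points at RabbitMQ.
app = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')

@app.task
def add(x, y):
    # Executed on whichever worker pulls this message off the queue.
    return x + y

Calling add.delay(2, 3) from another process publishes a message to the broker instead of running the function locally, which is exactly the mechanism Airflow relies on to distribute task execution.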

RabbitMQ

RabbitMQ is a message broker. It provides an API to operate message queues which are used for communication between multiple services. Services can publish and subscribe to different message queues in RabbitMQ.
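For example, a service can publish a message to a queue in just a few lines. The sketch below uses the pika client and a hypothetical queue name purely to illustrate the publish model; it is not part of the Airflow setup itself.

import pika

# Connect to a local RabbitMQ broker and publish one message to a durable queue.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='demo_queue', durable=True)    # hypothetical queue name
channel.basic_publish(exchange='', routing_key='demo_queue', body='hello')
connection.close()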

ENVIRONMENT SETUP

Environment prerequisites:

  • Operating system: CentOS/RHEL 7
  • Python virtual environment: Python v3.5.x
  • Database: Postgres v11.2

Once you have taken care of the prerequisites, activate the virtual environment and perform the steps below.

Setting up postgres database

Connect to the Postgres instance as the admin user

psql --host={DATABASE_HOST} --port={DATABASE_PORT} --username={DATABASE_ADMIN_USER} --password

Create a database named airflow

CREATE DATABASE airflow;

Create a database user

CREATE USER {DATABASE_USER} WITH PASSWORD '{DATABASE_USER_PASSWORD}';

Grant permissions to {DATABASE_USER} on the database created

GRANT ALL PRIVILEGES ON DATABASE airflow TO {DATABASE_USER};

GRANT CONNECT ON DATABASE airflow TO {DATABASE_USER};
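Optionally, verify that the new user can reach the airflow database before configuring Airflow. The sketch below assumes psycopg2 is available (it is pulled in later by the apache-airflow[postgres] extra); replace the placeholders with your actual values.

import psycopg2

# Connect as the Airflow database user; a successful connection confirms the grants above.
conn = psycopg2.connect(
    host='{DATABASE_HOST}',
    port='{DATABASE_PORT}',
    dbname='airflow',
    user='{DATABASE_USER}',
    password='{DATABASE_USER_PASSWORD}')
print(conn.server_version)
conn.close()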

Setting up rabbitmq

Install rabbitmq

yum install rabbitmq-server

Enable the RabbitMQ web management interface

rabbitmq-plugins enable rabbitmq_management

Enable and start the rabbitmq-server

systemctl enable rabbitmq-server

systemctl start rabbitmq-server

systemctl status rabbitmq-server

Add a user and make it an administrator

rabbitmqctl add_user {RABBITMQ_USER} {RABBITMQ_USER_PASSWORD}

rabbitmqctl set_user_tags {RABBITMQ_USER} administrator

Make a virtual host and set permissions

rabbitmqctl add_vhost {VIRTUALHOST_NAME}

rabbitmqctl set_permissions -p {VIRTUALHOST_NAME} {RABBITMQ_USER} ".*" ".*" ".*"

Download the rabbitmqadmin utility

wget http://127.0.0.1:15672/cli/rabbitmqadmin

chmod +x rabbitmqadmin

Create a queue

./rabbitmqadmin declare queue --username={RABBITMQ_USER} --password={RABBITMQ_USER_PASSWORD} --vhost={VIRTUALHOST_NAME} name={QUEUE_NAME} durable=true

We can now access the RabbitMQ UI at port 15672.

RabbitMQ UI Overview

RabbitMQ Queues

Setting up Airflow with Celery

Install Airflow packages

pip install apache-airflow[postgres,rabbitmq,celery]==1.10.3

Configure Airflow home variable

export AIRFLOW_HOME=~/airflow

Initialize Airflow database

airflow initdb

Modify the following configurations in {AIRFLOW_HOME}/airflow.cfg (a consolidated excerpt of these settings is shown after the list)

1. Set the executor to use Celery

executor = CeleryExecutor

2. Set the connection string for backend database named “airflow”

sql_alchemy_conn = postgresql+psycopg2://{DATABASE_USER}:{DATABASE_USER_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/airflow

3. Set the Celery result backend to point to the backend database named “airflow”

result_backend = db+postgresql://{DATABASE_USER}:{DATABASE_USER_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/airflow

4. Set the broker URL for Celery to point to the RabbitMQ service

broker_url = amqp://{RABBITMQ_USER}:{RABBITMQ_USER_PASSWORD}@localhost:5672/{VIRTUALHOST_NAME}

5. Set the default queue that Airflow tasks are sent to

default_queue = {QUEUE_NAME}

6. Parallelism is the maximum number of task instances that can run concurrently across the entire Airflow deployment

parallelism = 32

7. DAG concurrency is the maximum number of task instances allowed to run concurrently within a single DAG

dag_concurrency = 16

8. Worker concurrency determines how many tasks a single worker process can run concurrently

worker_concurrency = 16
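For reference, here is roughly how these settings are laid out in airflow.cfg for Airflow 1.10.x: executor, sql_alchemy_conn, parallelism, and dag_concurrency sit under the [core] section, while the Celery-specific settings sit under [celery]. The excerpt below simply consolidates the values from the steps above.

[core]
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://{DATABASE_USER}:{DATABASE_USER_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/airflow
parallelism = 32
dag_concurrency = 16

[celery]
broker_url = amqp://{RABBITMQ_USER}:{RABBITMQ_USER_PASSWORD}@localhost:5672/{VIRTUALHOST_NAME}
result_backend = db+postgresql://{DATABASE_USER}:{DATABASE_USER_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/airflow
default_queue = {QUEUE_NAME}
worker_concurrency = 16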

Re-initialize the database against the new Postgres backend and start Airflow

airflow initdb

nohup airflow worker > worker.out &

nohup airflow scheduler > scheduler.out &

nohup airflow webserver > webserver.out &

UNDERSTANDING THE SCALABILITY PARAMETERS

The documentation is not exhaustive in explaining parameters like parallelism, dag_concurrency, and worker_concurrency, which are crucial to achieving the desired scalability.

So we’ll take an example DAG and play around with these parameters to understand what they actually mean.

parallelism

This parameter determines the maximum number of task instances that can be actively running in parallel across the entire Airflow deployment. For example, if it is set to 10, no more than 10 tasks can run at once, irrespective of the number of DAGs. Hence, this is the maximum number of active tasks at any time.

dag_concurrency

This parameter determines the number of task instances that can run concurrently per DAG. For example, if it is set to 10, no more than 10 tasks can run within a single DAG. These tasks are also limited by the value of the parallelism parameter. Hence, this is the maximum number of tasks that can be scheduled at once per DAG.
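The value in airflow.cfg is a global default. In Airflow 1.10.x it can also be overridden for an individual DAG through the DAG's concurrency argument, as in the sketch below (the DAG id and the value of 5 are illustrative).

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id='tuned_dag',    # hypothetical DAG id
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
    concurrency=5)         # cap this DAG at 5 concurrently running task instances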

worker_concurrency

This parameter determines the number of tasks each worker node can run at any given time. For example, if it is set to 10 then the worker node can concurrently execute 10 tasks that have been scheduled by the scheduler.

Example Dag

Let’s create an example DAG that we will trigger for different values of the parameters parallelism, dag_concurrency, and worker_concurrency.

Example Dag

Below is the snippet of code that creates the tasks in the DAG. Imports and a minimal DAG definition are included so the snippet is self-contained; the DAG arguments are illustrative.

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Minimal DAG definition so the snippet is self-contained; the arguments are illustrative.
dag = DAG(
    dag_id='example_scalability_dag',
    start_date=datetime(2019, 1, 1),
    schedule_interval=None)

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

# One start task fans out to ten sleep tasks, which fan back in to a final task.
run_this_bash_first = BashOperator(
    task_id='run_this_bash_first',
    bash_command='echo start',
    dag=dag)

run_this_bash_last = BashOperator(
    task_id='run_this_bash_last',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)

for i in range(10):
    task_python = PythonOperator(
        task_id='sleep_task_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': 10},
        dag=dag)

    run_this_bash_first.set_downstream(task_python)
    task_python.set_downstream(run_this_bash_last)


Configuration 1

  • parallelism = 1
  • dag_concurrency = 1
  • worker_concurrency = 1

The task scheduling in this situation is limited by the parameter parallelism=1, so only one task is scheduled by the scheduler at a time. Also, worker_concurrency=1 means that the worker will execute one task at a time. The pattern of task scheduling in the queue is shown below.


Configuration 2

  • parallelism = 10
  • dag_concurrency = 1
  • worker_concurrency = 1

The task scheduling in this situation is limited by the parameter dag_concurrency=1. Although Airflow has the capacity to run 10 tasks at a time due to parallelism=10, only one task per DAG is scheduled by the scheduler. Also, worker_concurrency=1 means that the worker will execute one task at a time. The pattern of task scheduling in the queue is shown below.


Configuration 3

  • parallelism = 10
  • dag_concurrency = 10
  • worker_concurrency = 1

The task scheduling in this situation is capped at 10, so the scheduler can schedule 10 tasks at a time. However, worker_concurrency=1 means that the worker will execute only one task at a time. The pattern of task scheduling in the queue is shown below.


Configuration 4

  • parallelism = 10
  • dag_concurrency = 10
  • worker_concurrency = 10

The task scheduling in this situation is capped at 10, so the scheduler can schedule 10 tasks at a time. Also, worker_concurrency=10 means that the worker can execute 10 tasks at a time. The pattern of task scheduling in the queue is shown below.

Now that we know what each parameter means, we can tune the parameters to our liking, depending on the number of tasks in the ETL workflow.

CONCLUSION

This is how we achieve a scalable Airflow deployment for our ETL pipelines. We hope this article helps you understand and implement Airflow in a highly scalable manner, and sheds some light on configuring the parameters for which documentation is scarce.

About the Author

Ankur Saxena, Data Engineer

Ankur is a GCP certified Professional Data Engineer who specializes in building and orchestrating ‘big data’ ETL pipelines in the cloud. He is passionate about optimizing data processing using Pandas, Spark, and SQL. He is an anime enthusiast and likes playing tennis.
