Orchestrating dbt with Airflow: A Step by Step Guide to Automating Data Pipelines — Part I

Rasiksuhail
10 min read · Jul 23, 2023

In today’s data-driven world, organizations rely heavily on automated data pipelines to process, transform, and analyze vast amounts of data. dbt (data build tool) has emerged as a powerful data transformation and modeling tool, while Apache Airflow has become the de facto standard for orchestrating complex data workflows. Combining the strengths of dbt and Airflow allows data engineers and analysts to build efficient and scalable data pipelines, ensuring timely and accurate data delivery for business insights. In this comprehensive guide, we will explore the integration of dbt with Airflow, covering practical examples, troubleshooting steps, and code snippets to help you master the art of orchestrating data pipelines effectively.

This guide is extensive, so it is split into two parts, Part 1 and Part 2, to make it easier to follow.

In Part 1 of this guide, the following topics will be covered:

Understanding dbt and Apache Airflow
— Using Airflow with dbt

Setting Up the Environment
— Installing dbt and Airflow
— Configuring Connections and Variables

dbt and Airflow Integration
— Running dbt as an Airflow Task
— Passing Parameters to dbt Models

Creating Airflow DAGs for dbt Jobs
— Building a Simple dbt DAG
— Handling Dependencies and Triggers

Scheduling dbt Runs with Airflow
— Implementing Scheduling Strategies
— Configuring SLAs and Timeouts

Let's dive in.

I. Using Airflow with dbt:

a. Centralized Management: By integrating dbt with Airflow, data engineers can manage all data pipelines, including dbt transformations, in a centralized platform.

b. Flexibility: Airflow’s flexibility allows you to orchestrate complex workflows involving dbt models and other data processing tasks seamlessly.

c. Error Handling and Monitoring: Airflow provides built-in error handling and monitoring capabilities, ensuring that failures in dbt runs or other tasks are captured and alerted on.

d. Dynamic Scheduling: With Airflow, you can dynamically schedule dbt runs based on external triggers or dependencies, enabling adaptive data pipeline automation.

II. Setting Up the Environment: Installing dbt and Airflow

a. Installing dbt:

To get started with dbt, install it with pip, the Python package manager. You will need Python and a data warehouse that dbt can connect to, such as Snowflake, BigQuery, or Redshift; dbt is distributed as dbt-core plus an adapter package for your warehouse (dbt-snowflake, dbt-bigquery, dbt-redshift, and so on).

Step 1: Install dbt using pip (here with the BigQuery adapter, since the examples below use BigQuery):

pip install dbt-core dbt-bigquery

Step 2: Verify dbt installation:

dbt --version

b. Installing Apache Airflow:

Before installing Airflow, you need to have Python and pip installed on your system.

Step 1: Install Apache Airflow using pip:

pip install apache-airflow

Step 2: Initialize the Airflow database:

airflow db init

Step 3: Start the Airflow web server and scheduler:

airflow webserver --port 8080
airflow scheduler

Now that you have dbt and Airflow installed, you can begin configuring connections and variables to interact with your data warehouse and other services.

c. Configuring Connections and Variables in Airflow:

i. Configuring Connections:

Airflow uses connections to store login credentials, API tokens, and other settings required for interacting with various external systems. You can configure connections in the Airflow web UI or by using the airflow connections command-line interface.

Example:

Let’s configure a connection for connecting to a BigQuery project.

Step 1: Open the Airflow web UI: Go to http://localhost:8080 (or the address of your Airflow web server) in your web browser.

Step 2: Click on “Admin” in the top menu and select “Connections.”

Step 3: Click on the “Create” button to add a new connection.

Step 4: Fill in the connection details:

  • Conn Id: Enter a unique ID for the connection (e.g., “bigquery_conn”).
  • Conn Type: Select “Google Cloud” (labelled “Google Cloud Platform” in older Airflow versions).
  • Keyfile Path or Keyfile JSON: Provide the path to, or the contents of, your Google Cloud service account key (JSON format).
  • Project Id: Enter your Google Cloud project ID.

Step 5: Click on “Save” to save the connection.

Now, you have configured a connection to BigQuery, and you can use this connection to interact with BigQuery datasets and tables in your Airflow DAGs.
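Alternatively, the same connection can be created with the airflow connections CLI mentioned earlier. A minimal sketch (the connection ID, key path, and project are placeholders, and the exact extra field names depend on your Google provider version):

# Hypothetical CLI equivalent of the connection created above
airflow connections add 'bigquery_conn' \
    --conn-type 'google_cloud_platform' \
    --conn-extra '{"key_path": "/path/to/service-account.json", "project": "my-gcp-project"}'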

ii. Configuring Variables:

Airflow variables are used to store key-value pairs that can be referenced in your DAGs and tasks. They are useful for storing configuration settings or other parameters that can change over time.

Example:

Let’s configure a variable to store the name of the dataset we want to use in our dbt transformation.

Step 1: Open the Airflow web UI: Go to http://localhost:8080 (or the address of your Airflow web server) in your web browser.

Step 2: Click on “Admin” in the top menu and select “Variables.”

Step 3: Click on the “Create” button to add a new variable.

Step 4: Fill in the variable details:

  • Key: Enter a unique key for the variable (e.g., “dbt_dataset_name”).
  • Value: Enter the name of the dataset (e.g., “my_dataset”).

Step 5: Click on “Save” to save the variable.

Now, you have configured a variable named “dbt_dataset_name,” and you can use this variable in your dbt DAG to specify the dataset to be used in the transformation.
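The same variable can also be set from the command line:

airflow variables set dbt_dataset_name my_dataset

Inside a DAG file, it can then be read with Variable.get (a minimal sketch):

from airflow.models import Variable

# Reads the value stored in the Airflow metadata database
dataset_name = Variable.get("dbt_dataset_name")  # "my_dataset"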

By following these steps, you have successfully set up your environment by installing dbt and Apache Airflow and configuring connections and variables.

III. dbt and Airflow Integration

a. Running dbt as an Airflow Task:

To run dbt as an Airflow task, you need to define an Airflow Operator that executes the dbt CLI command to run your dbt models. The BashOperator is commonly used to execute shell commands, including dbt commands.

Example:

Let’s create an Airflow DAG that runs a dbt model as a task.

Step 1: Define the dbt DAG in Airflow:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# Define the default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create the DAG with the specified schedule interval
dag = DAG('dbt_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Define the dbt run command as a BashOperator
# (assumes dbt is installed on the worker and the command runs from the dbt project directory)
run_dbt_model = BashOperator(
    task_id='run_dbt_model',
    bash_command='dbt run',
    dag=dag,
)

In this example, we have created an Airflow DAG named “dbt_dag” that will run dbt every day (schedule_interval=timedelta(days=1)). The run_dbt_model task uses the BashOperator to execute the dbt CLI command dbt run.

Step 2: Save the DAG in the Airflow directory:

Save the above code as a Python file (e.g., dbt_dag.py) in the Airflow DAG directory.

Step 3: Trigger the DAG in Airflow:

Once you have saved the DAG file, Airflow will automatically detect and schedule the DAG. You can trigger the DAG manually from the Airflow web UI or let it run according to the specified schedule.
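The DAG can also be triggered from the command line (Airflow 2.x CLI):

airflow dags trigger dbt_dag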

b. Passing Parameters to dbt Models:

You may want to pass parameters to your dbt models to control their behavior during execution. dbt allows you to define variables in your SQL scripts and pass values to those variables when running dbt from the command line.
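On the dbt side, a model can read such a variable with dbt's var() Jinja function. A minimal sketch of a hypothetical model (the model, source table, and default value are made up for illustration):

-- models/my_model.sql (hypothetical)
-- Uses the "dataset" variable, falling back to "my_dataset" if none is passed
select *
from {{ var('dataset', 'my_dataset') }}.source_table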

Example:

Let’s modify the previous DAG to pass parameters to a dbt model.

Step 1: Define the dbt DAG in Airflow with Parameters:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('dbt_dag_with_params', default_args=default_args, schedule_interval=timedelta(days=1))

# Define the dbt run command with a variable named "dataset"
run_dbt_model_with_params = BashOperator(
    task_id='run_dbt_model_with_params',
    bash_command="dbt run --vars '{dataset: my_dataset}'",
    dag=dag,
)

In this example, we have modified the DAG to use the --vars option of the dbt run command to pass a variable named "dataset" with the value "my_dataset" to the dbt model; --vars accepts a YAML or JSON string of key-value pairs.

Step 2: Save the DAG in the Airflow directory:

Save the updated code as a Python file (e.g., dbt_dag_with_params.py) in the Airflow DAG directory.

Step 3: Trigger the DAG in Airflow:
Trigger the DAG manually from the Airflow web UI or let it run according to the specified schedule. The dbt model will run with the variable dataset set to “my_dataset.”

By running dbt as an Airflow task and passing parameters to dbt models, you can achieve a flexible and automated data pipeline that adapts to your specific data processing needs.
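If you would rather pull the dataset name from the Airflow Variable configured earlier than hard-code it, bash_command is Jinja-templated, so the value can be resolved at runtime. A minimal sketch (reusing the variable key “dbt_dataset_name” created above):

# Airflow renders {{ var.value.dbt_dataset_name }} before running the command
run_dbt_model_with_params = BashOperator(
    task_id='run_dbt_model_with_params',
    bash_command="dbt run --vars '{dataset: {{ var.value.dbt_dataset_name }}}'",
    dag=dag,
)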

IV. Creating Airflow DAGs for dbt Jobs

a. Building a Simple dbt DAG:

To create an Airflow DAG for dbt jobs, we use the DAG class provided by Airflow. This allows us to define the workflow and set the schedule for running dbt tasks.

Example:

Let’s create a simple Airflow DAG that runs dbt models daily.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# Define the default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create the DAG with the specified schedule interval
dag = DAG('dbt_daily_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Define dbt tasks using BashOperator
task1 = BashOperator(
    task_id='dbt_task1',
    bash_command='dbt run --models model1 model2',
    dag=dag,
)

task2 = BashOperator(
    task_id='dbt_task2',
    bash_command='dbt run --models model3 model4',
    dag=dag,
)

# Set task dependencies: task2 runs after task1 succeeds
task1 >> task2

In this example, we define a simple Airflow DAG named “dbt_daily_dag” with a daily schedule interval (schedule_interval=timedelta(days=1)). We create two dbt tasks using the BashOperator, each running different dbt models. The task1 runs dbt models "model1" and "model2," while task2 runs models "model3" and "model4." The >> operator is used to set task dependencies, indicating that task2 should run after task1 is successfully completed.

b. Handling Dependencies and Triggers:

Airflow allows you to set dependencies and triggers between tasks to control their execution order. Task dependencies can be specified with the >> and << bitshift operators (or the set_upstream and set_downstream methods), while cross-DAG triggers can be set using the TriggerDagRunOperator.

Example:

Let’s modify the previous DAG to demonstrate handling dependencies and triggers between dbt tasks.


from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('dbt_daily_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Define dbt tasks using BashOperator
task1 = BashOperator(
    task_id='dbt_task1',
    bash_command='dbt run --models model1 model2',
    dag=dag,
)

task2 = BashOperator(
    task_id='dbt_task2',
    bash_command='dbt run --models model3 model4',
    dag=dag,
)

task3 = BashOperator(
    task_id='dbt_task3',
    bash_command='dbt run --models model5 model6',
    dag=dag,
)

# Set task dependencies: task2 and task3 both run after task1
task1 >> task2
task1 >> task3

# Set a TriggerDagRunOperator to re-trigger this DAG when task2 completes
trigger_dag = TriggerDagRunOperator(
    task_id='trigger_dbt_daily_dag',
    trigger_dag_id='dbt_daily_dag',
    dag=dag,
)

# Set the trigger_dag task as downstream of task2
task2 >> trigger_dag

In this example, we have added a new dbt task named task3. We set the task dependencies so that task1 runs first, followed by task2 and task3 in parallel. We also introduced a TriggerDagRunOperator named trigger_dag that will re-trigger the DAG named "dbt_daily_dag" when task2 completes successfully. This allows us to rerun the entire DAG when specific tasks finish, providing more flexibility and control over the workflow.

By creating Airflow DAGs for dbt jobs and handling dependencies and triggers between tasks, you can build complex data workflows that efficiently orchestrate dbt transformations and other data processing tasks.

V. Scheduling dbt Runs with Airflow

a. Implementing Scheduling Strategies:

In Airflow, you can implement different scheduling strategies for running dbt tasks based on your specific data pipeline requirements. Commonly used scheduling strategies include:

Time-based Scheduling: You can schedule dbt runs at specific time intervals, such as hourly, daily, or weekly. This is useful for maintaining up-to-date data and ensuring that dbt transformations are executed at regular intervals.

Event-based Scheduling: You can trigger dbt runs based on external events, such as the completion of upstream tasks or the arrival of new data in your data warehouse. This allows for more dynamic and flexible data pipeline scheduling.

Dependency-based Scheduling: Airflow allows you to define dependencies between dbt tasks and other tasks in your workflow. This ensures that dbt runs are executed in the correct order, based on the success or failure of preceding tasks.

Example:

Let’s implement a time-based scheduling strategy to run dbt tasks daily at midnight.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('dbt_daily_schedule', default_args=default_args, schedule_interval=timedelta(days=1))

# Define the dbt run task using BashOperator
dbt_run_task = BashOperator(
    task_id='dbt_run_task',
    bash_command='dbt run',
    dag=dag,
)

In this example, we have created an Airflow DAG named “dbt_daily_schedule” with a schedule interval of one day (schedule_interval=timedelta(days=1)). The dbt_run_task uses the BashOperator to execute the dbt CLI command dbt run. This task will run dbt transformations daily at midnight.
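For the event-based strategy described above, a sensor can gate the dbt run until a task in an upstream DAG has finished. A minimal sketch using ExternalTaskSensor (the upstream DAG and task IDs are hypothetical, and both DAGs are assumed to share the same schedule):

from airflow.sensors.external_task import ExternalTaskSensor

# Wait for the upstream load task before running dbt
wait_for_load = ExternalTaskSensor(
    task_id='wait_for_load',
    external_dag_id='upstream_dag',
    external_task_id='load_data',
    dag=dag,
)

# Run dbt only after the upstream load has completed
wait_for_load >> dbt_run_task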

b. Configuring SLAs and Timeouts:

Airflow allows you to set Service Level Agreements (SLAs) and timeouts for dbt tasks to monitor their execution and ensure timely completion.

SLAs: SLAs define the maximum allowed duration for a task to complete successfully. If a task exceeds its SLA, Airflow will trigger an alert to notify stakeholders. This helps you identify and address performance issues in your data pipelines.

Example:

To set an SLA for the dbt run task to complete within one hour, you can add the sla=timedelta(hours=1) parameter to the BashOperator.

dbt_run_task = BashOperator(
    task_id='dbt_run_task',
    bash_command='dbt run',
    dag=dag,
    sla=timedelta(hours=1),
)

Timeouts: Timeouts define the maximum duration for which a task is allowed to run. If a task exceeds its timeout, Airflow will automatically fail the task. This prevents tasks from running indefinitely and ensures better resource management.

Example:

To set a timeout of 30 minutes for the dbt run task, you can add the execution_timeout=timedelta(minutes=30) parameter to the BashOperator.

dbt_run_task = BashOperator(
    task_id='dbt_run_task',
    bash_command='dbt run',
    dag=dag,
    execution_timeout=timedelta(minutes=30),
)

By implementing scheduling strategies and configuring SLAs and timeouts, you can effectively manage and monitor dbt runs in your data pipeline. This ensures that dbt transformations are executed in a timely manner, helping you maintain data freshness and reliability in your analytics and reporting processes.

Incorporating these dbt concepts and Airflow orchestration enables teams to build robust data pipelines, adhere to data governance principles, and deliver high-quality insights to stakeholders.

Happy integration of dbt with Airflow!

Explore my other articles in the dbt series.

Thanks for reading!

I post about Data, AI, Startups, Leadership, Writing & Culture.

Stay tuned for the next blog.
