Orchestrating dbt with Airflow: A Step by Step Guide to Automating Data Pipelines — Part II

Rasiksuhail
Jul 28, 2023 · 10 min read

In today’s data-centric landscape, organizations heavily rely on automated data pipelines to manage vast data volumes. dbt (data build tool) and Apache Airflow have emerged as powerful tools for data transformation and workflow orchestration. By combining dbt and Airflow, data engineers and analysts can build scalable pipelines, ensuring timely data delivery for valuable insights.

This guide provides practical examples, troubleshooting tips, and code snippets to help you excel in orchestrating data pipelines efficiently.

This guide is detailed, so it is split into two blogs, Part I and Part II, to make it easier to follow.

Link to Part I of the guide:

In Part II of this guide, the following topics will be covered:

Advanced Techniques with dbt and Airflow
— Running Parallel dbt Jobs
— Parameterized DAGs for Dynamic Workflows

Error Handling and Troubleshooting
— Handling dbt Failures in Airflow
— Setting Up Error Notifications

Monitoring and Logging
— Logging dbt Output in Airflow
— Monitoring dbt Runs with Airflow UI

Integrating dbt Tests with Airflow
— Automating dbt Testing with Airflow
— Handling Test Failures and Alerts

Best Practices and Optimization
— Designing Efficient Airflow DAGs for dbt
— Improving Performance and Scalability

Let's dig in.

Advanced Techniques with dbt and Airflow

a. Running Parallel dbt Jobs:

When dealing with large datasets or complex transformations, running dbt jobs in parallel can significantly speed up data processing. With Apache Airflow, we can easily execute multiple dbt tasks concurrently, optimizing overall workflow efficiency.

Example:

Let’s create an Airflow DAG that runs multiple dbt tasks in parallel using the TriggerDagRunOperator.

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('parallel_dbt_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Define dbt tasks using TriggerDagRunOperator
task1 = TriggerDagRunOperator(
    task_id='trigger_dbt_job1',
    trigger_dag_id='dbt_daily_dag',
    dag=dag
)
task2 = TriggerDagRunOperator(
    task_id='trigger_dbt_job2',
    trigger_dag_id='dbt_daily_dag',
    dag=dag
)
task3 = TriggerDagRunOperator(
    task_id='trigger_dbt_job3',
    trigger_dag_id='dbt_daily_dag',
    dag=dag
)

# Set task dependencies
task1 >> [task2, task3]

In this example, we create an Airflow DAG named “parallel_dbt_dag” with a daily schedule. The TriggerDagRunOperator is used to trigger the "dbt_daily_dag" DAG, which contains dbt tasks for daily data transformations. By setting dependencies, we run task1 first, and then task2 and task3 in parallel.
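
If the goal is simply to run several dbt selections concurrently within a single DAG, an alternative is to define independent BashOperator tasks with no dependencies between them; Airflow can then schedule them in parallel, subject to your executor and parallelism settings. A minimal sketch, where the DAG id and the selectors staging, intermediate, and marts are illustrative placeholders:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('parallel_dbt_selectors_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Three independent dbt selections; since no dependencies are declared between them,
# Airflow can execute all three at the same time.
run_staging = BashOperator(task_id='dbt_run_staging', bash_command='dbt run --models staging', dag=dag)
run_intermediate = BashOperator(task_id='dbt_run_intermediate', bash_command='dbt run --models intermediate', dag=dag)
run_marts = BashOperator(task_id='dbt_run_marts', bash_command='dbt run --models marts', dag=dag)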

b. Parameterized DAGs for Dynamic Workflows:

Parameterizing DAGs in Airflow allows us to create dynamic workflows that adapt based on external inputs or conditions. With parameterized DAGs, we can create reusable and flexible data pipelines, accommodating various scenarios.

Example:

Let’s create a parameterized DAG for dbt in which the dbt run command is built from a model parameter; a variation that keys the run off the execution date is sketched after the example.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

def dbt_run_command(model):
    return f'dbt run --models {model}'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('parameterized_dbt_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Define dbt tasks using BashOperator
task1 = BashOperator(
    task_id='dbt_task1',
    bash_command=dbt_run_command('model1'),
    dag=dag
)
task2 = BashOperator(
    task_id='dbt_task2',
    bash_command=dbt_run_command('model2'),
    dag=dag
)

# Set task dependencies
task1 >> task2

In this example, we define a function dbt_run_command that takes a model parameter and constructs the dbt run command accordingly. The DAG named "parameterized_dbt_dag" runs dbt tasks for "model1" and "model2" on a daily schedule. By passing different models as parameters, we can easily customize the DAG to run specific dbt models dynamically.
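
To actually key the run off the execution date, you can rely on the fact that bash_command is Jinja-templated: Airflow macros such as {{ ds }} are rendered at runtime (and dag_run.conf can be read in templates for manually triggered runs). A minimal sketch, assuming a dbt model that reads a run_date variable via var('run_date'):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('dynamic_dbt_dag', default_args=default_args, schedule_interval=timedelta(days=1))

dbt_dynamic_task = BashOperator(
    task_id='dbt_run_for_date',
    # bash_command is Jinja-templated; {{ ds }} renders to the execution date (YYYY-MM-DD)
    bash_command="dbt run --models model1 --vars '{run_date: \"{{ ds }}\"}'",
    dag=dag,
)

On each scheduled run, {{ ds }} renders to that run's execution date, so the same DAG definition processes a different date's data without any code change.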

By incorporating parallel dbt jobs and parameterized DAGs in your data engineering workflows, you can achieve greater scalability, flexibility, and efficiency in your data transformations.

Advanced techniques enhance your data pipeline’s capabilities, enabling you to tackle complex data scenarios with ease and confidence.

Error Handling and Troubleshooting in dbt and Airflow

a. Handling dbt Failures in Airflow:

When working with data pipelines, it’s essential to handle failures gracefully to ensure data accuracy and avoid potential data inconsistencies. Apache Airflow provides built-in mechanisms to manage dbt job failures effectively.

Example:

Let’s modify the previous example of running dbt tasks in parallel to include failure handling using the on_failure_callback parameter.

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime, timedelta

def on_failure_callback(context):
    # This function will be called when any task in the DAG fails
    task_instance = context['task_instance']
    task_log_url = task_instance.log_url
    # You can add custom error notification logic here,
    # for example sending an email or a notification to a team chat

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': on_failure_callback,
}

dag = DAG('parallel_dbt_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Define dbt tasks using TriggerDagRunOperator
task1 = TriggerDagRunOperator(
    task_id='trigger_dbt_job1',
    trigger_dag_id='dbt_daily_dag',
    dag=dag
)
task2 = TriggerDagRunOperator(
    task_id='trigger_dbt_job2',
    trigger_dag_id='dbt_daily_dag',
    dag=dag
)
task3 = TriggerDagRunOperator(
    task_id='trigger_dbt_job3',
    trigger_dag_id='dbt_daily_dag',
    dag=dag
)

# Set task dependencies
task1 >> [task2, task3]

In this example, we have added an on_failure_callback function that gets triggered whenever any task in the DAG fails. Inside this function, you can define custom error notification logic, such as sending an email or a notification to a team chat, to alert stakeholders about the failure. This helps in proactive troubleshooting and prompt resolution of any issues in the data pipeline.
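
As an illustration of such custom logic, the callback below posts a message to a Slack incoming webhook. This is a minimal sketch; the webhook URL (read here from the hypothetical environment variable SLACK_WEBHOOK_URL) is an assumption you would replace with your own configuration.

import os
import requests

def on_failure_callback(context):
    # Called by Airflow with the task's execution context when the task fails
    task_instance = context['task_instance']
    message = (
        f"Task {task_instance.task_id} in DAG {task_instance.dag_id} failed. "
        f"Logs: {task_instance.log_url}"
    )
    # Post the alert to a Slack incoming webhook (URL supplied via an environment variable)
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    if webhook_url:
        requests.post(webhook_url, json={'text': message}, timeout=10)

The same pattern works for email or any other alerting channel; only the body of the callback changes.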

b. Setting Up Error Notifications:

Airflow provides multiple ways to set up error notifications, allowing you to be informed immediately when an error occurs during data processing.

Example:

Let’s configure Airflow to send email notifications when a dbt task fails.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'email': ['your.email@example.com'],  # Add your email address here
    'email_on_failure': True,
}

dag = DAG('dbt_email_notifications', default_args=default_args, schedule_interval=timedelta(days=1))

# Define dbt tasks using BashOperator
task1 = BashOperator(
    task_id='dbt_task1',
    bash_command='dbt run --models model1 model2',
    dag=dag
)

# Email notification task; trigger_rule='one_failed' makes it run only when the upstream task fails
notify_on_failure = EmailOperator(
    task_id='notify_on_failure',
    to='your.email@example.com',  # Add your email address here
    subject='dbt task failed!',
    html_content='One or more dbt tasks failed. Please check the Airflow logs for more details.',
    trigger_rule='one_failed',
    dag=dag
)

# Set task dependencies
task1 >> notify_on_failure

In this example, we set the email and email_on_failure parameters in the default_args dictionary. The email parameter specifies the recipient's email address for notifications, while email_on_failure=True ensures that Airflow sends an email notification when any task in the DAG fails. Additionally, we include an EmailOperator named notify_on_failure; its trigger_rule='one_failed' means it only runs (and sends its email) when the upstream dbt task fails, rather than on success.
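
Note that both email_on_failure and the EmailOperator rely on Airflow's SMTP settings being configured. A minimal sketch of the [smtp] section of airflow.cfg, with placeholder values to replace with your own mail server details (the same options can also be supplied as AIRFLOW__SMTP__* environment variables):

[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow@example.com
smtp_password = your-smtp-password
smtp_port = 587
smtp_mail_from = airflow@example.com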

By effectively handling dbt failures in Airflow and setting up error notifications, you can quickly identify and resolve issues in your data pipeline, ensuring data quality and consistency.

These practices empower data engineers to maintain a robust and reliable data infrastructure, allowing for smoother data transformations and enhanced data-driven insights.

Monitoring and Logging in dbt and Airflow

a. Logging dbt Output in Airflow:

Logging is crucial for understanding the execution of data pipelines and identifying potential issues. In Airflow, you can log dbt output to view detailed information about dbt runs directly from the Airflow UI.

Example:

Let’s modify the previous example of running dbt tasks in parallel to log dbt output in Airflow.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import subprocess

def run_dbt_task(**kwargs):
    model = kwargs['model']
    command = f'dbt run --models {model}'
    # Capture stdout and stderr together so the full dbt output ends up in the Airflow task log
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output, _ = process.communicate()
    print(output.decode('utf-8'))

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('parallel_dbt_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Define dbt tasks using PythonOperator; op_kwargs passes the model name as a keyword argument
task1 = PythonOperator(
    task_id='dbt_task1',
    python_callable=run_dbt_task,
    op_kwargs={'model': 'model1'},
    dag=dag
)
task2 = PythonOperator(
    task_id='dbt_task2',
    python_callable=run_dbt_task,
    op_kwargs={'model': 'model2'},
    dag=dag
)
task3 = PythonOperator(
    task_id='dbt_task3',
    python_callable=run_dbt_task,
    op_kwargs={'model': 'model3'},
    dag=dag
)

# Set task dependencies
task1 >> [task2, task3]

In this example, we use the PythonOperator to execute dbt commands and log the output in the Airflow UI. The run_dbt_task function runs the dbt command using the subprocess module and captures the output, which is then printed to the Airflow logs.
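
One limitation of the version above is that the task can stay green even if dbt exits with an error, and the output only appears once the run finishes. Below is a minimal sketch of an alternative callable that streams dbt output line by line and raises an exception on a non-zero exit code so Airflow marks the task as failed (the function name stream_dbt_run is an illustrative choice):

import subprocess

def stream_dbt_run(model, **kwargs):
    command = f'dbt run --models {model}'
    process = subprocess.Popen(
        command, shell=True,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    # Forward each line of dbt output to the Airflow task log as it is produced
    for line in process.stdout:
        print(line.rstrip())
    return_code = process.wait()
    if return_code != 0:
        # Raising here makes Airflow mark the task (and trigger its retries) as failed
        raise RuntimeError(f'dbt run for {model} exited with code {return_code}')

It can be wired into the DAG above by passing python_callable=stream_dbt_run and op_kwargs={'model': 'model1'} to the PythonOperator.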

b. Monitoring dbt Runs with Airflow UI:

Airflow provides a user-friendly web-based interface for monitoring and managing workflows. You can leverage this interface to monitor the status and progress of dbt runs, making it easier to detect errors or performance issues.

Example:

After running the dbt tasks using the previous example, you can navigate to the Airflow UI to monitor the status and logs of the dbt runs. The Airflow UI displays the details of each task execution, including the task’s start time, end time, duration, and logs.

To access the Airflow UI, open your web browser and enter the Airflow web server URL. By default, the Airflow UI can be accessed at http://localhost:8080.

In the Airflow UI, go to the “DAGs” page, find your dbt DAG, and click on it to view its details. You can see the individual tasks in the DAG and their execution status. Click on any task to view its log output, which includes the dbt run details and any printed log statements from the run_dbt_task function.

By leveraging the Airflow UI for monitoring, you gain real-time insights into the progress of dbt runs and can quickly identify any issues that may arise during data processing. This proactive monitoring ensures data pipeline reliability and allows for timely troubleshooting and optimization.

Integrating dbt Tests with Airflow

Integrating dbt tests with Airflow enables us to automate data testing and validation as part of our data pipeline. This ensures data quality and consistency and helps catch any issues or anomalies in the data early in the process.

a. Automating dbt Testing with Airflow

To automate dbt testing in Airflow, we can encapsulate the dbt tests in their own sub-DAG using Airflow's SubDagOperator and have it run after the dbt transformation tasks complete successfully.

Example:

Let’s create a separate DAG for dbt tests using the SubDagOperator.

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Define a function to create the dbt test DAG
# (run_dbt_test is a small helper that invokes `dbt test`; a sketch is given after this example)
def create_dbt_test_dag(parent_dag_name, child_dag_name, args):
    dbt_test_dag = DAG(
        dag_id=f'{parent_dag_name}.{child_dag_name}',
        default_args=args,
        schedule_interval=timedelta(days=1),  # keep the same schedule as the parent DAG
    )
    # Define dbt test tasks using PythonOperator
    dbt_test_task1 = PythonOperator(
        task_id='dbt_test_model1',
        python_callable=run_dbt_test,
        op_args=['model1'],
        dag=dbt_test_dag
    )
    dbt_test_task2 = PythonOperator(
        task_id='dbt_test_model2',
        python_callable=run_dbt_test,
        op_args=['model2'],
        dag=dbt_test_dag
    )
    # Set task dependencies
    dbt_test_task1 >> dbt_test_task2
    return dbt_test_dag

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('dbt_transformation_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Define dbt transformation tasks using BashOperator
dbt_task1 = BashOperator(
    task_id='dbt_task1',
    bash_command='dbt run --models model1',
    dag=dag
)
dbt_task2 = BashOperator(
    task_id='dbt_task2',
    bash_command='dbt run --models model2',
    dag=dag
)

# Define the dbt test sub-DAG
dbt_test_subdag = SubDagOperator(
    task_id='dbt_test_subdag',
    subdag=create_dbt_test_dag('dbt_transformation_dag', 'dbt_test_subdag', default_args),
    dag=dag,
)

# Set task dependencies
dbt_task1 >> dbt_task2 >> dbt_test_subdag

In this example, we create a dbt test sub-DAG named “dbt_test_subdag” that runs after the dbt transformation DAG completes successfully. The create_dbt_test_dag function defines the dbt test DAG, which includes PythonOperators for running dbt tests on "model1" and "model2". We set task dependencies to ensure that dbt tests are executed in order.
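
The run_dbt_test helper referenced above is not spelled out in the original snippet; a minimal sketch, assuming it simply shells out to dbt test for the given model and fails the task on a non-zero exit code:

import subprocess

def run_dbt_test(model):
    command = f'dbt test --models {model}'
    result = subprocess.run(command, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # Send the dbt test output to the Airflow task log
    print(result.stdout)
    if result.returncode != 0:
        # A non-zero exit code means at least one test failed; raising marks the task as failed
        raise RuntimeError(f'dbt tests for {model} failed with exit code {result.returncode}')

It is passed to the PythonOperator via python_callable=run_dbt_test and op_args=['model1'], exactly as in the sub-DAG above.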

b. Handling Test Failures and Alerts

Handling test failures and setting up alerts is crucial to ensure timely notification of any data quality issues. We can use Airflow’s on_failure_callback parameter to specify a function that will be triggered when a task fails. In this function, we can implement custom logic to send notifications or alerts, such as emails or Slack messages, to alert stakeholders about the test failures.

def on_failure_callback_test(context):
    task_instance = context['task_instance']
    test_name = task_instance.task_id
    log_url = task_instance.log_url
    alert_message = f"The dbt test '{test_name}' failed. Check the logs at {log_url} for more details."
    # Implement custom alerting logic here, e.g., send email or Slack notification
    # (send_alert_email is a placeholder helper; one possible sketch is shown below)
    send_alert_email(alert_message)
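
The send_alert_email helper is not defined in the snippet above; a minimal sketch, assuming Airflow's SMTP settings are configured and using airflow.utils.email.send_email (the recipient address is a placeholder):

from airflow.utils.email import send_email

def send_alert_email(alert_message):
    # Delegates to Airflow's built-in email utility, which relies on the [smtp] configuration
    send_email(
        to='data-team@example.com',  # placeholder recipient
        subject='dbt test failure',
        html_content=alert_message,
    )

This relies on the same [smtp] configuration described in the error notification section earlier.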

Integrating dbt tests with Airflow and automating the testing process not only saves time but also ensures that data quality is maintained throughout the data pipeline. Handling test failures with alerts enables proactive data monitoring and quality assurance, leading to reliable data-driven decision-making and business insights.

By setting up alerts for test failures, data engineers and analysts can be promptly informed of any data quality issues, allowing them to take immediate action and ensure data accuracy and reliability.

Best Practices and Optimization

a. Designing Efficient Airflow DAGs for dbt:

When designing Airflow DAGs for dbt, it’s essential to follow best practices to ensure efficiency and maintainability. Some key considerations include:

  • DAG Structure: Organize your DAGs logically, with clear separation of tasks and dependencies. Use the subdag feature for complex tasks to keep your main DAGs concise.
  • Task Parallelism: Use parallelism to execute tasks concurrently, especially when running multiple dbt models or tests. This can significantly improve overall DAG execution time.
  • Dynamic Task Generation: Leverage Airflow’s ability to generate tasks dynamically based on external configurations. For example, you can generate dbt tasks based on models defined in a configuration file (a sketch is shown after this list).
  • Task Scheduling: Set appropriate schedule_interval for your DAGs based on the data update frequency. Avoid unnecessary frequent runs to optimize resource utilization.
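
A minimal sketch of dynamic task generation, assuming the list of models comes from a simple Python list (in practice this could be loaded from a YAML or JSON configuration file):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# In practice this list could be read from a configuration file checked into your project
DBT_MODELS = ['model1', 'model2', 'model3']

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('dynamic_dbt_tasks_dag', default_args=default_args, schedule_interval=timedelta(days=1))

# Generate one dbt task per model in the configuration
previous_task = None
for model in DBT_MODELS:
    task = BashOperator(
        task_id=f'dbt_run_{model}',
        bash_command=f'dbt run --models {model}',
        dag=dag,
    )
    # Chain the generated tasks sequentially; drop this block to run them in parallel
    if previous_task:
        previous_task >> task
    previous_task = task

Adding a model to the list (or to the configuration file it is loaded from) is then enough to add a task to the DAG.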

b. Improving Performance and Scalability:

As your data infrastructure and dbt projects grow, optimizing performance and scalability becomes crucial. Consider the following strategies:

  • Resource Allocation: Assign adequate resources to your Airflow and dbt processes, such as CPU, memory, and parallelism, to handle the data processing load efficiently.
  • Avoiding Re-computation: dbt does not cache model results itself, but incremental materializations (and, where available, your warehouse’s own result caching) let you avoid re-computing results for unchanged data. This reduces processing time and minimizes resource consumption.
  • Airflow Worker Scaling: Scale Airflow workers horizontally to handle a larger number of DAG runs concurrently.
  • Database Optimization: Optimize your data warehouse database for query performance. Use indexes, partitions, and clustering to speed up dbt model runs.
  • Incremental Processing: Leverage incremental processing in dbt to process only the changed data, reducing overall execution time.

Example: Incremental Processing in dbt

{{
    config(
        materialized='incremental',
        unique_key='id'
    )
}}

SELECT
    id,
    name,
    age,
    last_modified
FROM
    raw_data_table

{% if is_incremental() %}
  -- On incremental runs, only pick up rows that changed since the previous run
  WHERE last_modified > (SELECT MAX(last_modified) FROM {{ this }})
{% endif %}

In this example, the model is materialized as incremental with id as the unique key. The first run builds the full table; on subsequent runs the is_incremental() block restricts the query to rows whose last_modified timestamp is newer than what already exists in the target table (referenced via {{ this }}), so unchanged data is not reprocessed and execution is much faster. The last_modified column is assumed here; use whichever updated-at column your source table provides.

By following these best practices and optimization techniques, you can ensure that your dbt and Airflow workflows perform efficiently, scale to handle increasing data volumes, and provide timely and accurate insights to your organization. Optimized data pipelines enhance the overall data engineering process and contribute to data-driven decision-making and analytics.

By integrating these advanced dbt concepts with Airflow orchestration, teams can construct resilient data pipelines, maintain strict data governance standards, and provide valuable and reliable insights to stakeholders. This powerful combination empowers data engineers to optimize their workflows, ensure data quality, and drive data-driven decision-making across the organization.

Happy Integration of dbt with Airflow !

Explore my other articles in the dbt series:

Thanks for Reading !

I post about Data , AI , Startups , Leadership, Writing & Culture.

Stay tuned for the next blog.
