
Airflow and dbt Cloud


    Introduction

    In some cases, Airflow may be the preferred orchestrator for your organization over working fully within dbt Cloud. There are a few reasons your team might be considering using Airflow to orchestrate your dbt jobs:

    • Your team is already using Airflow to orchestrate other processes
    • Your team needs to ensure that a dbt job kicks off before or after another process outside of dbt Cloud
    • Your team needs flexibility to manage more complex scheduling, such as kicking off one dbt job only after another has completed
    • Your team wants to own their own orchestration solution
    • You need code to work right now without starting from scratch

    Prerequisites

    Airflow + dbt Core

There are many great examples from GitLab in their open source data engineering work. This approach is especially appropriate if you are well-versed in Kubernetes, CI/CD, and Docker task management when building your Airflow pipelines. If this describes you and your team, you’re in good hands reading through more details here and here.

    Airflow + dbt Cloud API w/Custom Scripts

    This has served as a bridge until the fabled Astronomer + dbt Labs-built dbt Cloud provider became generally available here.

There have been many different permutations of this approach over time.
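
    For context, the sketch below shows roughly what one of these custom scripts looks like: trigger a dbt Cloud job over the v2 API and poll until it finishes. This is a hedged illustration rather than any particular team’s implementation; the account ID, job ID, and token environment variable are placeholders.

    # Trigger a dbt Cloud job via the v2 API and poll until it reaches a terminal state.
    # ACCOUNT_ID, JOB_ID, and the token environment variable are placeholders.
    import os
    import time

    import requests

    ACCOUNT_ID = 16173
    JOB_ID = 65767
    API_TOKEN = os.environ["DBT_CLOUD_API_TOKEN"]  # a dbt Cloud service token

    BASE_URL = f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}"
    HEADERS = {"Authorization": f"Token {API_TOKEN}"}


    def trigger_and_wait() -> None:
        # Kick off the job run
        resp = requests.post(
            f"{BASE_URL}/jobs/{JOB_ID}/run/",
            headers=HEADERS,
            json={"cause": "Triggered by custom script"},
        )
        resp.raise_for_status()
        run_id = resp.json()["data"]["id"]

        # Poll until the run finishes (10 = success, 20 = error, 30 = cancelled)
        while True:
            run = requests.get(f"{BASE_URL}/runs/{run_id}/", headers=HEADERS).json()["data"]
            if run["status"] == 10:
                print("dbt Cloud job succeeded")
                return
            if run["status"] in (20, 30):
                raise RuntimeError(f"dbt Cloud job finished with status {run['status']}")
            time.sleep(10)


    if __name__ == "__main__":
        trigger_and_wait()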

These solutions are great, but they can be difficult to trust as your team grows and the management of testing, job definitions, secrets, and pipelines outgrows your team’s capacity. Roles become blurry (or were never clearly defined at the start!). Both data and analytics engineers end up digging through custom logging in each other’s workflows to make heads or tails of where and what the issue really is. And once the issue is found, it can be even harder to decide on the best path forward for safely implementing a fix. This complex workflow and unclear delineation of process ownership result in a lot of misunderstandings and wasted time just trying to get the process to work smoothly!

    In this guide, you'll learn how to:

1. Create a working local Airflow environment
    2. Invoke a dbt Cloud job with Airflow (with proof!)
    3. Reuse tested and trusted Airflow code for your specific use cases

    You’ll also gain a better understanding of how this will:

    • Reduce the cognitive load when building and maintaining pipelines
    • Avoid dependency hell (think: python -m pip install conflicts)
    • Implement better recoveries from failures
    • Define clearer workflows so that data and analytics engineers work better, together ♥️

    🙌 Let’s get started! 🙌

    Install the Astro CLI

    Astro is a managed software service that includes key features for teams working with Airflow. In order to use Astro, we’ll install the Astro CLI, which will give us access to useful commands for working with Airflow locally. You can read more about Astro here.

    In this example, we’re using Homebrew to install Astro CLI. Follow the instructions to install the Astro CLI for your own operating system here.

    brew install astro

    Install and start Docker Desktop

    Docker allows us to spin up an environment with all the apps and dependencies we need for the example.

Follow the instructions here to install Docker Desktop for your own operating system. Once Docker is installed, ensure you have it up and running for the next steps.

    Clone the airflow-dbt-cloud repository

    Open your terminal and clone the airflow-dbt-cloud repository. This contains example Airflow DAGs that you’ll use to orchestrate your dbt Cloud job. Once cloned, navigate into the airflow-dbt-cloud project.

    git clone https://github.com/sungchun12/airflow-dbt-cloud.git
    cd airflow-dbt-cloud

    Start the Docker container

    You can initialize an Astronomer project in an empty local directory using a Docker container, and then run your project locally using the start command.

    1. Run the following commands to initialize your project and start your local Airflow deployment:

      astro dev init
      astro dev start

      When this finishes, you should see a message similar to the following:

      Airflow is starting up! This might take a few minutes…

      Project is running! All components are now available.

      Airflow Webserver: http://localhost:8080
      Postgres Database: localhost:5432/postgres
      The default Airflow UI credentials are: admin:admin
The default Postgres DB credentials are: postgres:postgres
    2. Open the Airflow interface. Launch your web browser and navigate to the address for the Airflow Webserver from your output in Step 1.

      This will take you to your local instance of Airflow. You’ll need to log in with the default credentials:

      • Username: admin
      • Password: admin

      Airflow login screen

    Create a dbt Cloud service token

    Create a service token from within dbt Cloud using the instructions found here. Ensure that you save a copy of the token, as you won’t be able to access this later. In this example we use Account Admin, but you can also use Job Admin instead for token permissions.

    Create a dbt Cloud job

    In your dbt Cloud account create a job, paying special attention to the information in the bullets below. Additional information for creating a dbt Cloud job can be found here.

• Configure the job with the commands that you want to include when this job kicks off (for example, dbt run followed by dbt test). Airflow will refer to the job’s configuration for these commands rather than having them explicitly coded in the Airflow DAG, and the job can run a set of commands rather than a single command.
    • Ensure that the schedule is turned off since we’ll be using Airflow to kick things off.
• Once you hit save on the job, make sure you copy the URL and save it for referencing later. The URL will look similar to this:
    https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

    Add your dbt Cloud API token as a secure connection

Now you have all the working pieces to get up and running with Airflow + dbt Cloud. Let’s dive into making this all work together. We will set up a connection and run a DAG in Airflow that kicks off a dbt Cloud job.

    1. Navigate to Admin and click on Connections

      Airflow connections menu

    2. Click on the + sign to add a new connection, then click on the drop down to search for the dbt Cloud Connection Type

      Create connection

      Connection type

3. Add in your connection details and your default dbt Cloud account ID. This is found in your dbt Cloud URL after the accounts route section (/accounts/{YOUR_ACCOUNT_ID}). For example, the account with ID 16173 would see this in their URL: https://cloud.getdbt.com/#/accounts/16173/projects/36467/jobs/65767/

      Connection details
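
    If you want to double-check the connection from code, the optional sketch below uses the dbt Cloud provider’s hook to list the jobs the token can see; a successful call means the connection ID, account ID, and API token are wired up correctly. The hook and method names assume the apache-airflow-providers-dbt-cloud package, the account ID is a placeholder, and you would run this from inside your local Airflow environment.

    # Optional sanity check: list dbt Cloud jobs through the Airflow connection.
    # Assumes the apache-airflow-providers-dbt-cloud package; 16173 is a placeholder.
    from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook

    hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud")

    # list_jobs returns paginated API responses; print each job's id and name.
    for page in hook.list_jobs(account_id=16173):
        for job in page.json()["data"]:
            print(job["id"], job["name"])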

    Add your job_id and account_id config details to the python file

    Add your job_id and account_id config details to the python file: dbt_cloud_provider_eltml.py.

1. You’ll find these details within the dbt Cloud job URL; see the comments in the code snippet below for an example, and the standalone DAG sketch after these steps for fuller context.

      # dbt Cloud Job URL: https://cloud.getdbt.com/#/accounts/16173/projects/36467/jobs/65767/
      # account_id: 16173
      # job_id: 65767

      # line 28
      default_args={"dbt_cloud_conn_id": "dbt_cloud", "account_id": 16173},

      trigger_dbt_cloud_job_run = DbtCloudRunJobOperator(
          task_id="trigger_dbt_cloud_job_run",
          job_id=65767,  # line 39
          check_interval=10,
          timeout=300,
      )
2. Turn on the DAG and verify the job succeeded after running. Note: the screenshots below were taken from different job runs, but the user experience is consistent.


      Airflow DAG

      Task run instance

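
    For reference, here is a minimal, self-contained sketch of a DAG along these lines. It is not the exact contents of dbt_cloud_provider_eltml.py; the DAG ID, schedule, and start date are illustrative, while the connection ID, account ID, and job ID match the example values above.

    # A minimal DAG that triggers the dbt Cloud job and waits for it to finish.
    # Not the exact contents of dbt_cloud_provider_eltml.py -- swap in your own IDs.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

    with DAG(
        dag_id="dbt_cloud_job_example",  # illustrative DAG ID
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,  # triggered manually; dbt Cloud's own schedule stays off
        catchup=False,
        default_args={"dbt_cloud_conn_id": "dbt_cloud", "account_id": 16173},
    ) as dag:
        trigger_dbt_cloud_job_run = DbtCloudRunJobOperator(
            task_id="trigger_dbt_cloud_job_run",
            job_id=65767,
            check_interval=10,  # seconds between status checks
            timeout=300,        # fail the task if the job hasn't finished in 5 minutes
        )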

    How do I rerun the dbt Cloud job and downstream tasks in my pipeline?

    If you have worked with dbt Cloud before, you have likely encountered cases where a job fails. In those cases, you have likely logged into dbt Cloud, investigated the error, and then manually restarted the job.

This section of the guide will show you how to restart the job directly from Airflow. This will run just the trigger_dbt_cloud_job_run task and its downstream tasks, not the entire DAG. If only the transformation step fails, you don’t need to re-run the extract and load processes. Let’s jump into how to do that in the Airflow UI; a programmatic alternative is sketched after these steps.

    1. Click on the task

      Task DAG view

    2. Clear the task instance

      Clear task instance

      Approve clearing

    3. Watch it rerun in real time

      Re-run
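
    If you prefer to clear the task programmatically rather than through the UI, the rough sketch below uses Airflow’s stable REST API. It assumes the local webserver from astro dev start is reachable with basic auth enabled for the API; the DAG ID is a placeholder, and the field names follow the Airflow 2.x clearTaskInstances endpoint.

    # Clear the dbt Cloud trigger task and everything downstream of it via the REST API.
    # Assumes basic auth is enabled for the API; the DAG ID below is a placeholder.
    import requests

    AIRFLOW_API = "http://localhost:8080/api/v1"
    AUTH = ("admin", "admin")  # default local credentials from earlier in this guide

    resp = requests.post(
        f"{AIRFLOW_API}/dags/dbt_cloud_job_example/clearTaskInstances",
        auth=AUTH,
        json={
            "dry_run": False,
            "task_ids": ["trigger_dbt_cloud_job_run"],
            "include_downstream": True,
            "only_failed": False,
        },
    )
    resp.raise_for_status()
    print(resp.json())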

    Cleaning up

At the end of this guide, make sure you shut down your Docker containers. When you’re done using Airflow, use the following command to stop the deployment:

    astro dev stop

    [+] Running 3/3
    ⠿ Container airflow-dbt-cloud_e3fe3c-webserver-1 Stopped 7.5s
    ⠿ Container airflow-dbt-cloud_e3fe3c-scheduler-1 Stopped 3.3s
    ⠿ Container airflow-dbt-cloud_e3fe3c-postgres-1 Stopped 0.3s

    To verify that the deployment has stopped, use the following command:

astro dev ps

    This should give you an output like this:

Name                                     State    Ports
    airflow-dbt-cloud_e3fe3c-webserver-1     exited
    airflow-dbt-cloud_e3fe3c-scheduler-1     exited
    airflow-dbt-cloud_e3fe3c-postgres-1      exited

    Frequently asked questions

    How can we run specific subsections of the dbt DAG in Airflow?

Because of the way we configured the dbt Cloud job to run in Airflow, you can leave this job to your analytics engineers to define in the job configurations from dbt Cloud. If, for example, we need to run hourly-tagged models every hour and daily-tagged models daily, we can create jobs like Hourly Run or Daily Run and utilize the commands dbt run -s tag:hourly and dbt run -s tag:daily within each, respectively. You only need to grab your dbt Cloud account ID and job ID, configure them in an Airflow DAG with the code provided, and then you can be on your way. See more node selection options here.

    How can I re-run models from the point of failure?

You may want to parse the dbt DAG in Airflow to get the benefit of re-running from the point of failure. However, when you have hundreds of models expanded out in your DAG, it becomes unwieldy for diagnosis and rerunning due to the overhead that comes along with creating an expansive Airflow DAG.

    You can’t re-run from failure natively in dbt Cloud today (feature coming!), but you can use a custom rerun parser.

Using a simple Python script coupled with the dbt Cloud provider (a rough sketch follows the list below), you can:

• Avoid managing artifacts in a separate storage bucket (dbt Cloud does this for you)
    • Avoid building your own parsing logic
    • Get clear logs on what models you're rerunning in dbt Cloud (without hard coding step override commands)
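
    To make the idea concrete, here is a rough sketch of what such a rerun parser does: pull run_results.json for the failed run from the dbt Cloud API and build a --select list of the nodes that didn’t succeed. The account ID, run ID, and token environment variable are placeholders, and this illustrates the approach rather than reproducing the exact script referenced above.

    # Build a rerun command from a failed dbt Cloud run's run_results.json artifact.
    # ACCOUNT_ID, RUN_ID, and the token environment variable are placeholders.
    import os

    import requests

    ACCOUNT_ID = 16173
    RUN_ID = 123456789  # the failed run's ID, visible in the dbt Cloud run URL
    API_TOKEN = os.environ["DBT_CLOUD_API_TOKEN"]

    url = (
        f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}"
        f"/runs/{RUN_ID}/artifacts/run_results.json"
    )
    results = requests.get(url, headers={"Authorization": f"Token {API_TOKEN}"}).json()

    # Collect every node that errored, failed a test, or was skipped.
    unfinished = [
        r["unique_id"].split(".")[-1]
        for r in results["results"]
        if r["status"] in ("error", "fail", "skipped")
    ]

    if unfinished:
        # Rerun each unfinished node plus its children.
        print("dbt build --select " + " ".join(f"{node}+" for node in unfinished))
    else:
        print("Nothing to rerun -- every node succeeded.")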

    Watch the video below to see how it works!

    Should Airflow run one big dbt job or many dbt jobs?

    Overall we recommend being as purposeful and minimalistic as you can. This is because dbt manages all of the dependencies between models and the orchestration of running those dependencies in order, which in turn has benefits in terms of warehouse processing efforts.

    We want to kick off our dbt jobs after our ingestion tool (such as Fivetran) / data pipelines are done loading data. Any best practices around that?

Our friends at Astronomer answer this question with an example here.

    How do you set up a CI/CD workflow with Airflow?

Check out these two resources for building your own CI/CD pipeline:

    Can dbt dynamically create tasks in the DAG like Airflow can?

We prefer to keep models bundled vs. unbundled. You can go this route, but if you have hundreds of dbt models, it’s more effective to let the dbt Cloud job handle the models and dependencies. Bundling also provides clearer observability when things go wrong: we've seen more success in being able to clearly see issues in a bundled dbt Cloud job than in combing through the nodes of an expansive Airflow DAG. If you still have a use case for this level of control, though, our friends at Astronomer answer this question here!

    Can you trigger notifications if a dbt job fails with Airflow? Is there any way to access the status of the dbt Job to do that?

Yes, either through Airflow's email/Slack functionality on its own or combined with dbt Cloud's notifications, which support email and Slack.
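
    On the Airflow side, one common pattern is an on_failure_callback that fires when the dbt Cloud trigger task fails. The sketch below posts to a Slack incoming webhook; the webhook URL environment variable is a placeholder, and dbt Cloud's own email/Slack notifications can be layered on top.

    # Alert on failure from Airflow: post to a Slack incoming webhook when the task fails.
    # The SLACK_WEBHOOK_URL environment variable is a placeholder.
    import os

    import requests


    def notify_slack_on_failure(context):
        """Called by Airflow with the task context when a task instance fails."""
        ti = context["task_instance"]
        message = (
            f"Task {ti.task_id} in DAG {ti.dag_id} failed. "
            f"Log URL: {ti.log_url}"
        )
        requests.post(os.environ["SLACK_WEBHOOK_URL"], json={"text": message})


    # Attach it to the operator (or add it to default_args to cover every task):
    # trigger_dbt_cloud_job_run = DbtCloudRunJobOperator(
    #     task_id="trigger_dbt_cloud_job_run",
    #     job_id=65767,
    #     on_failure_callback=notify_slack_on_failure,
    # )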

Are there decision criteria for how to best work with dbt Cloud and Airflow?

    Check out this deep dive into planning your dbt Cloud + Airflow implementation here!
