An Airflow operator and hook to interface with the dbt-core Python package.
Although dbt is meant to be installed and used as a CLI, we may not have control over the environment where Airflow is running, which rules out using dbt as a CLI.
This is exactly what happens when using Amazon's Managed Workflows for Apache Airflow or MWAA: although a list of Python requirements can be passed, the CLI cannot be found in the worker's PATH.
There is a workaround which involves using Airflow's BashOperator and running Python from the command line:
```python
from airflow.operators.bash import BashOperator

BASH_COMMAND = "python -c 'from dbt.main import main; main()' run"
operator = BashOperator(
    task_id="dbt_run",
    bash_command=BASH_COMMAND,
)
```

But it can get sloppy when appending all the potential arguments a dbt run command (or any other subcommand) can take.
That's where airflow-dbt-python comes in: it abstracts the complexity of interfacing with dbt-core and exposes one operator for each dbt subcommand that can be instantiated with all the corresponding arguments that the dbt CLI would take.
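To illustrate the correspondence, here is a sketch (not the library's actual mechanism, which calls dbt-core internals directly) of how operator-style keyword arguments map onto the flags the dbt CLI would take:

```python
def kwargs_to_cli_flags(command: str, **kwargs) -> list:
    """Illustrative only: show how operator kwargs correspond to dbt CLI flags.

    airflow-dbt-python does not build a CLI string; this just demonstrates
    the one-to-one mapping the operators expose.
    """
    args = [command]
    for name, value in kwargs.items():
        # Python identifiers use underscores; CLI flags use dashes.
        flag = "--" + name.replace("_", "-")
        if value is True:
            args.append(flag)
        elif isinstance(value, list):
            args.extend([flag, ",".join(value)])
        else:
            args.extend([flag, str(value)])
    return args
```

For example, `kwargs_to_cli_flags("run", select=["+tag:daily"], full_refresh=True)` corresponds to running `dbt run --select +tag:daily --full-refresh`.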
The alternative airflow-dbt package, by default, does not work if the dbt CLI is not in PATH, which means it is not usable in MWAA. There is a workaround via the dbt_bin argument, which can be set to "python -c 'from dbt.main import main; main()' run", in a similar fashion to the BashOperator example. Yet this approach is not without its limitations:
- airflow-dbt works by wrapping the dbt CLI, which makes our code dependent on the environment in which it runs.
- airflow-dbt does not support the full range of arguments a command can take. For example, DbtRunOperator does not have an attribute for fail_fast.
- airflow-dbt does not offer access to dbt artifacts created during execution. airflow-dbt-python does so by pushing any artifacts to XCom.
Besides running dbt as one would do if doing so manually, airflow-dbt-python also supports a few additional features to bring dbt closer to being a first-class citizen of Airflow.
The arguments profiles_dir and project_dir would normally point to a directory containing a profiles.yml file and a dbt project in the local environment respectively. airflow-dbt-python extends these arguments to also take an AWS S3 URL (identified by an s3:// scheme):
- If an S3 URL is used for profiles_dir, then this URL must point to a directory in S3 that contains a profiles.yml file. The profiles.yml file will be downloaded and made available for the operator to use when running.
- If an S3 URL is used for project_dir, then this URL must point to a directory in S3 containing all the files required for a dbt project to run. All of the contents of this directory will be downloaded and made available for the operator. The URL may also point to a zip file containing all the files of a dbt project, which will be downloaded, uncompressed, and made available for the operator.
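As a minimal sketch, remote paths are distinguished from local ones purely by their s3:// scheme; the bucket and key names below are assumptions for illustration:

```python
from urllib.parse import urlparse


def is_s3_url(path: str) -> bool:
    """airflow-dbt-python treats any path with an s3:// scheme as remote."""
    return urlparse(path).scheme == "s3"


# Hypothetical operator arguments pointing at an S3-hosted project and profiles
# directory (bucket name and layout are assumptions):
dbt_run_kwargs = {
    "task_id": "dbt_run_from_s3",
    "project_dir": "s3://my-bucket/dbt/project/",
    "profiles_dir": "s3://my-bucket/dbt/profiles/",
}
```

Passing these same keyword arguments to DbtRunOperator instead of local paths triggers the download-and-run behavior described above.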
This feature is intended to work in line with Airflow's description of the task concept:
> Tasks don’t pass information to each other by default, and run entirely independently.
In our world, that means each task should be responsible for fetching all the dbt-related files it needs in order to run independently. This is particularly relevant for an Airflow deployment with a remote executor, as Airflow does not guarantee which worker will run a particular task.
Each dbt execution produces several JSON artifacts that may be valuable to obtain metrics, build conditional workflows, for reporting purposes, or other uses. airflow-dbt-python can push these artifacts to XCom as requested by exposing a do_xcom_push_artifacts argument, which takes a list of artifacts to push. This way, artifacts may be pulled and operated on by downstream tasks. For example:
```python
import datetime as dt

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from airflow_dbt_python.operators.dbt import DbtRunOperator

with DAG(
    dag_id="example_dbt_artifacts",
    schedule_interval="0 0 * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run_daily",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=True,
        do_xcom_push_artifacts=["manifest.json", "run_results.json"],
    )

    process_artifacts = PythonOperator(
        task_id="process_artifacts",
        python_callable=process_dbt_artifacts,
        provide_context=True,
    )

    dbt_run >> process_artifacts
```

See the full example here.
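The process_dbt_artifacts callable referenced above is left to the user; a hypothetical sketch might pull the manifest from XCom and summarize its nodes (only the "nodes" key of dbt's manifest.json schema is assumed here):

```python
def summarize_manifest(manifest: dict) -> dict:
    """Count manifest nodes by resource type (model, test, seed, ...)."""
    counts: dict = {}
    for node in manifest.get("nodes", {}).values():
        resource_type = node.get("resource_type", "unknown")
        counts[resource_type] = counts.get(resource_type, 0) + 1
    return counts


def process_dbt_artifacts(ti, **context):
    # Pull the manifest pushed by dbt_run_daily via do_xcom_push_artifacts.
    manifest = ti.xcom_pull(task_ids="dbt_run_daily", key="manifest.json")
    print(summarize_manifest(manifest))
```

The same pattern applies to run_results.json, for example to report per-model timings downstream.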
Currently, the following dbt commands are supported:
- clean
- compile
- debug
- deps
- docs generate
- ls
- parse
- run
- run-operation
- seed
- snapshot
- source
- test
```python
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtTestOperator,
)

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_dbt_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
) as dag:
    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        selector_name=["pre-run-tests"],
    )

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        select=["/path/to/first.csv", "/path/to/second.csv"],
        full_refresh=True,
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        select=["/path/to/models"],
        full_refresh=True,
        fail_fast=True,
    )

    dbt_test >> dbt_seed >> dbt_run
```

More examples can be found in the examples/ directory.
To line up with dbt-core, airflow-dbt-python supports Python 3.7, 3.8, and 3.9. We also include Python 3.10 in our testing pipeline, although dbt-core does not yet support it.
On the Airflow side, we unit test with version 1.10.12 and the latest version 2 release.
Finally, airflow-dbt-python requires at least dbt-core version 1.0.0. Since dbt-core follows semantic versioning, we do not impose any restrictions on the minor and patch versions, but do keep in mind that the latest dbt-core features incorporated as minor releases may not yet be supported.
```shell
pip install airflow-dbt-python
```

Any dbt adapters you require may be installed by specifying extras:
```shell
pip install airflow-dbt-python[snowflake,postgres]
```

Clone the repo:

```shell
git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python
```

With poetry:

```shell
poetry install
```

Install any extras you need, and only those you need:

```shell
poetry install -E postgres -E redshift
```

Add airflow-dbt-python to your requirements.txt file and edit your Airflow environment to use this new requirements.txt file.
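For reference, a minimal MWAA requirements.txt might contain just the package with the adapter extra you need (the Redshift adapter here is an assumption):

```
airflow-dbt-python[redshift]
```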
Tests are written using pytest and located in tests/; they can be run locally with poetry:

```shell
poetry run pytest tests/ -vv
```