What are Airflow Filters? Your Quick Start Guide


Airflow filters are Jinja filters and macros that you apply inside an operator's templated parameters, so that values are computed when a task runs instead of being hardcoded in the DAG. They make workflows more flexible and reusable.

Key Concepts

  • Jinja Templating: Airflow uses the Jinja2 templating engine, which lets you embed expressions such as {{ ds }} in the templated fields of an operator (for example, bash_command on BashOperator).
  • Context Variables: Airflow makes a set of predefined context variables available to Jinja templates, such as ds (the run's logical date as a YYYY-MM-DD string), execution_date (named logical_date in recent Airflow versions), dag_run, and task_instance.
  • Custom Filters: You can define your own filters to perform specific data transformations.
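To make the templating idea concrete, here is a minimal sketch that renders two template strings with plain Jinja2, outside Airflow. The context dictionary and its values are hypothetical stand-ins for the context Airflow builds for each task run; inside Airflow, this rendering is applied to an operator's templated fields for you.

```python
from jinja2 import Environment

# Hypothetical stand-in for the context Airflow supplies at runtime.
context = {"ds": "2024-01-15", "params": {"table": "events"}}

env = Environment()

# A context variable is substituted wherever {{ ... }} appears.
command = env.from_string("echo {{ ds }}").render(**context)

# A filter transforms the value on its left; it is applied with a pipe.
table_upper = env.from_string("{{ params.table | upper }}").render(**context)

print(command)      # echo 2024-01-15
print(table_upper)  # EVENTS
```

The same two mechanisms, variable substitution and piped filters, are all that the rest of this guide builds on.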

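Commonly Used Filters and Macros

  • macros.ds_add(ds, days): A macro that adds days to a YYYY-MM-DD date string; a negative value subtracts days.
  • macros.ds_format(ds, input_format, output_format): A macro that converts a date string from one format to another.
  • tojson: A built-in Jinja2 filter that converts a Python object to a JSON string, as in {{ params | tojson }}.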
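The behavior of the date helpers ds_add and ds_format can be approximated in plain Python. The sketch below is not Airflow's own code; add_days and reformat_date are hypothetical names chosen for illustration, and they use only the standard library.

```python
from datetime import datetime, timedelta


def add_days(ds: str, days: int) -> str:
    """Shift a YYYY-MM-DD string by `days`; negative values subtract."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def reformat_date(ds: str, input_format: str, output_format: str) -> str:
    """Parse `ds` with `input_format` and re-emit it in `output_format`."""
    return datetime.strptime(ds, input_format).strftime(output_format)


print(add_days("2024-01-31", 1))   # 2024-02-01
print(add_days("2024-03-01", -1))  # 2024-02-29
print(reformat_date("2024-01-15", "%Y-%m-%d", "%m/%d/%Y"))  # 01/15/2024
```

Because the arithmetic goes through datetime, month ends and leap years are handled correctly, which plain string manipulation would not do.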

Example Usage

Suppose you want to pass the execution date to a task. You can use the {{ ds }} context variable directly.


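from airflow.operators.bash import BashOperator  # Airflow 2.x import path

# {{ ds }} is rendered to the run's date (YYYY-MM-DD) before the command runs.
print_date = BashOperator(
    task_id='print_date',
    bash_command='echo {{ ds }}',
    dag=dag,
)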

To add one day to the execution date:


# ds_add is a macro, so it is called through the macros namespace.
print_tomorrow = BashOperator(
    task_id='print_tomorrow',
    bash_command='echo {{ macros.ds_add(ds, 1) }}',
    dag=dag,
)

To format the execution date:


# Convert the date from YYYY-MM-DD to MM/DD/YYYY.
print_formatted_date = BashOperator(
    task_id='print_formatted_date',
    bash_command='echo {{ macros.ds_format(ds, "%Y-%m-%d", "%m/%d/%Y") }}',
    dag=dag,
)

Creating Custom Filters

To define a custom filter:

  1. Define a Python function for the filter.
  2. Register the function with your DAG through its user_defined_filters argument.

Example:

def reverse_string(s):
    """Return the input string reversed."""
    return s[::-1]

# In your DAG file:
from datetime import datetime
from airflow import DAG

# user_defined_filters maps the name used in templates to the function.
dag = DAG(
    dag_id='custom_filter_example',   # hypothetical DAG name
    start_date=datetime(2024, 1, 1),  # hypothetical start date
    user_defined_filters={'reverse_string': reverse_string},
)


# Use the filter inside a task; the pipe applies it to the value on its left.
reverse_string_task = BashOperator(
    task_id='reverse_string_task',
    bash_command='echo {{ "hello" | reverse_string }}',
    dag=dag,
)
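A custom filter is an ordinary Python function, so it can be checked before it goes anywhere near a scheduler. The following sketch registers reverse_string on a plain Jinja2 Environment and renders the same template string as the task above. It only approximates what Airflow does at render time and assumes nothing beyond the jinja2 package.

```python
from jinja2 import Environment


def reverse_string(s: str) -> str:
    """Return the input string reversed."""
    return s[::-1]


env = Environment()
# Register the function under the name that templates will use.
env.filters["reverse_string"] = reverse_string

rendered = env.from_string('echo {{ "hello" | reverse_string }}').render()
print(rendered)  # echo olleh
```

Rendering the template without the pipe character raises a TemplateSyntaxError, which makes this kind of quick check a cheap way to catch template mistakes early.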


Benefits

  • Dynamic Configuration: Adapt task parameters based on the execution context.
  • Reusability: Create reusable DAGs that can handle different data or configurations.
  • Simplified Maintenance: Reduce hardcoding and improve the maintainability of your workflows.

In short, filters and macros let a single DAG definition adapt to each run: run-specific values are computed in templates at runtime instead of being written into the code.