Event-Driven Airflow: Using Datasets for Smarter Scheduling

Key Takeaways
  • Data-Aware Scheduling with Datasets: Airflow 2.4 introduced Datasets, enabling DAGs to be triggered by data updates instead of fixed intervals. This eliminates rigid time-based execution and replaces it with dynamic, event-driven orchestration.
  • Simplified Dependency Management & UI Visibility: Producer-consumer DAG relationships let you build flexible pipelines, and Airflow’s UI now offers tabs to monitor Dataset Events, visualize dependency graphs, and manage datasets centrally.

In Apache Airflow, scheduling workflows has traditionally been managed using the schedule parameter, which accepts definitions such as timedelta objects or cron expressions to establish time-based intervals for DAG (Directed Acyclic Graph) executions. Airflow was already a powerful scheduler, and it became more flexible in version 2.4 with the introduction of Datasets. This feature enables data-driven DAG execution: a workflow can be triggered when a task in another DAG finishes updating a Dataset, rather than at a predetermined time. This means you can write a DAG that updates a file, and once the task that writes the file succeeds, another DAG is triggered.

In this article, we'll dive into the concept of Airflow Datasets, explore their impact on workflow orchestration, and provide a step-by-step guide to schedule your DAGs using Datasets!


Understanding Airflow Scheduling (Pre-Datasets)

Before Airflow 2.4, DAG scheduling in Airflow was primarily time-based, relying on parameters like schedule (named schedule_interval before 2.4) and start_date to define execution times. With this setup there were three ways to schedule your DAGs: cron expressions, presets, or timedelta objects. Let's examine each one.

  • Cron Expressions: These expressions allowed precise scheduling. For example, to run a DAG daily at 4:05 AM, you would set schedule='5 4 * * *'.  
  • Presets: Airflow provided string presets for common intervals:  
    • @hourly: Runs the DAG at the beginning of every hour.  
    • @daily: Runs the DAG at midnight every day.  
    • @weekly: Runs the DAG at midnight on Sunday, the first day of the week in cron terms.  
    • @monthly: Runs the DAG at midnight on the first day of the month.  
    • @yearly: Runs the DAG at midnight on January 1st.  
  • Timedelta Objects: For intervals not easily expressed with cron, a timedelta object could be used. For instance, schedule=timedelta(hours=6) would schedule the DAG every six hours.  
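
To make the three options concrete, the sketch below lists the cron expression each preset stands for, next to a cron string and a timedelta. The name PRESET_TO_CRON is only for illustration; Airflow resolves presets internally, so you never need to define this table yourself.

```python
from datetime import timedelta

# Cron equivalents of Airflow's string presets (illustrative lookup table).
PRESET_TO_CRON = {
    "@hourly": "0 * * * *",   # minute 0 of every hour
    "@daily": "0 0 * * *",    # midnight every day
    "@weekly": "0 0 * * 0",   # midnight on Sunday
    "@monthly": "0 0 1 * *",  # midnight on the 1st of the month
    "@yearly": "0 0 1 1 *",   # midnight on January 1st
}

# Any one of these values can be passed as a DAG's schedule argument.
cron_schedule = "5 4 * * *"             # daily at 4:05 AM
preset_schedule = "@daily"              # same as PRESET_TO_CRON["@daily"]
interval_schedule = timedelta(hours=6)  # every six hours
```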

Limitations of Time-Based Scheduling

While effective for many jobs, time-based scheduling had some limitations:

Fixed Timing: DAGs ran at predetermined times, regardless of data readiness, which is exactly the gap Datasets address. If data wasn't available at the scheduled time, tasks could fail or process incomplete data.  

Sensors and Polling: To handle data dependencies, sensors were employed to wait for data availability. However, sensors often relied on continuous polling, which could be resource-intensive and lead to inefficiencies.  
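
To see why polling can be wasteful, here is a minimal plain-Python sketch of the pattern a file sensor follows. It is not Airflow's sensor implementation; the function name wait_for_file and its parameters are hypothetical and only illustrate the repeated checks.

```python
import os
import time


def wait_for_file(path, poke_interval=0.1, timeout=1.0):
    """Poll until `path` exists and return the number of checks made.

    Raises TimeoutError if the file does not appear within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    pokes = 0
    while True:
        pokes += 1
        if os.path.exists(path):
            return pokes
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{path} not found after {pokes} checks")
        # The process stays alive between checks even though it does no work.
        time.sleep(poke_interval)
```

Every check that comes back empty is work that produced nothing, and the waiting process holds its slot the whole time. An event-driven trigger avoids that loop entirely.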

Airflow Datasets were created to overcome these scheduling limitations.

Intro to Airflow Datasets

When using Datasets in Airflow, you'll typically work with two types of DAGs: Producer and Consumer DAGs.

What is a Producer DAG?

A DAG responsible for defining and "updating" a specific Dataset. We say "updating" because Airflow considers a Dataset "updated" simply when a task that lists it in its outlets completes successfully — regardless of whether the data was truly modified.

A Producer DAG:
✅ Must have the Dataset variable defined or imported
✅ Must include a task with the outlets parameter set to that Dataset

What is a Consumer DAG?

A DAG that is scheduled to run once a task in the Producer DAG successfully updates the Dataset.  

A Consumer DAG:
✅ Must reference the same Dataset using the schedule parameter

It’s this producer-consumer relationship that enables event-driven scheduling in Airflow — allowing workflows to run as soon as the data they're dependent on is ready, without relying on fixed time intervals.

What are Airflow Datasets?

A Dataset is a way to represent a specific set of data. Think of it as a label or reference to a particular data resource. This can be anything: a CSV file, an S3 bucket, or a SQL table. A Dataset is defined by passing a string URI to the Dataset() object. This string acts as an identifier: it doesn't have to point to a real file or URL, but it should be consistent, unique, and ideally limited to ASCII characters (plain English letters, numbers, slashes, underscores, etc.).

from airflow.datasets import Dataset

my_dataset = Dataset("s3://my-bucket/my-data.csv")
# or
my_dataset = Dataset("my_folder/my_file.txt")

When using Airflow Datasets, remember that Airflow does not monitor the actual contents of your data. It doesn’t check if a file or table has been updated.

Instead, it tracks task completion. When a task that lists a Dataset in its outlets finishes successfully, Airflow marks that Dataset as “updated.” This means the task doesn’t need to actually modify any data — even a task that only runs a print() statement will still trigger any Consumer DAGs scheduled on that Dataset. It’s up to your task logic to ensure the underlying data is actually being modified when necessary. Even though Airflow isn’t checking the data directly, this mechanism still enables event-driven orchestration because your workflows can run when upstream data should be ready.

For example, if one DAG has a task that generates a report and writes it to a file, you can define a Dataset for that file. Another DAG that depends on the report can be triggered automatically as soon as the first DAG’s task completes. This removes the need for rigid time-based scheduling and reduces the risk of running on incomplete or missing data.
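
The rule "task success equals Dataset update" can be captured in a few lines. The class below is a toy model written for this article, not Airflow's scheduler code; ToyScheduler and its methods are hypothetical names.

```python
from collections import defaultdict


class ToyScheduler:
    """Toy model of dataset-triggered scheduling (not Airflow's implementation)."""

    def __init__(self):
        self.consumers = defaultdict(list)  # dataset URI -> consumer DAG ids
        self.triggered = []                 # consumer runs queued so far

    def register_consumer(self, dag_id, dataset_uri):
        self.consumers[dataset_uri].append(dag_id)

    def run_task(self, task_fn, outlets=()):
        """Run a task; on success, queue every consumer of each outlet."""
        try:
            task_fn()
        except Exception:
            return False  # a failed task does not update its outlets
        for uri in outlets:
            self.triggered.extend(self.consumers[uri])
        return True


scheduler = ToyScheduler()
scheduler.register_consumer("consumer_dag", "/path/to/your_dataset.csv")


def only_prints():
    print("no data was modified")


scheduler.run_task(only_prints, outlets=["/path/to/your_dataset.csv"])
# scheduler.triggered now holds one queued run of consumer_dag
```

Note that only_prints never touches the file, yet the consumer is still queued. The model never inspects the data, only the task outcome.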

Tutorial: Scheduling with Datasets  

Create a producer DAG

1. Define your Dataset.  

In a new DAG file, define a variable using the Dataset object and pass in the path to your data as a string. In this example, it’s the path to a CSV file.

# producer.py
from airflow.datasets import Dataset 

# Define the dataset representing the CSV file 
csv_dataset = Dataset("/path/to/your_dataset.csv") 

2. Create a DAG with a task that updates the CSV dataset.

We’ll use the @dag and @task decorators for a cleaner structure. The key part is passing the outlets parameter to the task. This tells Airflow that the task updates a specific dataset. Once the task completes successfully, Airflow will consider the dataset "updated" and trigger any dependent DAGs.

We’re also using csv_dataset.uri to get the path to the dataset—this is the same path you defined earlier (e.g., "/path/to/your_dataset.csv").

# producer.py
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime
import pandas as pd
import os

# Define the dataset representing the CSV file
csv_dataset = Dataset("/path/to/your_dataset.csv")

@dag(
    dag_id='producer_dag',
    start_date=datetime(2025, 3, 31),
    schedule='@daily',
    catchup=False,
)
def producer_dag():

    @task(outlets=[csv_dataset])
    def update_csv():
        data = {'column1': [1, 2, 3], 'column2': ['A', 'B', 'C']}
        df = pd.DataFrame(data)
        file_path = csv_dataset.uri

        # Check if the file exists to append or write
        if os.path.exists(file_path):
            df.to_csv(file_path, mode='a', header=False, index=False)
        else:
            df.to_csv(file_path, index=False)

    update_csv()

producer_dag()

Create a Consumer DAG

Now that we have a producer DAG that updates a Dataset, we can create the DAG that depends on it: the consumer DAG. This is where the magic happens, since this DAG will no longer be time-dependent but Dataset-dependent.  

1. Instantiate the same Dataset used in the Producer DAG

In a new DAG file (the consumer), start by defining the same Dataset that was used in the Producer DAG. This ensures both DAGs are referencing the exact same dataset path.

# consumer.py
from airflow.datasets import Dataset 

# Define the dataset representing the CSV file 
csv_dataset = Dataset("/path/to/your_dataset.csv") 

2. Set the schedule to the Dataset

Create your DAG and set the schedule parameter to the Dataset you instantiated earlier (the one being updated by the producer DAG). This tells Airflow to trigger this DAG only when that dataset is updated—no need for time-based scheduling.

# consumer.py
import datetime
from airflow.decorators import dag, task
from airflow.datasets import Dataset

csv_dataset = Dataset("/path/to/your_dataset.csv")

@dag(
    default_args={
        "start_date": datetime.datetime(2024, 1, 1, 0, 0),
        "owner": "Mayra Pena",
        "email": "mayra@example.com",
        "retries": 3
    },
    description="Sample Consumer DAG",
    schedule=[csv_dataset],
    tags=["transform"],
    catchup=False,
)
def data_aware_consumer_dag():
    
    @task
    def run_consumer():
        print("Processing updated CSV file")
    
    run_consumer()

dag = data_aware_consumer_dag()

That's it! 🎉 Now this DAG will run whenever the producer DAG's update_csv task completes successfully and marks the Dataset as updated.  
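
The consumer task above only prints a message. In a real pipeline it would usually read the file the producer wrote. As a rough sketch, a helper like the one below could be called from run_consumer with csv_dataset.uri as its argument. The name summarize_csv is hypothetical, and the sketch uses the standard library csv module instead of pandas to stay dependency-free.

```python
import csv


def summarize_csv(file_path):
    """Return (row_count, column_names) for a CSV file with a header row."""
    with open(file_path, newline="") as handle:
        reader = csv.DictReader(handle)
        row_count = sum(1 for _ in reader)  # counts data rows, not the header
        return row_count, reader.fieldnames
```

Inside the task, the call would look like summarize_csv(csv_dataset.uri), assuming the Dataset URI is a local path that the worker can read.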

DRY Principles for Datasets

When using Datasets, you may use the same dataset across multiple DAGs and therefore have to define it many times. There is a simple DRY (Don't Repeat Yourself) way to overcome this.

1. Create a central datasets.py file
To follow DRY (Don't Repeat Yourself) principles, centralize your dataset definitions in a utility module.

Simply create a utils folder and add a datasets.py file.
If you're using Datacoves, your Airflow-related files typically live in a folder named orchestrate, so your path might look like:
orchestrate/utils/datasets.py

2. Import the Dataset object

Inside your datasets.py file, import the Dataset class from Airflow:

from airflow.datasets import Dataset 

3. Define your Dataset in this file

Now that you’ve imported the Dataset object, define your dataset as a variable. For example, if your DAG writes to a CSV file:

from airflow.datasets import Dataset 

# Define the dataset representing the CSV file 
CSV_DATASET = Dataset("/path/to/your_dataset.csv")

Notice we’ve written the variable name in all caps (CSV_DATASET)—this follows Python convention for constants, signaling that the value shouldn’t change. This makes your code easier to read and maintain.

4. Import the Dataset in your DAG

In your DAG file, simply import the dataset you defined in your utils/datasets.py file and use it as needed.

from airflow.decorators import dag, task
from orchestrate.utils.datasets import CSV_DATASET
from datetime import datetime
import pandas as pd
import os

@dag(
    dag_id='producer_dag',
    start_date=datetime(2025, 3, 31),
    schedule='@daily',
    catchup=False,
)
def producer_dag():

    @task(outlets=[CSV_DATASET])
    def update_csv():
        data = {'column1': [1, 2, 3], 'column2': ['A', 'B', 'C']}
        df = pd.DataFrame(data)
        file_path = CSV_DATASET.uri

        # Check if the file exists to append or write
        if os.path.exists(file_path):
            df.to_csv(file_path, mode='a', header=False, index=False)
        else:
            df.to_csv(file_path, index=False)

    update_csv()

producer_dag()

Now you can reference CSV_DATASET in your DAG's schedule or as a task outlet, keeping your code clean and consistent across projects.🎉
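
Centralizing matters because producers and consumers are linked only by the URI string, and URIs are case-sensitive. A consumer whose URI differs by one character will simply never be triggered. The check below is a hypothetical helper, not an Airflow feature, that shows how such a mismatch could be caught from plain lists of URI strings.

```python
def find_unproduced_uris(producer_outlets, consumer_schedules):
    """Return consumer URIs that no producer lists as an outlet.

    Both arguments map a DAG id to the list of dataset URI strings it uses.
    """
    produced = {uri for uris in producer_outlets.values() for uri in uris}
    consumed = {uri for uris in consumer_schedules.values() for uri in uris}
    return sorted(consumed - produced)


producers = {"producer_dag": ["/path/to/your_dataset.csv"]}
# The capital "Y" is a deliberate typo to show a silent mismatch.
consumers = {"data_aware_consumer_dag": ["/path/to/Your_dataset.csv"]}

orphans = find_unproduced_uris(producers, consumers)
```

Here orphans contains the misspelled URI. Importing a single CSV_DATASET constant in both DAGs removes this class of error altogether.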

Visualizing Dataset Dependencies in the UI

You can visualize your Datasets, as well as the events triggered by them, in the Airflow UI. Three tabs are especially helpful for implementing and debugging your event-triggered pipelines:  

Dataset Events

The Dataset Events sub-tab shows a chronological list of recent events associated with datasets in your Airflow environment. Each entry details the dataset involved, the producer task that updated it, the timestamp of the update, and any triggered consumer DAGs. This view is important for monitoring the flow of data, confirming that dataset updates occur as expected, and identifying and resolving issues within data pipelines promptly.  

Dependency Graph

The Dependency Graph is a visual representation of the relationships between datasets and DAGs. It illustrates how producer tasks, datasets, and consumer DAGs interconnect, providing a clear overview of data dependencies within your workflows. This graphical depiction helps visualize the structure of your data pipelines to identify potential bottlenecks and optimize your pipeline.

Datasets

The Datasets sub-tab provides a list of all datasets defined in your Airflow instance. For each dataset, it shows important information such as the dataset's URI, associated producer tasks, and consumer DAGs. This centralized view provides efficient management of datasets, allowing users to track dataset usage across various workflows and maintain organized data dependencies.  

[Figure: Datasets view in the Airflow UI]

Best Practices & Considerations

When working with Datasets, there are a few things to keep in mind to keep your pipelines readable and reliable.  

Naming datasets meaningfully: Ensure your names are verbose and descriptive. This will help the next person who is looking at your code and even future you.

Avoid overly granular datasets: Datasets are a great tool, but too many of them become hard to manage, so try to strike a balance.  

Monitor for dataset DAG execution delays: It is important to keep an eye out for delays since this could point to an issue in your scheduler configuration or system performance.  

Task Completion Signals Dataset Update: It’s important to understand that Airflow doesn’t actually check the contents of a dataset (like a file or table). A dataset is considered “updated” only when a task that lists it in its outlets completes successfully. So even if the file wasn’t truly changed, Airflow will still assume it was. If you need a run to start when data actually lands, you can also trigger a DAG externally: at Datacoves, for example, an AWS Lambda function can call the Airflow API to trigger your DAG once data arrives in an S3 bucket.
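
As a rough sketch of that external trigger, the Lambda handler below reacts to an S3 "object created" notification and posts to the dagRuns endpoint of the Airflow 2.x stable REST API. Several details are assumptions that depend on your deployment: the base URL, the bearer-token authentication, and the placeholder token are all hypothetical, and this is not a description of how Datacoves implements it.

```python
import json
import urllib.parse
import urllib.request


def build_dag_run_request(base_url, dag_id, bucket, key, token):
    """Build a POST request that asks Airflow to start one DAG run."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": {"bucket": bucket, "key": key}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Assumption: the deployment accepts bearer tokens.
            "Authorization": f"Bearer {token}",
        },
    )


def lambda_handler(event, context):
    """Entry point for an S3 'object created' notification."""
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    # Object keys arrive URL-encoded in S3 event payloads.
    key = urllib.parse.unquote_plus(record["object"]["key"])
    request = build_dag_run_request(
        base_url="https://airflow.example.com",  # placeholder URL
        dag_id="data_aware_consumer_dag",
        bucket=bucket,
        key=key,
        token="REPLACE_ME",  # placeholder; load from a secret store in practice
    )
    with urllib.request.urlopen(request) as response:
        return {"statusCode": response.status}
```

The bucket and key are passed in conf so the triggered DAG run can find out which object arrived.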

Compatibility: Datasets are only available in Airflow 2.4 and later, so if you would like to take advantage of them, upgrade today!  

Datacoves provides a scalable Managed Airflow solution and handles these upgrades for you. This alleviates the stress of managing Airflow infrastructure so data teams can focus on their pipelines. Check out how Datadrive saved 200 hours yearly by choosing Datacoves.  

Conclusion

The introduction of data-aware scheduling with Datasets in Apache Airflow is a big advancement in workflow orchestration. By enabling DAGs to trigger based on data updates rather than fixed time intervals, Airflow has become more adaptable and efficient in managing complex data pipelines.  

By adopting Datasets, you can enhance the maintainability and scalability of your workflows, ensuring that tasks are executed exactly when the upstream data is ready. This not only optimizes resource utilization but also simplifies dependency management across DAGs.

Give it a try! 😎

Last updated on
April 2, 2025
