In Apache Airflow, scheduling workflows has traditionally been managed using the schedule parameter, which accepts definitions such as datetime objects or cron expressions to establish time-based intervals for DAG (Directed Acyclic Graph) executions. Airflow was already a powerful scheduler, but it became even more capable in Airflow 2.4 with the introduction of Datasets into scheduling. This feature enables data-driven DAG execution by allowing workflows to be triggered by other DAG completions rather than relying on predetermined time intervals. In other words, you can write a DAG that updates a file and, once that DAG completes, it will trigger another DAG.
In this article, we'll dive into the concept of Airflow Datasets, explore their impact on workflow orchestration, and provide a step-by-step guide to schedule your DAGs using Datasets!
Understanding Airflow Scheduling (Pre-Datasets)
Before Airflow 2.4, DAG scheduling in Airflow was primarily time-based, relying on parameters like schedule and start_date to define execution times. With this setup there were three ways to schedule your DAGs: cron expressions, presets, or timedelta objects. Let's examine each one.
- Cron Expressions: These expressions allowed precise scheduling. For example, to run a DAG daily at 4:05 AM, you would set schedule='5 4 * * *'.
- Presets: Airflow provided string presets for common intervals:
  @hourly: Runs the DAG at the beginning of every hour.
  @daily: Runs the DAG at midnight every day.
  @weekly: Runs the DAG at midnight on the first day of the week.
  @monthly: Runs the DAG at midnight on the first day of the month.
  @yearly: Runs the DAG at midnight on January 1st.
- Timedelta Objects: For intervals not easily expressed with cron, a timedelta object could be used. For instance, schedule=timedelta(hours=6) would schedule the DAG every six hours (see the sketch below).
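For reference, here is a minimal sketch of a purely time-based DAG using the TaskFlow API. The DAG id and task are illustrative; the schedule parameter could equally be a cron string such as '5 4 * * *' or a preset like '@daily'.
# time_based_example.py (illustrative sketch)
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    dag_id='time_based_example',
    start_date=datetime(2025, 1, 1),
    schedule=timedelta(hours=6),  # or '5 4 * * *' or '@daily'
    catchup=False,
)
def time_based_example():
    @task
    def extract():
        # Runs every six hours, whether or not new data has actually arrived
        print("Running on a fixed interval")

    extract()

time_based_example()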
Limitations of Time-Based Scheduling
While effective for many workloads, time-based scheduling had some limitations:
- Fixed Timing: DAGs ran at predetermined times, regardless of data readiness (this is the core problem Datasets address). If data wasn't available at the scheduled time, tasks could fail or process incomplete data.
- Sensors and Polling: To handle data dependencies, sensors were employed to wait for data availability. However, sensors often relied on continuous polling, which could be resource-intensive and lead to inefficiencies (see the sketch below).
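As an illustration of the polling pattern, the hypothetical DAG below waits for a file to appear by poking the filesystem on an interval. The file path and connection id are assumptions for the example.
# sensor_polling_example.py (illustrative sketch)
from airflow.decorators import dag
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

@dag(
    dag_id='sensor_polling_example',
    start_date=datetime(2025, 1, 1),
    schedule='@daily',
    catchup=False,
)
def sensor_polling_example():
    # Polls every 60 seconds until the file shows up (or the sensor times out)
    FileSensor(
        task_id='wait_for_file',
        filepath='/path/to/your_dataset.csv',
        fs_conn_id='fs_default',  # assumes a filesystem connection is configured
        poke_interval=60,
        timeout=60 * 60,
    )

sensor_polling_example()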
Airflow Datasets were created to overcome these scheduling limitations.
Intro to Airflow Datasets
When using Datasets in Airflow, you'll typically work with two types of DAGs: Producer and Consumer DAGs.
What is a Producer DAG?
A DAG responsible for defining and "updating" a specific Dataset. We say "updating" because Airflow considers a Dataset "updated" simply when a task that lists it in its outlets completes successfully, regardless of whether the data was truly modified.
A Producer DAG:
✅ Must have the Dataset variable defined or imported
✅ Must include a task with the outlets parameter set to that Dataset
What is a Consumer DAG?
A DAG that is scheduled to execute once the Producer DAG successfully completes.
A Consumer DAG:
✅ Must reference the same Dataset using the schedule parameter
It's this producer-consumer relationship that enables event-driven scheduling in Airflow, allowing workflows to run as soon as the data they depend on is ready, without relying on fixed time intervals.
What are Airflow Datasets?
A Dataset is a way to represent a specific set of data. Think of it as a label or reference to a particular data resource: a CSV file, an S3 bucket, a SQL table, or anything else. A Dataset is defined by passing a string path to the Dataset() object. This path acts as an identifier; it doesn't have to be a real file or URL, but it should be consistent, unique, and ideally in ASCII format (plain English letters, numbers, slashes, underscores, etc.).
from airflow.datasets import Dataset
my_dataset = Dataset("s3://my-bucket/my-data.csv")
# or
my_dataset = Dataset("my_folder/my_file.txt")
When using Airflow Datasets, remember that Airflow does not monitor the actual contents of your data. It doesn't check whether a file or table has been updated. Instead, it tracks task completion. When a task that lists a Dataset in its outlets finishes successfully, Airflow marks that Dataset as "updated." This means the task doesn't need to actually modify any data; even a task that only runs a print() statement will still trigger any Consumer DAGs scheduled on that Dataset. It's up to your task logic to ensure the underlying data is actually being modified when necessary. Even though Airflow isn't checking the data directly, this mechanism still enables event-driven orchestration because your workflows can run when upstream data should be ready.
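To make this concrete, here is a minimal and intentionally contrived sketch: the task below never touches the file at all, yet any Consumer DAG scheduled on the same Dataset would still be triggered when it succeeds. The DAG id and task name are illustrative.
# print_only_producer.py (illustrative only: no data is actually written)
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime

my_dataset = Dataset("/path/to/your_dataset.csv")

@dag(
    dag_id='print_only_producer',
    start_date=datetime(2025, 1, 1),
    schedule='@daily',
    catchup=False,
)
def print_only_producer():
    @task(outlets=[my_dataset])
    def pretend_to_update():
        # Succeeding is enough for Airflow to mark the Dataset as "updated"
        print("I did not modify any data")

    pretend_to_update()

print_only_producer()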
For example, if one DAG has a task that generates a report and writes it to a file, you can define a Dataset for that file. Another DAG that depends on the report can be triggered automatically as soon as the first DAG’s task completes. This removes the need for rigid time-based scheduling and reduces the risk of running on incomplete or missing data.
Tutorial: Scheduling with Datasets
Create a Producer DAG
1. Define your Dataset.
In a new DAG file, define a variable using the Dataset object and pass in the path to your data as a string. In this example, it's the path to a CSV file.
# producer.py
from airflow.datasets import Dataset
# Define the dataset representing the CSV file
csv_dataset = Dataset("/path/to/your_dataset.csv")
2. Create a DAG with a task that updates the CSV dataset.
We'll use the @dag and @task decorators for a cleaner structure. The key part is passing the outlets parameter to the task. This tells Airflow that the task updates a specific dataset. Once the task completes successfully, Airflow will consider the dataset "updated" and trigger any dependent DAGs.
We're also using csv_dataset.uri to get the path to the dataset; this is the same path you defined earlier (e.g., "/path/to/your_dataset.csv").
# producer.py
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime
import pandas as pd
import os

# Define the dataset representing the CSV file
csv_dataset = Dataset("/path/to/your_dataset.csv")

@dag(
    dag_id='producer_dag',
    start_date=datetime(2025, 3, 31),
    schedule='@daily',
    catchup=False,
)
def producer_dag():
    @task(outlets=[csv_dataset])
    def update_csv():
        data = {'column1': [1, 2, 3], 'column2': ['A', 'B', 'C']}
        df = pd.DataFrame(data)
        file_path = csv_dataset.uri

        # Check if the file exists to append or write
        if os.path.exists(file_path):
            df.to_csv(file_path, mode='a', header=False, index=False)
        else:
            df.to_csv(file_path, index=False)

    update_csv()

producer_dag()
Create a Consumer DAG
Now that we have a Producer DAG updating a Dataset, we can create the DAG that depends on it. This is where the magic happens, since this DAG will no longer be time-dependent but rather Dataset-dependent.
1. Instantiate the same Dataset used in the Producer DAG
In a new DAG file (the consumer), start by defining the same Dataset that was used in the Producer DAG. This ensures both DAGs are referencing the exact same dataset path.
# consumer.py
from airflow.datasets import Dataset
# Define the dataset representing the CSV file
csv_dataset = Dataset("/path/to/your_dataset.csv")
2. Set the schedule to the Dataset
Create your DAG and set the schedule parameter to the Dataset you instantiated earlier (the one being updated by the Producer DAG). This tells Airflow to trigger this DAG only when that dataset is updated, with no need for time-based scheduling.
# consumer.py
import datetime

from airflow.decorators import dag, task
from airflow.datasets import Dataset

csv_dataset = Dataset("/path/to/your_dataset.csv")

@dag(
    default_args={
        "start_date": datetime.datetime(2024, 1, 1, 0, 0),
        "owner": "Mayra Pena",
        "email": "mayra@example.com",
        "retries": 3,
    },
    description="Sample Consumer DAG",
    schedule=[csv_dataset],
    tags=["transform"],
    catchup=False,
)
def data_aware_consumer_dag():
    @task
    def run_consumer():
        print("Processing updated CSV file")

    run_consumer()

dag = data_aware_consumer_dag()
That's it! 🎉 Now this DAG will run whenever the Producer DAG completes (updates the file).
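Note that the schedule parameter accepts a list, so a Consumer DAG can also wait on more than one Dataset; in that case it runs once every listed Dataset has been updated at least once since the DAG's last run. Here is a hedged sketch, where the second dataset path is hypothetical:
# multi_dataset_consumer.py (illustrative sketch)
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime

csv_dataset = Dataset("/path/to/your_dataset.csv")
other_dataset = Dataset("s3://my-bucket/other-data.csv")  # hypothetical second dataset

@dag(
    start_date=datetime(2024, 1, 1),
    schedule=[csv_dataset, other_dataset],  # runs after both datasets have been updated
    catchup=False,
)
def multi_dataset_consumer():
    @task
    def process():
        print("Both upstream datasets have been updated")

    process()

multi_dataset_consumer()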
DRY Principles for Datasets
When using Datasets, you may find yourself using the same dataset across multiple DAGs and therefore having to define it many times. There is a simple DRY (Don't Repeat Yourself) way to overcome this.
1. Create a central datasets.py file
To follow DRY (Don't Repeat Yourself) principles, centralize your dataset definitions in a utility module. Simply create a utils folder and add a datasets.py file.
If you're using Datacoves, your Airflow-related files typically live in a folder named orchestrate, so your path might look like: orchestrate/utils/datasets.py
2. Import the Dataset object
Inside your datasets.py file, import the Dataset class from Airflow:
from airflow.datasets import Dataset
3. Define your Dataset in this file
Now that you've imported the Dataset object, define your dataset as a variable. For example, if your DAG writes to a CSV file:
from airflow.datasets import Dataset
# Define the dataset representing the CSV file
CSV_DATASET = Dataset("/path/to/your_dataset.csv")
Notice we've written the variable name in all caps (CSV_DATASET); this follows the Python convention for constants, signaling that the value shouldn't change. This makes your code easier to read and maintain.
4. Import the Dataset in your DAG
In your DAG file, simply import the dataset you defined in your utils/datasets.py file and use it as needed.
# producer.py
from airflow.decorators import dag, task
from orchestrate.utils.datasets import CSV_DATASET
from datetime import datetime
import pandas as pd
import os

@dag(
    dag_id='producer_dag',
    start_date=datetime(2025, 3, 31),
    schedule='@daily',
    catchup=False,
)
def producer_dag():
    @task(outlets=[CSV_DATASET])
    def update_csv():
        data = {'column1': [1, 2, 3], 'column2': ['A', 'B', 'C']}
        df = pd.DataFrame(data)
        file_path = CSV_DATASET.uri

        # Check if the file exists to append or write
        if os.path.exists(file_path):
            df.to_csv(file_path, mode='a', header=False, index=False)
        else:
            df.to_csv(file_path, index=False)

    update_csv()

producer_dag()
Now you can reference CSV_DATASET in your DAG's schedule or as a task outlet, keeping your code clean and consistent across projects. 🎉
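For completeness, a consumer built on the same shared definition might look like the sketch below, assuming the orchestrate/utils/datasets.py module shown above is importable from your DAGs folder; the DAG id and task are illustrative.
# consumer.py
from airflow.decorators import dag, task
from orchestrate.utils.datasets import CSV_DATASET
from datetime import datetime

@dag(
    dag_id='consumer_dag',
    start_date=datetime(2025, 3, 31),
    schedule=[CSV_DATASET],  # triggered whenever the producer updates CSV_DATASET
    catchup=False,
)
def consumer_dag():
    @task
    def process_csv():
        print(f"Processing {CSV_DATASET.uri}")

    process_csv()

consumer_dag()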
Visualizing Dataset Dependencies in the UI
You can visualize your Datasets, as well as events triggered by Datasets, in the Airflow UI. Three tabs will prove helpful for implementing and debugging your event-triggered pipelines:
Dataset Events
The Dataset Events sub-tab shows a chronological list of recent events associated with datasets in your Airflow environment. Each entry details the dataset involved, the producer task that updated it, the timestamp of the update, and any triggered consumer DAGs. This view is useful for monitoring the flow of data, confirming that dataset updates occur as expected, and quickly identifying and resolving issues within your data pipelines.
Dependency Graph
The Dependency Graph is a visual representation of the relationships between datasets and DAGs. It illustrates how producer tasks, datasets, and consumer DAGs interconnect, providing a clear overview of data dependencies within your workflows. This graphical view makes it easier to spot potential bottlenecks and optimize your pipelines.
Datasets
The Datasets sub-tab provides a list of all datasets defined in your Airflow instance. For each dataset, it shows important information such as the dataset's URI, associated producer tasks, and consumer DAGs. This centralized view enables efficient management of datasets, allowing users to track dataset usage across various workflows and maintain organized data dependencies.

Best Practices & Considerations
When working with Datasets, there are a few things to take into consideration to maintain readability and reliability.
- Name datasets meaningfully: Ensure your names are descriptive. This will help the next person who looks at your code, and even future you.
- Avoid overly granular datasets: While Datasets are a great tool, too many of them become hard to manage, so try to strike a balance.
- Monitor for dataset DAG execution delays: Keep an eye out for delays, since they could point to an issue in your scheduler configuration or system performance.
- Task completion signals dataset update: It's important to understand that Airflow doesn't actually check the contents of a dataset (like a file or table). A dataset is considered "updated" only when a task that lists it in its outlets completes successfully. So even if the file wasn't truly changed, Airflow will still assume it was. With Datacoves, you can also trigger a DAG externally using the Airflow API, for example from an AWS Lambda function once data lands in an S3 bucket (see the sketch after this list).
- Compatibility: Datasets are only available in Airflow 2.4+. So, if you would like to take advantage of them, upgrade today!
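As a sketch of the external-trigger approach mentioned above, a Lambda function (or any script) could call Airflow's stable REST API to start a DAG run once a file lands. The host, credentials, and DAG id below are assumptions for illustration, and the requests library would need to be packaged with the Lambda.
# trigger_dag_from_lambda.py (illustrative sketch)
import requests

AIRFLOW_URL = "https://your-airflow-host"  # hypothetical Airflow webserver URL
DAG_ID = "producer_dag"

def lambda_handler(event, context):
    # Start a new DAG run via Airflow's stable REST API
    response = requests.post(
        f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/dagRuns",
        json={"conf": {}},
        auth=("api_user", "api_password"),  # assumes basic auth is enabled
        timeout=30,
    )
    response.raise_for_status()
    return response.json()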
Datacoves provides a scalable Managed Airflow solution and handles these upgrades for you. This alleviates the stress of managing Airflow infrastructure so your data teams can focus on their pipelines. Check out how Datadrive saved 200 hours yearly by choosing Datacoves.
Conclusion
The introduction of data-aware scheduling with Datasets in Apache Airflow is a big advancement in workflow orchestration. By enabling DAGs to trigger based on data updates rather than fixed time intervals, Airflow has become more adaptable and efficient in managing complex data pipelines.
By adopting Datasets, you can enhance the maintainability and scalability of your workflows, ensuring that tasks are executed exactly when the upstream data is ready. This not only optimizes resource utilization but also simplifies dependency management across DAGs.
Give it a try! 😎