In the domain of data orchestration, the effective management of intricate dependencies between Directed Acyclic Graphs (DAGs) presents a significant challenge.
This challenge is particularly pronounced when orchestrating workflows whose tasks run at varying temporal frequencies, such as yearly, monthly, weekly, or daily runs, or runs at specific times. In such scenarios, ensuring the dependable and punctual triggering of child tasks, in alignment with their parent tasks’ schedules, becomes a critical imperative.
This blog post highlights a Google Cloud Composer solution that addresses this very challenge, empowering data engineers to orchestrate complex DAG relationships with finesse.
The core challenge: taming dependencies across time frequencies
Imagine a scenario where a daily DAG depends on parent DAGs that run at different frequencies, such as monthly and weekly. Ensuring that each child DAG executes seamlessly based on its parents’ schedules can quickly become a tangled web of dependencies. Traditional approaches often fall short in handling such intricate relationships, leading to potential delays, errors, and maintenance headaches.
The goal of this use case is to provide a common pattern that automatically checks for the parent DAG’s execution and implements dependency management in Cloud Composer. The primary challenge addressed is the need to handle complex dependencies between DAGs with different frequencies.
The solution: Airflow’s dependency management to the rescue
The proposed solution utilizes Airflow’s robust dependency management capabilities, specifically leveraging the External Task Sensor. A custom logic is implemented within a function to dynamically determine the date and time interval at which the child DAG should “poke” the parent DAG. This time interval calculation is achieved by employing the execution_date_fn parameter of the External Task Sensor operator.
This approach effectively establishes a hierarchical relationship between the parent and child DAGs, ensuring that the child DAG’s tasks execute only once their respective parent DAGs have completed, irrespective of their varying frequencies.
Code snippet: orchestrating dependencies with precision
Let’s take a look at a code snippet that demonstrates how to implement this solution. Consider a scenario where a child DAG depends on a parent DAG that runs once a year.
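The snippets in the rest of this post iterate over a parent_tasks list that maps each sensor to its parent DAG. The original configuration is not shown, so the shape below is a hypothetical sketch; adapt the IDs and frequencies to your environment.

# Hypothetical configuration: one entry per parent DAG, giving the
# sensor's task ID, the parent DAG's ID, and the parent's run frequency.
parent_tasks = [
    {"task_id": "wait_for_yearly_parent", "dag_id": "yearly_parent_dag", "schedule_frequency": "yearly"},
    {"task_id": "wait_for_monthly_parent", "dag_id": "monthly_parent_dag", "schedule_frequency": "monthly"},
    {"task_id": "wait_for_weekly_parent", "dag_id": "weekly_parent_dag", "schedule_frequency": "weekly"},
]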
Understanding External Task Sensor
The External Task Sensor is an Airflow operator that waits for a task in an external DAG (or for the entire external DAG) to reach a given state. To function effectively, it requires several parameters. Let us thoroughly examine each of these parameters:
- external_dag_id: Denotes the parent DAG’s ID for which the current DAG must wait.
- external_task_id: Represents the task ID that must be awaited within the parent DAG. If the value is None, the sensor will wait for the entire DAG to complete.
- allowed_states: This is a list specifying the permitted states for the task awaiting completion. The default value is ['success']. Other allowed values may include 'failed' or 'skipped'.
- check_existence: Setting this parameter to True prompts the sensor to verify the existence of the external task (when external_task_id is not None) or the DAG to wait for (when external_task_id is None). If the external task or DAG does not exist, the sensor will immediately cease waiting. The default value for this parameter is False.
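Putting these four parameters together, a minimal sensor might look like this (the DAG and task IDs are hypothetical, and the import path assumes Airflow 2):

from airflow.sensors.external_task import ExternalTaskSensor

# Wait for the 'publish_data' task in the hypothetical 'parent_daily_dag'
# to reach the 'success' state for the same logical date.
wait_for_parent = ExternalTaskSensor(
    task_id="wait_for_parent_daily",
    external_dag_id="parent_daily_dag",
    external_task_id="publish_data",
    allowed_states=["success"],
    check_existence=True,
)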
Apart from these parameters, there are two significant parameters that specify the point in time at which the child DAG should check for the parent DAG’s execution. These are as follows:
- execution_delta: The time difference to subtract from the child’s logical date when looking up the parent’s run. By default, the sensor looks for a parent run with the same execution date as the current task or DAG.
- execution_date_fn: Calls a custom function that can contain the logic for calculating the exact date and time to inquire about the parent DAG.
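When the offset between parent and child is fixed, execution_delta alone suffices. For instance, if the parent is assumed to run one hour before the child:

from datetime import timedelta

# Sketch: look up the parent run whose logical date is exactly one hour
# earlier than the child's (assumed fixed offset).
wait_for_hourly_parent = ExternalTaskSensor(
    task_id="wait_for_hourly_parent",
    external_dag_id="hourly_parent_dag",
    execution_delta=timedelta(hours=1),
)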
In our case, given the varying frequencies and the need for dynamic time calculation, it is advantageous to use execution_date_fn. An example of how to use it is provided below:
from datetime import timedelta

def execution_delta_dependency(logical_date, **kwargs):
    # The logical (execution) date of the current child DAG run.
    dt = logical_date
    # If you want the execution date from the sensor's context instead,
    # it is available in kwargs:
    # exec_dt = kwargs['execution_date']

    # Fetch the ID of the currently executing sensor task from kwargs.
    task_instance_id = str(kwargs['task_instance']).split(':')[1].split(' ')[1].split('.')[1]

    # Look up the parent task configuration that matches this sensor.
    res = None
    for sub in parent_tasks:
        if sub['task_id'] == task_instance_id:
            res = sub
            break

    # Fetch the schedule frequency of the parent task.
    schedule_frequency = res['schedule_frequency']

    # Based on the schedule frequency, derive the logical date at which
    # the child should poke the parent DAG.
    parent_dag_poke = ''
    if schedule_frequency == "monthly":
        # First day of the current month, at midnight.
        parent_dag_poke = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    elif schedule_frequency == "weekly":
        # Most recent Sunday, at midnight.
        parent_dag_poke = (dt - timedelta(days=dt.isoweekday() % 7)).replace(hour=0, minute=0, second=0, microsecond=0)
    elif schedule_frequency == "yearly":
        # January 1 of the current year, at midnight.
        parent_dag_poke = dt.replace(day=1, month=1, hour=0, minute=0, second=0, microsecond=0)
    elif schedule_frequency == "daily":
        # Midnight of the current day.
        parent_dag_poke = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    print(parent_dag_poke)
    return parent_dag_poke
# Alternatively, you can use the croniter library to find the previous
# execution date and time of the parent DAG based on its schedule
# frequency:
# from croniter import croniter
# from datetime import datetime
# iter = croniter('<parent cron expression>', dt)
# prev_instance = iter.get_prev(datetime)
# return prev_instance
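As a hedged, self-contained sketch of that croniter alternative (the weekly cron expression and function name here are assumptions for illustration):

from datetime import datetime
from croniter import croniter

def previous_parent_run(logical_date, cron_expression="0 0 * * 0"):
    # Walk backwards from the child's logical date to the parent's most
    # recent scheduled run; '0 0 * * 0' (Sundays at midnight) is an
    # assumed weekly schedule.
    it = croniter(cron_expression, logical_date)
    return it.get_prev(datetime)

# For example, previous_parent_run(datetime(2024, 5, 15)) returns
# 2024-05-12 00:00:00, the preceding Sunday.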
# Create external task sensors dynamically
external_task_sensors = []
for parent_task in parent_tasks:
    external_task_sensor = ExternalTaskSensor(
        task_id=parent_task["task_id"],
        external_dag_id=parent_task["dag_id"],
        timeout=900,
        execution_date_fn=execution_delta_dependency,
        poke_interval=60,
        mode="reschedule",
        check_existence=True,
    )
    external_task_sensors.append(external_task_sensor)
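Finally, the sensors gate the child DAG’s own work. A minimal sketch of the wiring, with a hypothetical downstream task standing in for the real workload:

from airflow.operators.empty import EmptyOperator

# Placeholder for the child DAG's real work; it runs only after every
# parent sensor has succeeded.
run_child_workload = EmptyOperator(task_id="run_child_workload")

# Each sensor must complete before the child workload starts.
external_task_sensors >> run_child_workload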
Core functionality
The code snippet establishes dependencies between Airflow DAGs with varying schedules (yearly, monthly, weekly, daily). It dynamically creates ExternalTaskSensor tasks that wait for the successful completion of parent tasks in other DAGs before triggering their own execution.
execution_delta_dependency function
- Identifies the Parent Task: Extracts the current sensor’s task ID and uses it to look up the corresponding parent task configuration.
- Retrieves the Schedule Frequency: Fetches the schedule frequency (e.g., ‘monthly’) of the identified parent task.
- Calculates parent_dag_poke: Determines the execution date at which the child DAG should check for the parent’s completion. This calculation is based on the parent’s schedule frequency and ensures proper dependency handling across different time scales. Two approaches are shown: an if/elif switch that derives the date and time from the frequency, and the croniter library, which returns the previous execution time of the parent DAG from its cron expression.
- Returns parent_dag_poke: Provides the calculated execution date to the ExternalTaskSensor.
ExternalTaskSensor creation
- Iterates through Parent Tasks: Dynamically creates an ExternalTaskSensor for each parent task defined in parent_tasks.
- Configures Sensors: Sets up each sensor with:
  - The parent task’s ID and DAG ID.
  - A timeout for waiting.
  - The execution_delta_dependency function to calculate the correct execution date to check.
  - A poke interval to periodically check the parent task’s status.
  - Reschedule mode to release the worker slot while the parent task isn’t yet complete.
  - An existence check to ensure the parent DAG is valid.
Key points
- Dynamic Dependency Management: Handles dependencies between DAGs with different schedules, ensuring child tasks trigger at the right time based on their parent’s completion.
- ExternalTaskSensor: Core Airflow component used to monitor the status of tasks in external DAGs.
- execution_date_fn: Custom function to calculate the appropriate execution date for dependency checks, crucial for handling varying schedule frequencies.
Benefits of this approach
- Improved DAG Organization and Maintainability: By establishing clear parent-child relationships, DAGs become more organized and easier to maintain.
- Simplified Dependency Management: The dynamic configuration of execution_date_fn streamlines the management of complex dependencies across varying time frequencies.
- Reliable Execution: Child DAGs execute only after their parents complete, ensuring data consistency and workflow integrity.
- Scalability: This approach can readily scale to accommodate intricate DAG structures, making it suitable for large and complex data orchestration scenarios.
Conclusion
Managing complex task dependencies in Airflow can be a daunting task, but with the right approach, it becomes a symphony of orchestration. The Cloud Composer solution presented in this blog offers a robust and scalable way to handle dependencies across varying time frequencies. By leveraging Airflow’s dependency management capabilities and dynamic configuration, data engineers can ensure that their workflows execute flawlessly, leading to improved data quality, efficiency, and overall business success.
