Airflow not able to parallelize tasks with .expand() - GCP Composer 3

Hello,

I was developing DAG pipelines in Airflow on a Composer 3 setup. Everything was working fine, including a pipeline that uses .expand() to parallelize the download of many files.

Since I was testing it with only three files, I tried adjusting the resources and the environment configuration so that Airflow could handle many of these same actions in a more production-like environment.

Now, when I execute the DAG, it gets stuck and doesn't run correctly when there is more than one file. The other pipelines that manage only one file work fine, and even this one works fine with a single file… I reverted the configuration changes, but it no longer works the way it did at the beginning.

It also does not store the logs, so even when a task is skipped or fails, Error Logging reports a log-retention error and no logs appear. Error: *** The task A from DAG might not have been executed, logs were deleted as part of logs retention (default of 30 days), or worker executing it might have finished abnormally (e.g. was evicted).

Another error that appears frequently is a lost SQLAlchemy database connection.

Is there any recommended solution?

Hi @hervasj,

Welcome to Google Cloud Community!

This could be related to resource constraints. To resolve it, make sure you provision enough resources for your workers. As for the error logs, you might want to check this documentation and apply the solution it describes.

If the issue persists, I suggest reaching out to Google Cloud Support. When reaching out, include detailed information and relevant screenshots of the errors you’ve encountered. This will assist them in diagnosing and resolving your issue more efficiently.

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.


I appreciate the answer, Caryna. I'll pick the project back up now that we have Terraform deployment autonomy; as soon as I find the answer, I'll post it in this thread.

Hi hervasj,

There may be a few things happening. max_active_tis_per_dagrun and max_active_tis_per_dag may be throttling or cancelling tasks prematurely if they are set too low. Airflow also has max_map_length, which limits the number of task instances that can be mapped from a single .expand(); I believe this means you may struggle once you exceed that limit (default of 1024), so consider batching the files instead of mapping one task per file.
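One way to stay under max_map_length with large file counts is to map over batches of files rather than individual files. A minimal sketch of such a batching helper (the batch size of 100 is just an example value):

```python
def chunk_files(files: list[str], batch_size: int = 100) -> list[list[str]]:
    """Split a file list into batches so that .expand() maps over
    batches instead of individual files, keeping the number of
    mapped task instances under Airflow's max_map_length."""
    return [files[i:i + batch_size] for i in range(0, len(files), batch_size)]


batches = chunk_files([f"file_{i}.csv" for i in range(250)], batch_size=100)
```

Each mapped task instance would then download every file in its batch.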

Also check that your executor is actually compatible with the .expand() method, and make sure parallelization is enabled in the environment configuration.
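For reference, these are the main [core] settings that govern parallelism, which Composer lets you override via Airflow configuration overrides (the values shown are the Airflow defaults, not recommendations):

```ini
[core]
parallelism = 32                # task instances that can run at once per scheduler
max_active_tasks_per_dag = 16   # concurrent task instances allowed per DAG
max_map_length = 1024           # upper bound on mapped task instances per expand()
```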

Just did a quick search; apparently certain Airflow operators are not compatible with .expand() either.

Hope you find the answer; I'm keen to learn what it turns out to be.