Cloud Composer error with Dataflow Operator in deferrable mode

Hello!

I’m currently testing an upgrade of my Composer 2 environment from composer-2.9.11-airflow-2.10.2 to composer-2.11.1-airflow-2.10.2. However, I’ve run into a problem when using "DataflowStartFlexTemplateOperator" in deferrable mode.
In this new version, when I try to use the operator, the Dataflow job gets created successfully, but I get a permission error when trying to get the status:

Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1824, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataflow.py", line 936, in execute_complete
    raise AirflowException(event["message"])
airflow.exceptions.AirflowException: 403 Permission 'dataflow.jobs.get' denied on resource (or it may not exist). [reason: "IAM_PERMISSION_DENIED"
domain: "iam.googleapis.com"
metadata {
  key: "permission"
  value: "dataflow.jobs.get"
}
]


I can confirm it’s some problem with deferrable operators because if I change the operator to not use deferrable mode, the task executes successfully.

Also, I’ve checked Composer 3, including the version released on 28 January, and the problem persists there too. I’ve also tried upgrading and downgrading apache-airflow-providers-google to several versions (including 10.26.0, 10.25.0 and 11.0.0), but nothing changed.

The only thing I haven’t been able to test is to create a custom dataflow connection, but it would be best if I could simply use the default connection anyway.

Any ideas to further investigate this issue will be appreciated.

Hi @lucas_rosa_dotz ,

Welcome to Google Cloud Community!

I understand you’re having an issue on your Composer 2 environment when you upgrade from composer-2.9.11-airflow-2.10.2 to composer-2.11.1-airflow-2.10.2 with Dataflow Operator in deferrable mode.

Here are several suggestions that may help resolve the issue:

  • The error suggests missing or insufficient IAM permissions. Deferrable operators hand off part of the execution to the triggerer, which can make identity and permissions more complex. Check which Airflow service account or identity is polling the Dataflow job status, and grant it dataflow.jobs.get. Also make sure the service account has roles/dataflow.admin and roles/dataflow.viewer so it can run Dataflow jobs and read their status. You can check this under IAM & Admin > IAM.
  • You can also be more specific and create a dedicated service account that Airflow uses only for Dataflow operations.
  • Check your audit logs. Reviewing them can give more insight into the error.

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.

So I kind of accidentally found the solution to this specific problem. When using the operator DataflowStartFlexTemplateOperator in deferrable mode, you MUST supply the project_id parameter for it to work properly. It’s a bit confusing since, like I said, this problem doesn’t show up outside of deferrable mode…
Anyway, it was a very quick fix and everything seems to be in working order now.
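
For anyone landing here later, this is roughly what the fix looks like. It is only a sketch: the helper name flex_template_task_kwargs, the task_id, and every project, bucket and job name are made up for illustration, and the body layout assumes the usual Flex Template launchParameter request shape. The helper just builds the keyword arguments and refuses to continue if project_id is empty, so the missing parameter is caught when the DAG is parsed rather than surfacing as a 403 from the trigger.

```python
def flex_template_task_kwargs(project_id, location, job_name,
                              template_gcs_path, parameters=None):
    """Build kwargs for DataflowStartFlexTemplateOperator in deferrable mode.

    Hypothetical helper, not part of Airflow or the Google provider.
    """
    # The fix from this thread: always pass project_id explicitly
    # instead of relying on the default connection's project.
    if not project_id:
        raise ValueError("project_id must be set explicitly when deferrable=True")
    return {
        "task_id": "start_flex_template",  # illustrative task id
        "project_id": project_id,
        "location": location,
        "body": {
            "launchParameter": {
                "jobName": job_name,
                "containerSpecGcsPath": template_gcs_path,
                "parameters": dict(parameters or {}),
            }
        },
        "deferrable": True,
    }


# Inside a DAG file (requires apache-airflow-providers-google):
#
# from airflow.providers.google.cloud.operators.dataflow import (
#     DataflowStartFlexTemplateOperator,
# )
#
# start_job = DataflowStartFlexTemplateOperator(
#     **flex_template_task_kwargs(
#         project_id="my-gcp-project",            # placeholder
#         location="us-central1",                 # placeholder
#         job_name="my-flex-job",                 # placeholder
#         template_gcs_path="gs://my-bucket/templates/my-template.json",
#     )
# )
```

The only line that matters for the error above is project_id; everything else is the same as in the non-deferrable setup.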