Environment
- Composer v3, fresh environment, composer-3-airflow-2.10.5-build.13
- Which includes: apache-airflow-providers-google==15.1.0
- Region: europe-north1, Small size, CeleryKubernetesExecutor
- DAG: Single DAG with 4 CloudRunExecuteJobOperator tasks using deferrable=True
- No custom configurations - all defaults
Problem
CloudRunExecuteJobOperator with deferrable=True fails intermittently when attempting to defer execution.
Error logs / stack trace
self.defer(
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1822, in defer
raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout)
airflow.exceptions.TaskDeferred
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3371, in _wrap_pool_connect
return fn()
^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 327, in connect
return _ConnectionFairy._checkout(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
fairy = _ConnectionRecord.checkout(pool)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
rec = pool._do_get()
^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
return self._create_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
return _ConnectionRecord(self)
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
self.__connect()
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 690, in __connect
with util.safe_reraise():
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
self.dbapi_connection = connection = pool._invoke_creator(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 574, in connect
return dialect.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 598, in connect
return self.dbapi.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/psycopg2/__init__.py", line 122, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
psycopg2.OperationalError: connection to server at "localhost" (::1), port 3306 failed: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
ret = args.func(args, dag=self.dag)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/cli.py", line 116, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 489, in task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
return _run_raw_task(args, ti)
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
return ti._run_raw_task(
^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3008, in _run_raw_task
return _run_raw_task(
^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 288, in _run_raw_task
ti.defer_task(exception=defer, session=session)
File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3196, in defer_task
_defer_task(ti=self, exception=exception, session=session)
File "/opt/python3.11/lib/python3.11/site-packages/airflow/api_internal/internal_api_call.py", line 166, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1703, in _defer_task
session.flush()
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self._flush(objects)
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
with util.safe_reraise():
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 212, in save_obj
for (
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 373, in _organize_states_for_save
for state, dict_, mapper, connection in _connections_for_states(
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 1737, in _connections_for_states
connection = uowtransaction.transaction.connection(base_mapper)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 629, in connection
return self._connection_for_bind(bind, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 738, in _connection_for_bind
conn = self._parent._connection_for_bind(bind, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 750, in _connection_for_bind
conn = bind.connect()
^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/future/engine.py", line 412, in connect
return super(Engine, self).connect()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3325, in connect
return self._connection_cls(self, close_with_result=close_with_result)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
else engine.raw_connection()
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3404, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3374, in _wrap_pool_connect
Connection._handle_dbapi_exception_noconnection(
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2208, in _handle_dbapi_exception_noconnection
util.raise_(
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3371, in _wrap_pool_connect
return fn()
^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 327, in connect
return _ConnectionFairy._checkout(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
fairy = _ConnectionRecord.checkout(pool)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
rec = pool._do_get()
^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
return self._create_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
return _ConnectionRecord(self)
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
self.__connect()
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 690, in __connect
with util.safe_reraise():
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
self.dbapi_connection = connection = pool._invoke_creator(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 574, in connect
return dialect.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 598, in connect
return self.dbapi.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/psycopg2/__init__.py", line 122, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 3306 failed: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
Behavior Pattern
- Cloud Run job successfully starts
- Operator attempts to defer (pause execution)
- Immediate failure with the above database error
- Airflow retries the task
- Often succeeds on the 2nd or 3rd retry (but not always)
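Because the task often succeeds on a later attempt, the retry settings decide whether a run survives this error. Below is a minimal sketch of making them explicit through default_args; the specific values are assumptions chosen for illustration, not something taken from my environment.

```python
from datetime import timedelta

# Illustrative values only (assumptions): tune them to the real workload.
# These keys are standard BaseOperator arguments and would be passed as
# DAG(default_args=default_args) or set directly on the operator.
default_args = {
    "retries": 3,                           # allow a 2nd/3rd/4th attempt
    "retry_delay": timedelta(seconds=30),   # pause before retrying
    "retry_exponential_backoff": True,      # lengthen the pause on each retry
}

print(default_args)
```

Note that a retry re-runs the operator from the start, so the Cloud Run job may be started again even though the first execution was already launched before the deferral failed.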
Stack Trace Location
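Fails after the self.defer() call, when Airflow tries to flush the deferred state to the metadata database:
- During session.flush() in /opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py:1703 (_defer_task), reached via defer_task at taskinstance.py:3196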
Minimal Reproduction
python
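from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

# Define inside a DAG context; replace the placeholder with a real project ID.
CloudRunExecuteJobOperator(
    task_id="any_task",
    project_id="<PROJECT_ID>",
    region="europe-north1",
    job_name="any-cloud-run-job",
    deferrable=True,  # fails intermittently
    # deferrable=False works fine
)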
What Makes This Strange
- Minimal load: Only 4 tasks, which should not stress any component
- Intermittent: Sometimes works on retry, suggesting a timing/initialization issue
- Port 3306: The error shows port 3306 (typically MySQL), but Composer uses PostgreSQL
- Fresh environment: No modifications from the default Composer v3 setup
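On the port 3306 point: psycopg2 connects to whatever host and port the connection string names, and the port number alone does not determine the protocol. A minimal sketch of splitting a SQLAlchemy-style URI to see which dialect, driver, host and port it specifies follows. The URI is a hypothetical one shaped like the error message, and describe_conn is an illustrative helper, not part of Airflow.

```python
from urllib.parse import urlsplit


def describe_conn(uri: str) -> dict:
    """Split a SQLAlchemy-style URI into dialect, driver, host and port."""
    parts = urlsplit(uri)
    # The scheme looks like "dialect+driver"; the driver part is optional.
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,
        "host": parts.hostname,
        "port": parts.port,
    }


# Hypothetical URI (assumption): credentials and database name are made up.
example = "postgresql+psycopg2://airflow:secret@localhost:3306/airflow_db"
info = describe_conn(example)
print(info)
```

If the real URI looks like this, a PostgreSQL dialect pointing at localhost:3306 would be consistent with a local database proxy that happens to listen on that port. On a worker, the actual value could be looked up under the [database] sql_alchemy_conn setting, though whether Composer exposes it to tasks is an assumption I have not verified.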
Workaround
Setting deferrable=False works consistently but eliminates the resource efficiency benefits.
Questions
- Is this a known issue with the triggerer component in Composer v3?
- Why would psycopg2 attempt to connect to port 3306?
- Are there any race conditions in triggerer initialization?
- Should we increase triggerer resources despite the minimal load?
What I’ve Verified
- Environment is healthy (all components running)
- Cloud Run jobs execute successfully when deferrable=False
- No custom Airflow configurations
- Triggerer component is running (1 pod, 0.5 vCPU, 1GB memory)
This seems like an infrastructure/initialization issue rather than a configuration problem, given it works intermittently and fails with minimal load.