Cloud Composer v3: Deferrable Operators Fail with Database Connection Error

Environment

  • Composer v3, fresh environment, composer-3-airflow-2.10.5-build.13
    • Which includes: apache-airflow-providers-google==15.1.0
  • Region: europe-north1, Small size, CeleryKubernetesExecutor
  • DAG: Single DAG with 4 CloudRunExecuteJobOperator tasks using deferrable=True
  • No custom configurations - all defaults

Problem

CloudRunExecuteJobOperator with deferrable=True fails intermittently when attempting to defer execution.

Error logs / stack trace

    self.defer(
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1822, in defer
    raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout)
airflow.exceptions.TaskDeferred

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3371, in _wrap_pool_connect
    return fn()
           ^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 327, in connect
    return _ConnectionFairy._checkout(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
    rec = pool._do_get()
          ^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
    return _ConnectionRecord(self)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
    self.__connect()
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 690, in __connect
    with util.safe_reraise():
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 574, in connect
    return dialect.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
psycopg2.OperationalError: connection to server at "localhost" (::1), port 3306 failed: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 489, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
           ^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3008, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 288, in _run_raw_task
    ti.defer_task(exception=defer, session=session)
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3196, in defer_task
    _defer_task(ti=self, exception=exception, session=session)
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/api_internal/internal_api_call.py", line 166, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1703, in _defer_task
    session.flush()
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 212, in save_obj
    for (
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 373, in _organize_states_for_save
    for state, dict_, mapper, connection in _connections_for_states(
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 1737, in _connections_for_states
    connection = uowtransaction.transaction.connection(base_mapper)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 629, in connection
    return self._connection_for_bind(bind, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 738, in _connection_for_bind
    conn = self._parent._connection_for_bind(bind, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 750, in _connection_for_bind
    conn = bind.connect()
           ^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/future/engine.py", line 412, in connect
    return super(Engine, self).connect()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3325, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
         ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3404, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3374, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2208, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3371, in _wrap_pool_connect
    return fn()
           ^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 327, in connect
    return _ConnectionFairy._checkout(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
    rec = pool._do_get()
          ^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
    return _ConnectionRecord(self)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
    self.__connect()
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 690, in __connect
    with util.safe_reraise():
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 574, in connect
    return dialect.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 3306 failed: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.

Behavior Pattern

  1. Cloud Run job successfully starts

  2. Operator attempts to defer (pause execution)

  3. Immediate failure with the database error above

  4. Airflow retries the task

  5. Often succeeds on the 2nd or 3rd retry, but not always (see the retry sketch after this list)
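
Since retries currently mask the failure, one stopgap (not a fix) is to make the retry behavior explicit at the DAG level. A minimal sketch, assuming DAG-level default_args; the retry count and delay below are arbitrary choices, not recommendations:

from datetime import timedelta

# Stopgap sketch, not a fix: make the retry behavior that currently masks the
# failure explicit. The values are arbitrary; the report shows success on the
# 2nd/3rd attempt.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
}

Passing default_args to the DAG constructor applies the retry settings to all four CloudRunExecuteJobOperator tasks.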

Stack Trace Location

The task fails at the self.defer() call while flushing state to the database:

  • During session.flush() in _defer_task (/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py:1703), reached via defer_task (taskinstance.py:3196)

Minimal Reproduction

from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

CloudRunExecuteJobOperator(
    task_id="any_task",
    project_id=<PROJECT_ID>,
    region="europe-north1",
    job_name="any-cloud-run-job",
    deferrable=True,  # Fails
    # deferrable=False works fine
)
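
For completeness, a self-contained version of the reproduction. This is a sketch only: the DAG id, schedule, and job names are hypothetical, and <PROJECT_ID> remains a placeholder.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

with DAG(
    dag_id="cloud_run_deferrable_repro",  # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,                        # trigger manually while reproducing
    catchup=False,
) as dag:
    for job in ("job-a", "job-b", "job-c", "job-d"):  # hypothetical job names
        CloudRunExecuteJobOperator(
            task_id=f"execute_{job}",
            project_id="<PROJECT_ID>",    # placeholder
            region="europe-north1",
            job_name=job,
            deferrable=True,              # fails; deferrable=False works
        )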

What Makes This Strange

  1. Minimal load: Only 4 tasks - should not stress any component

  2. Intermittent: Sometimes works on retry, suggesting timing/initialization issue

  3. Port 3306: The error shows port 3306 (typically MySQL), but Composer uses PostgreSQL for the metadata database (see the diagnostic sketch after this list)

  4. Fresh environment: No modifications from default Composer v3 setup
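
Related to point 3 above, the connection string the worker actually resolves for the metadata database can be inspected from a Python shell or a throwaway task. A diagnostic sketch only; mask credentials before sharing the output:

import os

from airflow.configuration import conf

# Print the connection string Airflow believes it should use for the metadata DB.
# On Composer this should point at PostgreSQL, not port 3306.
print(conf.get("database", "sql_alchemy_conn"))

# Also check whether an environment variable override is present.
print(os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"))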

Workaround

Setting deferrable=False works consistently but eliminates the resource efficiency benefits.
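
If it helps to flip between the two modes during the investigation without editing the DAG each time, the flag can be driven by an environment variable. A sketch, where CLOUD_RUN_DEFERRABLE is a hypothetical variable name:

import os

from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

# Hypothetical toggle: defaults to the working (non-deferrable) mode;
# set CLOUD_RUN_DEFERRABLE=true to re-test deferral.
USE_DEFERRABLE = os.environ.get("CLOUD_RUN_DEFERRABLE", "false").lower() == "true"

CloudRunExecuteJobOperator(
    task_id="any_task",
    project_id="<PROJECT_ID>",  # placeholder
    region="europe-north1",
    job_name="any-cloud-run-job",
    deferrable=USE_DEFERRABLE,
)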

Questions

  1. Is this a known issue with the triggerer component in Composer v3?

  2. Why would psycopg2 attempt to connect to port 3306?

  3. Are there any race conditions in triggerer initialization?

  4. Should we increase triggerer resources despite the minimal load?

What I’ve Verified

  • Environment is healthy (all components running)

  • Cloud Run jobs execute successfully when deferrable=False

  • No custom Airflow configurations

  • Triggerer component is running (1 pod, 0.5 vCPU, 1GB memory)

This seems like an infrastructure or initialization issue rather than a configuration problem, given that it succeeds intermittently on retry and fails even under minimal load.

Hi simvik,

An intermittent failure occurs with CloudRunExecuteJobOperator when deferrable=True in Composer v3: during the task deferral phase, the PostgreSQL driver (psycopg2) unexpectedly attempts to connect to localhost:3306, which is a MySQL port.

Here are some approaches that might help with your use case:

  • Investigate Airflow Metadata Database Configuration: Check your environment variables and configuration settings for anything that could be unintentionally redirecting the metadata database connection to localhost:3306.
  • Temporarily Increase Triggerer Resources: Use this as a diagnostic step to determine whether resource constraints are contributing to a potential race condition during triggerer initialization, though it is unlikely to resolve the localhost:3306 error by itself.
  • Continue Using deferrable=False: Keep this fallback in place while you investigate the underlying cause.
  • Introduce a Timed Delay to Uncover Race Conditions: Try adding a brief delay before task execution as a diagnostic workaround to help surface potential race conditions (see the sketch below).
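
To illustrate the last point, a short sleep can be placed upstream of the deferrable task, for example with a PythonOperator. This is a diagnostic sketch only; the 60-second delay is an arbitrary choice, not a fix:

import time

from airflow.operators.python import PythonOperator

# Diagnostic only: arbitrary pause to give the environment time to settle
# before the deferrable task runs.
wait_before_defer = PythonOperator(
    task_id="wait_before_defer",
    python_callable=lambda: time.sleep(60),
)

# Then chain it upstream of the deferrable task, e.g.:
# wait_before_defer >> execute_cloud_run_job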

If you continue to run into issues, consider reaching out to Google Cloud Support to investigate further. When you contact them, be sure to provide as much detail as possible, including screenshots; this will help them understand the problem and resolve it more quickly.

Hi Marvin,

Thank you for your reply. I have tried the above steps, but I have unfortunately been unable to get the operator working in a stable way. I suspect this to be a bug. I have filed a bug report here: Google Issue Tracker