GKEStartPodOperator pods marked as failed despite succeeding

When running GKEStartPodOperator in deferrable mode, the pod sometimes finishes successfully but Airflow marks the task as a failure. When this happens, two things typically occur:

  1. Right after the deferral starts, a ConnectTimeoutError is logged:
[2025-02-01, 00:21:49 UTC] {credentials_provider.py:402} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided. 
[2025-02-01, 00:21:49 UTC] {taskinstance.py:288} INFO - Pausing task as DEFERRED. dag_id=price_maps_gke_FR_PROD, task_id=run_price_maps_step1_fr_appt, run_id=scheduled__2025-01-01T00:15:00+00:00, execution_date=20250101T001500, start_date=20250201T002148 
[2025-02-01, 00:21:49 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2025-02-02, 00:26:40 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs 
[2025-02-02, 00:26:41 UTC] {connection.py:277} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2025-02-02, 00:26:41 UTC] {base.py:84} INFO - Retrieving connection 'google_cloud_default' 
[2025-02-02, 00:26:41 UTC] {credentials_provider.py:402} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided. 
[2025-02-02, 00:28:51 UTC] {connectionpool.py:868} WARNING - Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7a0ae6da1e50>, 'Connection to <redacted IP address> timed out. (connect timeout=None)')': /api/v1/namespaces/default/pods/price-maps-step1-fr-appt-952mvt6q
[2025-02-02, 00:29:55 UTC] {pod.py:834} INFO - [base] logs: <Pod logs start streaming correctly>
  2. Once the pod finishes and all logs are streamed correctly, a traceback for an aiohttp.client_exceptions.ClientConnectorError towards the same IP is printed:
[2025-02-02 01:32:14.118085+00:00] {pod.py:834} INFO - [base] logs: <Container logs reporting success of the task>
[2025-02-02 01:47:04.545146+00:00] {pod_manager.py:603} INFO - Pod price-maps-step1-fr-appt-952mvt6q has phase Running
[2025-02-02 01:47:06.578366+00:00] {pod.py:966} INFO - Deleting pod: price-maps-step1-fr-appt-952mvt6q
[2025-02-02 01:47:06.736706+00:00] {taskinstance.py:3312} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1824, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/kubernetes_engine.py", line 809, in execute_complete
    return super().trigger_reentry(context, event)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 759, in trigger_reentry
    raise AirflowException(message)
airflow.exceptions.AirflowException: Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1109, in _wrap_create_connection
    sock = await aiohappyeyeballs.start_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohappyeyeballs/impl.py", line 104, in start_connection
    raise first_exception
  File "/opt/python3.11/lib/python3.11/site-packages/aiohappyeyeballs/impl.py", line 82, in start_connection
    sock = await _connect_sock(
           ^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohappyeyeballs/impl.py", line 174, in _connect_sock
    await loop.sock_connect(sock, address)
  File "/opt/python3.11/lib/python3.11/asyncio/selector_events.py", line 638, in sock_connect
    return await fut
           ^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/asyncio/selector_events.py", line 678, in _sock_connect_cb
    raise OSError(err, f'Connect call failed {address}')
TimeoutError: [Errno 110] Connect call failed ('<Same IP as previous error>', 443)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 162, in run
    event = await self._wait_for_container_completion()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 226, in _wait_for_container_completion
    pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 757, in get_pod
    pod: V1Pod = await v1_api.read_namespaced_pod(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api
    response_data = await self.request(
                    ^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 198, in GET
    return (await self.request("GET", url,
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/kubernetes_asyncio/client/rest.py", line 182, in request
    r = await self.pool_manager.request(**args)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/client.py", line 663, in _request
    conn = await self._connector.connect(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 538, in connect
    proto = await self._create_connection(req, traces, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1050, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1384, in _create_direct_connection
    raise last_exc
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1353, in _create_direct_connection
    transp, proto = await self._wrap_create_connection(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1124, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host <Same IP as previous error>:443 ssl:default [Connect call failed ('<Same IP as previous error>', 443)]

[2025-02-02 01:47:06.746442+00:00] {taskinstance.py:1226} INFO - Marking task as FAILED. dag_id=price_maps_gke_FR_PROD, task_id=run_price_maps_step1_fr_appt, run_id=scheduled__2025-01-01T00:15:00+00:00, execution_date=20250101T001500, start_date=20250201T002148, end_date=20250202T014706
[2025-02-02 01:47:06.746828+00:00] {taskinstance.py:1564} INFO - Executing callback at index 0: slack_failure_alert
[2025-02-02 01:47:07.359025+00:00] {taskinstance.py:340} ▶ Post task execution logs

This is problematic because downstream tasks fail as well, and I can't simply ignore the error because those tasks depend on objects created by this pod.
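
For context, the task is a GKEStartPodOperator running in deferrable mode, roughly like this (a minimal sketch; the project, cluster, and image values here are placeholders, not my real configuration):

```python
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator

run_price_maps_step1_fr_appt = GKEStartPodOperator(
    task_id="run_price_maps_step1_fr_appt",
    project_id="my-project",        # placeholder
    location="europe-west1",        # placeholder
    cluster_name="my-cluster",      # placeholder
    name="price-maps-step1-fr-appt",
    namespace="default",
    image="europe-west1-docker.pkg.dev/my-project/repo/price-maps:latest",  # placeholder
    get_logs=True,
    deferrable=True,  # polling for pod completion is handed off to the triggerer
)
```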

Hi @Amri_Tarik ,

Welcome to Google Cloud Community!

The ConnectTimeoutError and ClientConnectorError suggest a possible network, server-overload, rate-limiting, or configuration issue between the Airflow worker and the Kubernetes API server. Although the pod completes successfully, Airflow's trigger may be unable to re-establish a connection to the Kubernetes API server, which results in the task being marked failed.

Here are several suggestions that may help resolve the issue:

  • Connectivity: Rule out network and latency problems. Check your firewall and DNS settings, verify that the network configuration of your Composer environment is correct, assess whether there is high latency between the Composer environment and the Kubernetes API server (it can contribute to timeouts), review network policies inside the GKE cluster, and watch for general network instability. You can also reproduce the failing API call directly; see the probe sketch after this list.
  • Rate Limiting: Ensure you’re not hitting the rate limit on the Kubernetes API.
  • Implement exponential backoff: Reduce your request rate and increase the wait time between attempts to avoid hitting the rate limit; see the retry sketch after this list.
  • Resource constraints: Airflow worker instances under heavy load can become temporarily unresponsive, leading to connection timeouts. Ensure that you are not hitting the resource limits on these instances.
  • Configuration: Ensure your Airflow and Composer environment settings are configured correctly.
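
For the connectivity check specifically, one option is to reproduce the exact call that fails in the trigger (kubernetes_asyncio's read_namespaced_pod) from your Composer environment. Below is a minimal probe sketch; it assumes a kubeconfig for the cluster is available (for example via gcloud container clusters get-credentials), and the pod name is taken from your logs:

```python
import asyncio

from kubernetes_asyncio import client, config


async def probe(pod_name: str, namespace: str = "default") -> None:
    # Load credentials the same way a local kubectl would;
    # inside a cluster you could use config.load_incluster_config() instead.
    await config.load_kube_config()
    async with client.ApiClient() as api:
        v1 = client.CoreV1Api(api)
        # This is the same call the Airflow pod trigger makes; if it
        # times out here too, the problem is network reachability
        # between this environment and the API server, not Airflow.
        pod = await v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        print(f"phase={pod.status.phase}")


asyncio.run(probe("price-maps-step1-fr-appt-952mvt6q"))
```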

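On the exponential-backoff point: one task-level option is Airflow's built-in retry backoff, set through the task's (or the DAG's default_args) retry parameters. A minimal sketch with illustrative values follows; note that a retry re-runs the pod, so this only helps if the pod is idempotent:

```python
from datetime import timedelta

# Pass this as default_args to the DAG, or set the same keys
# directly on the GKEStartPodOperator task.
default_args = {
    "retries": 3,                              # illustrative value
    "retry_delay": timedelta(minutes=1),       # initial wait between attempts
    "retry_exponential_backoff": True,         # waits grow: ~1m, 2m, 4m, ...
    "max_retry_delay": timedelta(minutes=10),  # cap on the backoff
}
```
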
Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.