Time optimization in the Composer-Dataproc communication

Hi all,

I’m using Cloud Composer (version 2.6.3+composer) to orchestrate my data pipelines. I’m running Dataproc Serverless batches, which I create from Composer with the following code:

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

conversion_parquet = DataprocCreateBatchOperator(
    task_id="conversion_parquet",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG_PROCESS,
    batch_id=BATCH_ID_PROCESS,
    polling_interval_seconds=5,
)

Here is the problem: in the Dataproc batch code I print a timestamp, and when the script finishes the timestamp is, for example, 14:03:50.361774. But in my Airflow task the recorded end time is 14:05:11.

Why does it take more than a minute to sync? I set polling_interval_seconds to 5 in the task, but it doesn’t seem to help. How can I reduce this time?

I don’t know if it is necessary, but here is the Dataproc batch configuration:

"pyspark_batch": {
    "main_python_file_uri": PYTHON_FILE_LOCATION_PROCESS,
    "args": python_args,
    "python_file_uris": [PYTHON_UTILS_FILES, PYTHON_DEPENDENCES_FILES],
},
"environment_config": {
    "execution_config": {
        "service_account": f"dataproc-sa@{PROJECT_ID}.iam.gserviceaccount.com"
    },
    "peripherals_config": {
        "spark_history_server_config": {
            "dataproc_cluster": f"dataproc.googleapis.com/projects/{PROJECT_ID}/regions/europe-west1/clusters/dataproc-01",
        },
    },
},
"runtime_config": {
    "version": "2.1",
}

Thank you !

The delay you’re experiencing between the completion of your Dataproc Serverless job and the Cloud Composer task’s end time is mainly due to the shutdown overhead associated with Dataproc Serverless, especially when the Spark History Server is enabled. When your job’s code finishes executing—indicated by the timestamp you printed—Dataproc Serverless doesn’t immediately mark the job as complete. Instead, it undertakes additional tasks such as cleaning up resources, finalizing logs, and shutting down the underlying infrastructure. This cleanup process can introduce delays of a minute or more before the job status is updated to “completed” in the Dataproc API.
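For reference, the gap you quoted can be computed directly from the two timestamps; in your example it comes to roughly 80 seconds of shutdown/sync overhead:

```python
from datetime import datetime

# Timestamps from the question: the script's final print vs. Airflow's task end time.
script_done = datetime.strptime("14:03:50.361774", "%H:%M:%S.%f")
task_done = datetime.strptime("14:05:11", "%H:%M:%S")

overhead_s = (task_done - script_done).total_seconds()
print(f"Overhead: {overhead_s:.1f} s")  # Overhead: 80.6 s
```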

Enabling the Spark History Server adds further overhead. You’ve configured your job to use the Spark History Server by specifying the spark_history_server_config in your peripherals_config. This configuration requires the job to write event logs to GCS and interact with the specified Dataproc cluster. As a result, additional time is spent ensuring all logs are properly written and resources are synchronized, which extends the job’s completion time.

To reduce this delay, you can disable the Spark History Server in your batch configuration. By removing or commenting out the spark_history_server_config, you eliminate the need for the job to write detailed event logs to the cluster, thereby shortening the cleanup process. Here’s how you can modify your batch configuration:

"environment_config": {
    "execution_config": {
        "service_account": f"dataproc-sa@{PROJECT_ID}.iam.gserviceaccount.com"
    },
    # Remove or comment out the peripherals_config section
    # "peripherals_config": {
    #     "spark_history_server_config": {
    #         "dataproc_cluster": f"dataproc.googleapis.com/projects/{PROJECT_ID}/regions/europe-west1/clusters/dataproc-01",
    #     },
    # },
},

Keep in mind that some overhead is unavoidable with Dataproc Serverless due to its nature of managing resources on your behalf. Even with a polling interval set to every 5 seconds in Airflow, the task will only complete once Dataproc reports the job as finished. Disabling the Spark History Server should significantly reduce the delay between your Dataproc job’s completion and the Cloud Composer task’s end time, but it may not eliminate it entirely.
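Separately, if part of your concern is that the Airflow worker sits occupied while it polls, recent versions of the Google provider expose a deferrable mode on this operator (an assumption about your installed provider version; check its release notes). It doesn’t shorten Dataproc’s shutdown time, but it hands the waiting off to the triggerer instead of blocking a worker slot. A minimal sketch of the same task:

```python
# Sketch only: assumes an apache-airflow-providers-google version that
# supports deferrable mode on DataprocCreateBatchOperator.
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

conversion_parquet = DataprocCreateBatchOperator(
    task_id="conversion_parquet",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG_PROCESS,
    batch_id=BATCH_ID_PROCESS,
    deferrable=True,  # wait in the triggerer, not in a worker slot
    polling_interval_seconds=5,
)
```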

Be sure to consider how disabling the Spark History Server might affect your ability to debug or monitor your jobs. While you won’t have access to the detailed event logs provided by the Spark History Server, you can still use Cloud Logging and other monitoring tools for troubleshooting.
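For example, even without the Spark History Server you can still inspect a finished batch’s state, timing, and driver output location from the CLI (the batch ID and region below are placeholders; substitute your own):

```shell
# Show the batch's final state, create/end times, and output URI.
gcloud dataproc batches describe my-batch-id \
    --region=europe-west1 \
    --project=my-project-id
```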