Concurrent UPDATES or INSERTS to a BQ table

Hello Community,

I am building a validation process for checking whether a .csv file was successfully ingested into a BQ table.

I have a BQ table: ingestion_status(table_name, status, ingestion_date) that maintains the ingestion status of the incoming .csv files daily.

This is the flow:

  1. A .csv file is added to a GCS bucket.

  2. A Java cloud function is triggered, inserts the .csv file into a corresponding BQ table.

  3. A validation is performed to validate the ingestion process. The result of this validation is put to the same BQ table: ingestion_status.

This is the Java Code:

public TableResult updateCsvFileIngestionStatus(String tableName, Boolean status) {
    String query = buildUpdateStatusQuery(tableName, status);
    try {
        int randomNumber = getRandomNumber();
        logger.info("File: {} has waited: {} seconds to update status", tableName, randomNumber);

        return bigQueryProcessor.executeQuery(query);
    } catch (InterruptedException e) {
        logger.error(BQ002.name(), e, INTERRUPTED_EXCEPTION, query);
    } catch (BigQueryException e) {
        logger.error(BQ001.name(), e, BIG_QUERY_EXCEPTION, query);
    }

    return null;
}

Please note that every day I get over 30 .csv files trat trigger 30 cloud functions that simultaneously try to update the same ingestion_status BQ table.
In this scenario I got the following BQ exception:

Query error: Could not serialize access to table due to concurrent update

Please note that 30 is greater than the 2 concurrent updates and the queue size limit of 20 for concurrent DML UPDATES.

I changed the code to this:

public TableResult updateCsvFileIngestionStatus(String tableName, Boolean status) {
    String query = buildUpdateStatusQuery(tableName, status);
    try {
        int randomNumber = getRandomNumber();
        Thread.sleep(MILLIS_MULTIPLICATION_FACTOR * randomNumber);
        logger.info("File: {} has waited: {} seconds to update status", tableName, randomNumber);

        return bigQueryProcessor.executeQuery(query);
    } catch (InterruptedException e) {
        logger.error(BQ002.name(), e, INTERRUPTED_EXCEPTION, query);
    } catch (BigQueryException e) {
        logger.error(BQ001.name(), e, BIG_QUERY_EXCEPTION, query);
    }

    return null;
}

The wait time is a random number between 0 and 600 seconds.
After this change I fixed the previous BQ error BUT I get these types of Big Query errors:

com.google.cloud.bigquery.BigQueryException: Unexpected end of file from server
com.google.cloud.bigquery.BigQueryException: Error writing request body to server
com.google.cloud.bigquery.BigQueryException: Broken pipe

I also tried to replace UPDATES with INSERTS but to no success.

From documentation I got this:

Queued mutating DML statements per table 20 statements A table can have up to 20 mutating DML statements in the queue waiting to run. If you submit additional mutating DML statements for the table, then those statements fail.

and this:

Maximum rate of DML statements for each table 25 statements every 10 seconds Your project can run up to 25 DML statements every 10 seconds for each table. Both INSERT and mutating DML statements contribute to this limit.

I really need this validation to work since I am planning to further use the BQ ingestion_status table in other Dataform assertion scripts that will prevent sequential Dataform workflows (MERGE) to get executed in case of a failed .csv file ingestion.

Do you have any idea how could I concurrent insert/update records of a single BQ table from concurrently running Cloud Functions?

Thank you,
Best regards,
Valentin

5 Likes

Hi @valentiniacov ,

Thank you for the comprehensive explanation of the challenges you’re facing with concurrent updates to your BigQuery ingestion_status table. Ensuring efficient and reliable updates is crucial.

Here is how you can build on the you strategies suggested:

1. Distributed Queue with Google Cloud Pub/Sub:

Pub/Sub is a powerful tool for managing concurrency. Here’s how to enhance the subscriber logic:

  • Batching and Error Handling:

    • Instead of processing messages individually, batch them in manageable groups (e.g., 100 messages) before executing a MERGE operation. This significantly reduces BigQuery interactions and improves efficiency.
    • Implement robust error handling within the subscriber, meticulously capturing and logging exceptions. Utilize Pub/Sub’s dead-letter topic to isolate failed messages for later investigation and reprocessing.
  • Subscriber Code (with Batching and Error Handling):

public void processPubSubMessages(List<PubsubMessage> messages) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (PubsubMessage message : messages) {
        try {
            // Extract data from message (e.g., using JSON parsing)
            Map<String, Object> row = ...;
            rows.add(row);
        } catch (Exception e) {
            // Handle message parsing errors (log, send to dead-letter topic, etc.)
            logger.error("Error parsing message: {}", e.getMessage());
            // ... (consider sending message to dead-letter topic)
        }
    }

    if (!rows.isEmpty()) {
        try {
            String query = buildBatchMergeQuery(rows);
            bigQueryProcessor.executeQuery(query);
        } catch (BigQueryException e) {
            // Handle BigQuery errors (log, retry with backoff, etc.)
            logger.error("BigQuery error: {}", e.getMessage());
            // ... (consider retrying with exponential backoff)
        }
    }
}

2. Exponential Backoff and Retry Logic:

The refined retryBigQueryOperation function with jitter is a solid foundation. Consider these enhancements:

  • Error-Specific Backoff: Tailor the backoff strategy based on the error type. For instance, use shorter backoff times for rate limit errors and longer ones for connection issues. This prevents unnecessary delays while respecting BigQuery’s limits.
  • Circuit Breaker Pattern: Introduce a circuit breaker mechanism to temporarily halt retries if a predefined error threshold is met. This safeguards BigQuery from excessive requests during prolonged disruptions.

3. Batch Updates via Scheduled Jobs:

If real-time updates aren’t critical, leverage a staging table and scheduled batch updates. A Cloud Scheduler job can trigger a Cloud Function or Cloud Run service to perform the update, streamlining the process and potentially reducing costs.

4. Partitioned and Clustered Tables:

Beyond partitioning by ingestion_date and clustering by table_name and status, explore partitioning by other relevant fields like file_name or source_system. This further enhances query performance and data organization.

5. Monitoring and Alerting:

As emphasized, comprehensive monitoring is key. Use Cloud Monitoring to track vital metrics such as Pub/Sub message latency, BigQuery query performance, and error rates. Configure alerts to promptly notify you of anomalies or critical issues, enabling proactive intervention.

Additional Considerations:

  • Data Consistency: If strong consistency is a requirement, investigate BigQuery’s streaming inserts or transactions. However, be mindful of their limitations and trade-offs.
  • Cost Optimization: Thoroughly evaluate the cost implications of each strategy, factoring in storage, query usage, and Pub/Sub message volume. Utilize BigQuery cost controls and Pub/Sub quotas to manage costs effectively.
4 Likes

Thank you for the answer.

3 Likes