Hello Community,
I am building a validation process for checking whether a .csv file was successfully ingested into a BQ table.
I have a BQ table, ingestion_status(table_name, status, ingestion_date), that tracks the daily ingestion status of the incoming .csv files.
This is the flow (a sketch of the trigger wiring follows the list):

- A .csv file is added to a GCS bucket.
- A Java Cloud Function is triggered and loads the .csv file into a corresponding BQ table.
- A validation of the ingestion is performed, and its result is written to the same BQ table: ingestion_status.
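For context, the trigger wiring looks roughly like the sketch below. This is not the exact code: GcsEvent is the usual minimal POJO for the GCS "object finalized" payload, and ingestCsv stands in for the actual load logic (not shown).

import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;

public class CsvIngestionFunction implements BackgroundFunction<CsvIngestionFunction.GcsEvent> {

    // Minimal POJO for the GCS "object finalized" event payload.
    public static class GcsEvent {
        public String bucket;
        public String name;
    }

    @Override
    public void accept(GcsEvent event, Context context) {
        // Derive the target BQ table from the file name, e.g. "orders.csv" -> "orders".
        String tableName = event.name.replace(".csv", "");

        // Load the file into its BQ table, then record the outcome in the shared
        // ingestion_status table (updateCsvFileIngestionStatus is shown below).
        boolean loaded = ingestCsv(event.bucket, event.name, tableName);
        updateCsvFileIngestionStatus(tableName, loaded);
    }
}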
This is the Java code:

public TableResult updateCsvFileIngestionStatus(String tableName, Boolean status) {
    String query = buildUpdateStatusQuery(tableName, status);
    try {
        return bigQueryProcessor.executeQuery(query);
    } catch (InterruptedException e) {
        logger.error(BQ002.name(), e, INTERRUPTED_EXCEPTION, query);
    } catch (BigQueryException e) {
        logger.error(BQ001.name(), e, BIG_QUERY_EXCEPTION, query);
    }
    return null;
}
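buildUpdateStatusQuery is not shown above; it produces something along these lines (a sketch, not the exact implementation; `my_dataset` is a placeholder for the real dataset name):

private String buildUpdateStatusQuery(String tableName, Boolean status) {
    // Flips the status flag for today's row of the given table.
    return String.format(
        "UPDATE `my_dataset.ingestion_status` "
            + "SET status = %b "
            + "WHERE table_name = '%s' AND ingestion_date = CURRENT_DATE()",
        status, tableName);
}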
Please note that every day I get over 30 .csv files that trigger 30 Cloud Functions, which simultaneously try to update the same ingestion_status BQ table.
In this scenario I got the following BQ exception:
Query error: Could not serialize access to table due to concurrent update
Please note that 30 is more than BigQuery allows: at most 2 mutating DML statements run concurrently against a table, and at most 20 more can be queued, so 30 simultaneous UPDATEs exceed the 2 + 20 = 22 that can be in flight for the table.
I changed the code to this:
public TableResult updateCsvFileIngestionStatus(String tableName, Boolean status) {
    String query = buildUpdateStatusQuery(tableName, status);
    try {
        // Stagger the concurrent functions with a random wait before running the DML.
        int randomNumber = getRandomNumber();
        Thread.sleep(MILLIS_MULTIPLICATION_FACTOR * randomNumber);
        logger.info("File: {} has waited: {} seconds to update status", tableName, randomNumber);
        return bigQueryProcessor.executeQuery(query);
    } catch (InterruptedException e) {
        logger.error(BQ002.name(), e, INTERRUPTED_EXCEPTION, query);
    } catch (BigQueryException e) {
        logger.error(BQ001.name(), e, BIG_QUERY_EXCEPTION, query);
    }
    return null;
}
The wait time is a random number between 0 and 600 seconds.
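Concretely, the wait is built like this (a sketch; MILLIS_MULTIPLICATION_FACTOR simply converts the random number of seconds to milliseconds for Thread.sleep):

import java.util.concurrent.ThreadLocalRandom;

// Converts seconds to milliseconds for Thread.sleep.
private static final int MILLIS_MULTIPLICATION_FACTOR = 1000;

// Returns a random wait time between 0 and 600 seconds.
private int getRandomNumber() {
    return ThreadLocalRandom.current().nextInt(0, 601);
}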
After this change the previous BQ error was fixed, BUT now I get these types of BigQuery errors:
com.google.cloud.bigquery.BigQueryException: Unexpected end of file from server
com.google.cloud.bigquery.BigQueryException: Error writing request body to server
com.google.cloud.bigquery.BigQueryException: Broken pipe
I also tried to replace the UPDATEs with INSERTs, but without success.
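The INSERT variant looked roughly like this (a sketch; buildInsertStatusQuery is the hypothetical counterpart of buildUpdateStatusQuery above, appending one status row per run instead of updating in place):

private String buildInsertStatusQuery(String tableName, Boolean status) {
    // Appends a new status row; downstream readers take the latest row per table_name.
    return String.format(
        "INSERT INTO `my_dataset.ingestion_status` (table_name, status, ingestion_date) "
            + "VALUES ('%s', %b, CURRENT_DATE())",
        tableName, status);
}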
From the documentation I got these two limits:

| Limit | Value | Notes |
|---|---|---|
| Queued mutating DML statements per table | 20 statements | A table can have up to 20 mutating DML statements in the queue waiting to run. If you submit additional mutating DML statements for the table, then those statements fail. |
| Maximum rate of DML statements for each table | 25 statements every 10 seconds | Your project can run up to 25 DML statements every 10 seconds for each table. Both INSERT and mutating DML statements contribute to this limit. |
I really need this validation to work, since I plan to use the BQ ingestion_status table in Dataform assertion scripts that will prevent downstream Dataform workflows (MERGE) from running whenever a .csv file ingestion fails.
Do you have any idea how I could concurrently insert/update records in a single BQ table from concurrently running Cloud Functions?
Thank you,
Best regards,
Valentin