The core issue you’re facing is that while you’ve defined a primary key (id) on your BigQuery table, the Storage Write API doesn’t automatically enforce primary key constraints. Its job is efficient data loading, not checking incoming rows against existing keys, so standard inserts can produce duplicate id values.
The DDL for your BigQuery table declares the key as NOT ENFORCED, meaning BigQuery won’t block duplicate id values on its own. To achieve the UPSERT behavior you want, you need to modify your Dataflow pipeline.
Here’s a refined pipeline design to ensure UPSERT behavior while respecting your primary key (a code sketch follows the list):
Buffer Updates: Use a windowing strategy to buffer incoming updates.
Group By Key: Group records by the id field.
Reduce and Deduplicate: For each key, keep only the latest update.
Write to BigQuery: Write the deduplicated records to BigQuery.
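For concreteness, here is a minimal sketch of that buffer → group → deduplicate → write shape using the Beam Python SDK. The field names (id, name, updated_at), the 60-second window, and the table reference are placeholders, and a real pipeline would read from Pub/Sub or another CDC source instead of beam.Create:

```python
import apache_beam as beam
from apache_beam.transforms import window

def latest_per_key(kv):
    """For one id, keep only the record with the newest updated_at."""
    _id, records = kv
    return max(records, key=lambda r: r["updated_at"])

# Placeholder input; in practice this would come from your CDC source.
sample = [
    {"id": 1, "name": "old", "updated_at": 1},
    {"id": 1, "name": "new", "updated_at": 2},  # later update wins
    {"id": 2, "name": "only", "updated_at": 1},
]

with beam.Pipeline() as p:
    deduped = (
        p
        | "Read" >> beam.Create(sample)
        | "Buffer" >> beam.WindowInto(window.FixedWindows(60))  # 60 s windows
        | "KeyById" >> beam.Map(lambda r: (r["id"], r))
        | "GroupById" >> beam.GroupByKey()
        | "LatestPerKey" >> beam.Map(latest_per_key)
    )
    deduped | "WriteToBQ" >> beam.io.WriteToBigQuery(
        table="my-project:my_dataset.my_table",
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
```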
Data Ordering: For optimal efficiency, ensure your CDC events are delivered in a roughly sorted order based on the primary key.
Data Freshness: BigQuery may take a little while to apply and deduplicate streamed changes in the background. For tighter control over deduplication, consider writing to a temporary staging table and applying MERGE statements (see the sketch after this list).
Primary Key Enforcement: Note that BigQuery primary key constraints are informational and must be declared NOT ENFORCED; BigQuery will not reject rows with duplicate key values for you, so deduplication has to happen upstream in the pipeline.
Error Handling: Implement robust error handling so that rows that fail to write are surfaced and retried rather than silently dropped.
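To illustrate the staging-table-plus-MERGE idea from the Data Freshness point, here is a sketch using the google-cloud-bigquery client; the dataset, table, and column names (id, name, updated_at) are placeholders:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Merge freshly streamed rows from a staging table into the target table,
# updating existing ids and inserting new ones. Table and column names
# are placeholders.
merge_sql = """
MERGE `my_dataset.target_table` AS t
USING `my_dataset.staging_table` AS s
ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET t.name = s.name, t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT (id, name, updated_at) VALUES (s.id, s.name, s.updated_at)
"""

client.query(merge_sql).result()  # run the MERGE and wait for it to finish
```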
Troubleshooting and Refinement
Logging: Add detailed logging at various stages of your pipeline to pinpoint where duplicates arise.
Method Change: The STORAGE_WRITE_API is more appropriate for streaming but requires handling duplicates explicitly as shown.
Thank you @ms4446! I’ll be honest - it’s not a particularly great look to begin with when you respond on a support forum with an AI-generated response using my post as the prompt. It’s even worse when you’re a Google employee responding to a Google product query on Google’s own community platform. And it’s worse still when the response both fails to address my issue and manages to be full of errors! That doesn’t reflect all that well on either you or Google as a cloud provider, wouldn’t you agree?
That being said, on to your response itself:
On the claim that the Storage Write API “doesn’t inherently check for primary key conflicts”: actually, it can. With BigQuery CDC, update operations are applied according to the table’s primary key as long as you set the _CHANGE_TYPE pseudocolumn, as you can see I’ve done in my code. (“To use CDC, set _CHANGE_TYPE when you stream row modifications using the Storage Write API.”)
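For reference, a CDC row streamed through the Storage Write API is just the ordinary row payload with the pseudocolumn attached; conceptually something like this, where the columns other than _CHANGE_TYPE stand in for my actual schema:

```python
# Conceptual shape of a single CDC record; "id" and "name" are placeholder
# columns. _CHANGE_TYPE is the documented pseudocolumn and takes the value
# "UPSERT" (insert or update by primary key) or "DELETE".
cdc_row = {
    "id": 42,
    "name": "updated value",
    "_CHANGE_TYPE": "UPSERT",
}
```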
As for buffering updates with a windowing strategy: how would this help, exactly? It’s a streaming pipeline that delivers row updates as soon as they are received by the subscriber. Even if I did want to wait X amount of time to batch and deduplicate my records, what happens at X+1 when that record is updated again in my source data, after I’ve committed my buffered batch? I’ll get another duplicate, of course.
As for the temporary table and MERGE suggestion: absolutely not. If using a temporary table and a MERGE statement were practical or desirable, I would not be using a streaming pipeline in the first place!
In any case, it seems that the Python SDK does not support such operations, whereas the Java implementation does, as per this blog post. So this is a major oversight in your documentation and your client libraries, as it’s not clear at all how you’re supposed to achieve this in a Python-based pipeline. Adding the _CHANGE_TYPE pseudocolumn to the schema argument of the WriteToBigQuery function returns an error indicating that column names starting with an underscore are invalid in BigQuery.
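Roughly, the shape of the call that fails looks like this (the project, dataset, table, and the id/name columns are placeholders):

```python
import apache_beam as beam

# Declaring _CHANGE_TYPE in the schema string is what triggers the error,
# because BigQuery rejects user-defined column names that begin with an
# underscore. All names below are placeholders.
with beam.Pipeline() as p:
    rows = p | beam.Create(
        [{"id": 1, "name": "updated value", "_CHANGE_TYPE": "UPSERT"}]
    )
    rows | beam.io.WriteToBigQuery(
        table="my-project:my_dataset.my_table",
        schema="id:INTEGER,name:STRING,_CHANGE_TYPE:STRING",
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
```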