I need to keep two BQ tables: a raw table that gets populated daily with content from a .csv file and a stage table that is merged daily with the raw table. In both these 2 tables I have 2 timestamp columns: insert_date_time and update_date_time. Please consider the scenario:
First .csv file comes, it is loaded into the raw table, both insert_date_time and update_date_time columns get CURRENT_TIMESTAMP() value. When merging between the raw and the stage table the content of the raw table is inserted as is to the stage table because the stage table is empty.
The next day, the .csv file comes with the deltaload(an existing record updated with new values, the new one, plus a set of new ones that did not exist before).All content of the .csv file is INSERTED into the raw table as is.
I want to merge the raw table with the stage table “incremental” and not with an “operation” file that does the “merge” with DML language.
How I want to merge:
Today’s records from the raw table that do not exist in the stage table are inserted in the stage table.
Today’s records from the raw table that can be identified by primary key in the stage table need to update the corresponding ones in the stage table except for the “insert_date_time” which has to be the original one, when the record was inserted in the stage table.
Can this flow be implemented with the “incremental” config type?
You can effectively implement this flow in BigQuery using the MERGE statement. Here’s how you’d accomplish this, along with potential optimizations.
Core Concepts
Raw Table: Your daily CSV uploads populate this table, and both insert_date_time and update_date_time initially receive CURRENT_TIMESTAMP().
Stage Table: Functions as the consolidated, updated table incorporating deltas from the raw table.
MERGE Statement: The powerful tool to elegantly insert new records from the raw table and update existing ones in the stage table.
Steps
Data Load: You’ll continue your existing process of loading daily CSV files into the raw table.
MERGE Implementation: Here’s a tailored MERGE statement to execute daily:
MERGE stage_table AS target
USING raw_table AS source
ON target.primary_key = source.primary_key -- Match on your primary key
WHEN MATCHED THEN
UPDATE SET
target.column1 = source.column1, -- Update regular columns
target.column2 = source.column2,
-- ... other columns to update
target.update_date_time = CURRENT_TIMESTAMP() -- Update timestamp
WHEN NOT MATCHED THEN
INSERT (primary_key, column1, column2, ..., insert_date_time, update_date_time)
VALUES (source.primary_key, source.column1, source.column2, ..., source.insert_date_time, source.update_date_time)
Explanation
The MERGE statement compares the stage (target) and raw (source) tables based on your primary key.
WHEN MATCHED: Existing records in the stage table get updated with values from the raw table, and the update_date_time is refreshed. The original insert_date_time remains unchanged.
WHEN NOT MATCHED: New records from the raw table are inserted into the stage table, preserving their original insert_date_time.
Optimization (Optional)
If your tables become very large, consider adding partitioning on a suitable column (e.g., a date column) to both the raw and stage tables. This will limit the MERGE operation to scan only relevant partitions, significantly improving performance.
Automation
Encapsulate this SQL and the load process into a scheduled Cloud Function or orchestrate this using a tool like Cloud Composer (Airflow) for a managed workflow.