I have a source table that contains a change stream, and I'm trying to replicate the table in BigQuery using Dataform. To do that, I want to read the records from the change stream table and insert or update records in the destination table.
When I tried creating two output files, one with a publish function for inserts and the other with a publish function for updates, I got this error:
Duplicate action name detected. Names within a schema must be unique across tables, declarations, assertions, and operations
Insert.js →
publish("car_table", {type: "incremental"})
.query(ctx => `SELECT src.id, src.brand, src.count FROM ${ctx.ref("car_table_changelog")} AS src WHERE src._metadata_spanner_mod_type="INSERT"`);
Update.js →
publish("car_table", {type: "incremental"})
.query(ctx => `SELECT src.id, src.brand, src.count FROM ${ctx.ref("car_table_changelog")} AS src WHERE src._metadata_spanner_mod_type="UPDATE"`);
How can I achieve it? Can I combine both the SQL statements into one publish function?
Yes, you can combine both the SQL statements into one publish function. One way to achieve this is by using the CASE statement to categorize the type of operation (INSERT or UPDATE). Here’s an example:
publish("car_table", {type: "incremental"})
.query(ctx => `SELECT src.id, src.brand, src.count, CASE src._metadata_spanner_mod_type WHEN 'INSERT' THEN 'insert' WHEN 'UPDATE' THEN 'update' ELSE 'unknown' END AS action FROM ${ctx.ref("car_table_changelog")} AS src`);
This will produce a table with an additional action column that indicates whether the record is an insert or an update. Note that the action column is purely informational. With {type: "incremental"}, Dataform appends new rows by default; it merges rows (updating existing ones and inserting new ones) only when a uniqueKey is set in the config, for example:
publish("car_table", {type: "incremental", uniqueKey: ["id"]})
.query(ctx => `SELECT src.id, src.brand, src.count FROM ${ctx.ref("car_table_changelog")} AS src ORDER BY src._metadata_spanner_commit_timestamp`);
I understand. By default, an incremental publish in Dataform creates the destination table or appends data to it; it does not perform upserts (insert or update) unless a uniqueKey is configured. To control the upsert logic yourself, you'll need a different approach in BigQuery.
One approach is to use BigQuery’s MERGE statement:
MERGE INTO car_table AS target
USING car_table_changelog AS source
ON target.id = source.id
WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN
  UPDATE SET
    target.brand = source.brand,
    target.count = source.count
WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN
  INSERT (id, brand, count)
  VALUES (source.id, source.brand, source.count);
This MERGE statement will handle both inserts and updates based on the _metadata_spanner_mod_type column.
In Dataform, you can run this MERGE statement from an operations action (type: "operations") rather than a publish action. The action can be scheduled to run periodically to apply changes from car_table_changelog to car_table.
Remember, the exact solution might need adjustments based on your specific dataset and requirements.
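As a rough sketch of the approach described above, the MERGE could be registered from a JavaScript file with Dataform's operate function instead of publish. The action name merge_car_table, the helper buildMergeSql, and the dataform.car_table target name are assumptions made for illustration, and the typeof guard is only there so the snippet also runs outside Dataform.

```javascript
// Hypothetical helper: builds a MERGE that mirrors the statement in the answer.
// `target` and `source` are table names (or the result of ctx.ref()).
function buildMergeSql(target, source) {
  return [
    `MERGE INTO ${target} AS target`,
    `USING ${source} AS source`,
    `ON target.id = source.id`,
    `WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN`,
    `  UPDATE SET brand = source.brand, count = source.count`,
    `WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN`,
    `  INSERT (id, brand, count)`,
    `  VALUES (source.id, source.brand, source.count)`,
  ].join("\n");
}

if (typeof operate === "function") {
  // Inside a Dataform project, `operate` is a global that defines an operations action.
  // "merge_car_table" is an assumed action name; it must not clash with other actions.
  operate("merge_car_table").queries(ctx =>
    buildMergeSql("dataform.car_table", ctx.ref("car_table_changelog"))
  );
} else {
  // Outside Dataform (for example in plain Node.js), just print the statement.
  console.log(buildMergeSql("dataform.car_table", "dataform.car_table_changelog"));
}
```

Because the operation has its own name, it does not collide with a table called car_table, which avoids the duplicate action name error from the original question.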
Dataform’s GitHub repository: This contains sample projects, integrations, and more. It’s a useful resource to see real-world setups and understand potential advanced configurations.
Dataform Web Tracking Example: This is a sample project that transforms raw Google Analytics and database logs into a set of tables suitable for analysis.
A sample script to run the MERGE statement in BigQuery can be written in either SQL or a programming language such as JavaScript.
Here is a sample SQL script:
MERGE INTO car_table AS target
USING car_table_changelog AS source
ON target.id = source.id
WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN
  UPDATE SET
    target.brand = source.brand,
    target.count = source.count
WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN
  INSERT (id, brand, count)
  VALUES (source.id, source.brand, source.count);
This script can be run in the BigQuery console or using the BigQuery API.
Here is a sample JavaScript snippet that holds the MERGE statement as a string, ready to be passed to a BigQuery client:
const query = `MERGE INTO car_table AS target USING car_table_changelog AS source ON target.id = source.id WHEN MATCHED AND source._metadata_spanner_mod_type = 'UPDATE' THEN UPDATE SET target.brand = source.brand, target.count = source.count WHEN NOT MATCHED AND source._metadata_spanner_mod_type = 'INSERT' THEN INSERT (id, brand, count) VALUES (source.id, source.brand, source.count);`;
If as part of a MERGE a new row is inserted in the target table, the newly inserted row is not eligible for a match with rows from the source table. Matching is based on the state the tables are in when the query is started.
The MERGE statement won't work for me because my source table contains both inserts and updates for the same primary key.
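To make the situation concrete, here is a small in-memory sketch of what replaying such a changelog in commit order would look like. The sample rows, the field names modType and commitTimestamp, and the helper replayChanges are all made up for illustration, and the sketch assumes every change record carries the full row (id, brand, count).

```javascript
// Hypothetical helper: applies change records in commit order and
// returns the final state of the table, one row per id.
function replayChanges(changes) {
  const ordered = [...changes].sort((a, b) =>
    a.commitTimestamp < b.commitTimestamp ? -1 : a.commitTimestamp > b.commitTimestamp ? 1 : 0
  );
  const table = new Map();
  for (const change of ordered) {
    // INSERT and UPDATE are both treated as "write the latest row for this id".
    table.set(change.id, { id: change.id, brand: change.brand, count: change.count });
  }
  return [...table.values()];
}

// Made-up sample data: id 1 has an INSERT followed by an UPDATE.
const changes = [
  { id: 1, brand: "Audi", count: 5, modType: "INSERT", commitTimestamp: "2023-10-01T10:00:00Z" },
  { id: 1, brand: "Audi", count: 7, modType: "UPDATE", commitTimestamp: "2023-10-01T11:00:00Z" },
  { id: 2, brand: "BMW", count: 3, modType: "INSERT", commitTimestamp: "2023-10-01T10:30:00Z" },
];

console.log(replayChanges(changes));
```

Under that assumption, the final state for each id is simply its most recent change record, which is why two source rows for id 1 are a problem for a single MERGE but not for a sequential replay.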
I tried a different approach, separating the two activities into two files.
File: insert_tables.sqlx
config {
  type: "operations",
  schema: "dataform",
  hasOutput: true
}
INSERT INTO dataform.car_table (id, brand, count)
SELECT src.id, src.brand, src.count
FROM ${ref("car_table_changelog")} as src
WHERE src._metadata_spanner_mod_type like 'INSERT'
ORDER BY src._metadata_spanner_commit_timestamp
File: update_tables.sqlx
config {
  type: "operations",
  schema: "dataform",
  hasOutput: true,
  dependencies: ["insert_tables"]
}
UPDATE dataform.car_table target
SET
  target.brand = source.brand,
  target.count = source.count
FROM ${ref("car_table_changelog")} as source
WHERE target.id = source.id AND source._metadata_spanner_mod_type like 'UPDATE'
Even this fails, with the error: UPDATE/MERGE must match at most one source row for each target row
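One possible way around this error, sketched here under assumptions rather than taken from the thread, is to collapse the changelog to the most recent change per id before merging, so that each target row matches at most one source row. The helper name buildDedupedMergeSql is hypothetical, the table names follow the files above, and the sketch assumes each change record contains the full row and that _metadata_spanner_commit_timestamp is enough to order changes for one id.

```javascript
// Hypothetical helper: builds a MERGE whose source is reduced to the
// latest INSERT/UPDATE record per id using QUALIFY ROW_NUMBER().
function buildDedupedMergeSql(target, changelog) {
  return [
    `MERGE INTO ${target} AS target`,
    `USING (`,
    `  SELECT src.id, src.brand, src.count`,
    `  FROM ${changelog} AS src`,
    `  WHERE src._metadata_spanner_mod_type IN ('INSERT', 'UPDATE')`,
    `  QUALIFY ROW_NUMBER() OVER (`,
    `    PARTITION BY src.id`,
    `    ORDER BY src._metadata_spanner_commit_timestamp DESC`,
    `  ) = 1`,
    `) AS source`,
    `ON target.id = source.id`,
    `WHEN MATCHED THEN`,
    `  UPDATE SET brand = source.brand, count = source.count`,
    `WHEN NOT MATCHED THEN`,
    `  INSERT (id, brand, count)`,
    `  VALUES (source.id, source.brand, source.count)`,
  ].join("\n");
}

// Print the statement; in Dataform the changelog name would come from ref().
console.log(buildDedupedMergeSql("dataform.car_table", "dataform.car_table_changelog"));
```

The mod type conditions are dropped from the WHEN clauses because, after deduplication, the latest record for an id may be an INSERT for a row that already exists in the target (for example on a re-run) or an UPDATE for a row that does not exist yet. Records with other mod types are ignored in this sketch, and if two changes for the same id can share a commit timestamp, an additional tiebreaker column would be needed in the ORDER BY.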