Hello everyone,
I am trying to implement a custom MERGE (incremental table) in Dataform. The source table has the primary key columns plus two metadata columns: LOAD_BATCH and DRIVER_ACTION_CODE.
My goal is an incremental SQLX action that, given a start date, first builds an intermediate view that selects, for each primary key, the rows from its most recent LOAD_BATCH. It should then apply the MERGE in two steps: first delete the rows where DRIVER_ACTION_CODE = 'D', then merge the rows where DRIVER_ACTION_CODE is 'I' or 'U'.
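To pin down the intended semantics, here is a minimal in-memory model in plain JavaScript. It is not Dataform code. It assumes rows are plain objects, that LOAD_BATCH values are ISO date strings (so they compare correctly as text), and the function names are hypothetical.

```javascript
// Hypothetical in-memory model of the intended load (not Dataform code).

// Build a stable string key from the primary-key columns of a row.
function pkKey(row, pkColumns) {
  return JSON.stringify(pkColumns.map((c) => row[c]));
}

// Step 1: among batches on or after startDate, keep only the rows from
// each key's most recent LOAD_BATCH (the "intermediate view").
function latestBatchRows(bronze, pkColumns, startDate) {
  const recent = bronze.filter((r) => r.LOAD_BATCH >= startDate);
  const maxBatch = new Map();
  for (const r of recent) {
    const k = pkKey(r, pkColumns);
    if (!maxBatch.has(k) || r.LOAD_BATCH > maxBatch.get(k)) {
      maxBatch.set(k, r.LOAD_BATCH);
    }
  }
  return recent.filter((r) => r.LOAD_BATCH === maxBatch.get(pkKey(r, pkColumns)));
}

// Steps 2 and 3: delete keys flagged 'D', then upsert rows flagged 'I' or 'U'.
function applyMerge(silver, changes, pkColumns) {
  const target = new Map(silver.map((r) => [pkKey(r, pkColumns), r]));
  for (const r of changes) {
    if (r.DRIVER_ACTION_CODE === "D") target.delete(pkKey(r, pkColumns));
  }
  for (const r of changes) {
    if (r.DRIVER_ACTION_CODE === "I" || r.DRIVER_ACTION_CODE === "U") {
      target.set(pkKey(r, pkColumns), r);
    }
  }
  return [...target.values()];
}

// Usage with made-up rows: key 1 is updated, key 2 is deleted.
const bronze = [
  { id: 1, name: "a", LOAD_BATCH: "2024-01-01", DRIVER_ACTION_CODE: "I" },
  { id: 1, name: "b", LOAD_BATCH: "2024-01-02", DRIVER_ACTION_CODE: "U" },
  { id: 2, name: "c", LOAD_BATCH: "2024-01-02", DRIVER_ACTION_CODE: "D" },
];
const silver = [{ id: 2, name: "old", LOAD_BATCH: "2023-12-31", DRIVER_ACTION_CODE: "I" }];
const result = applyMerge(silver, latestBatchRows(bronze, ["id"], "2024-01-01"), ["id"]);
console.log(result.map((r) => `${r.id}:${r.name}`)); // [ '1:b' ]
```

Deduplicating before merging matters: without it, an older 'U' row could re-insert a key that a newer batch marked 'D'.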
So far I have accomplished this with JavaScript and a type "operations" action, but I am looking for a more scalable approach that takes advantage of the features Dataform offers.
Does anyone have any recommendations or tips on how to best achieve this?
Here is my current code:
includes/merge_helper.js
```javascript
// Creates the silver (target) table on first use.
function generateCreateTableQuery(context) {
  return `
    CREATE TABLE IF NOT EXISTS ${context.tabla_silver} (
      ${context.columns
        .filter(column => column)
        .map(column => `${column} ${context.columnDataTypes[column]}`)
        .join(", ")}
    )
    -- PARTITION BY ${context.partition_column}
    CLUSTER BY ${context.clustering_columns.join(", ")}
  `;
}

// Deletes target rows whose source row is flagged 'D'.
function generateDeleteQuery(context) {
  return `
    MERGE ${context.tabla_silver} AS t
    USING (
      SELECT ${context.columns.join(", ")}
      FROM ${context.tabla_bronce} AS s
      WHERE 1=1
        AND load_batch >= '${context.startDate}'
        AND ${context.TRANSACTION_COL} IN ('D')
    ) s
    ON ${context.pk_columns.map(pk_column => `t.${pk_column} = s.${pk_column}`).join(" AND ")}
    WHEN MATCHED AND s.${context.TRANSACTION_COL} = 'D' THEN
      DELETE
  `;
}

// Updates existing rows and inserts new ones flagged 'I' or 'U'.
function generateUpsertQuery(context) {
  return `
    MERGE ${context.tabla_silver} AS t
    USING (
      SELECT ${context.columns.join(", ")}
      FROM ${context.tabla_bronce} AS s
      WHERE 1=1
        AND load_batch >= '${context.startDate}'
        AND ${context.TRANSACTION_COL} IN ('I', 'U')
    ) s
    ON ${context.pk_columns.map(pk_column => `t.${pk_column} = s.${pk_column}`).join(" AND ")}
    WHEN MATCHED THEN
      UPDATE SET ${context.columns.map(column => `t.${column} = s.${column}`).join(", ")}
    WHEN NOT MATCHED THEN
      INSERT (${context.columns.join(", ")})
      VALUES (${context.columns.map(column => `s.${column}`).join(", ")})
  `;
}

// Selects, for each key, the rows from its most recent LOAD_BATCH.
// Each key column gets its own IS NOT NULL check (the previous version
// produced "op.a AND op.b IS NOT NULL", which is not a null check).
function generateBaseQuery(context) {
  const pks = context.pk_columns.join(", ");
  return `
    WITH base AS (
      SELECT ${pks}, MAX(LOAD_BATCH) AS date_max
      FROM (
        SELECT ROW_NUMBER() OVER (PARTITION BY ${pks} ORDER BY LOAD_BATCH DESC) AS rn,
               ${pks}, LOAD_BATCH
        FROM ${context.tabla_bronce}
        WHERE load_batch >= '${context.startDate}'
      )
      WHERE rn = 1
      GROUP BY ${pks}
    ), query_fin AS (
      SELECT op.*
      FROM base rn
      LEFT JOIN (
        SELECT * FROM ${context.tabla_bronce}
        WHERE load_batch >= '${context.startDate}'
      ) op
      ON ${context.pk_columns.map(pk_column => `op.${pk_column} = rn.${pk_column}`).join(" AND ")}
        AND op.LOAD_BATCH = rn.date_max
      WHERE ${context.pk_columns.map(pk_column => `op.${pk_column} IS NOT NULL`).join(" AND ")}
    )
    SELECT * FROM query_fin
  `;
}

// Dataform only exposes what an includes file exports.
module.exports = {
  generateCreateTableQuery,
  generateDeleteQuery,
  generateUpsertQuery,
  generateBaseQuery,
};
```
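The base query could likely be written more compactly with BigQuery's QUALIFY clause. The hypothetical generateLatestBatchQuery helper below reuses the same context fields. It uses RANK() rather than ROW_NUMBER() so that, like the original join-back, it keeps every row tied on a key's latest LOAD_BATCH, and it filters out NULL keys just as the equality join does. The table name and columns in the usage example are made up.

```javascript
// Hypothetical alternative to generateBaseQuery using QUALIFY.
// RANK() = 1 keeps all rows that share a key's latest LOAD_BATCH;
// the IS NOT NULL filter mirrors the join, which never matches NULL keys.
function generateLatestBatchQuery(context) {
  const pks = context.pk_columns.join(", ");
  const notNull = context.pk_columns.map((pk) => `${pk} IS NOT NULL`).join(" AND ");
  return `
SELECT *
FROM ${context.tabla_bronce}
WHERE load_batch >= '${context.startDate}'
  AND ${notNull}
QUALIFY RANK() OVER (PARTITION BY ${pks} ORDER BY LOAD_BATCH DESC) = 1
`;
}

// Usage with a hypothetical context object.
const latestSql = generateLatestBatchQuery({
  tabla_bronce: "my_dataset.bronze_orders",
  pk_columns: ["order_id", "line_id"],
  startDate: "2024-01-01",
});
console.log(latestSql);
```

If the source can hold several rows per key in the same batch, the MERGE will still fail on duplicates; switching RANK() to ROW_NUMBER() with an extra tie-breaker in ORDER BY would keep exactly one row.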
Next, I create a view from the base query in a SQLX file with type "view". Then, in a SQLX file with type "operations", I run the other three queries: create the table if it does not exist, merge the 'D' rows (delete), and finally merge the 'I' and 'U' rows (upsert).
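Because the view already reduces each key to its latest LOAD_BATCH, the two MERGE passes could probably be collapsed into one MERGE with conditional WHEN clauses, which BigQuery evaluates in order. This is a sketch with a hypothetical generateSingleMergeQuery helper; context.source_view (the name of the deduplicated view) is an assumed field that is not in the original context. It also assumes the view returns at most one row per key, since BigQuery rejects a MERGE in which a target row matches several source rows.

```javascript
// Hypothetical helper: a single MERGE that deletes 'D' keys and upserts 'I'/'U' rows.
// context.source_view is an assumed field naming the deduplicated view.
function generateSingleMergeQuery(context) {
  const tx = context.TRANSACTION_COL;
  const cols = context.columns;
  const on = context.pk_columns.map((pk) => `t.${pk} = s.${pk}`).join(" AND ");
  return `
MERGE ${context.tabla_silver} AS t
USING ${context.source_view} AS s
ON ${on}
WHEN MATCHED AND s.${tx} = 'D' THEN
  DELETE
WHEN MATCHED AND s.${tx} IN ('I', 'U') THEN
  UPDATE SET ${cols.map((c) => `${c} = s.${c}`).join(", ")}
WHEN NOT MATCHED AND s.${tx} IN ('I', 'U') THEN
  INSERT (${cols.join(", ")})
  VALUES (${cols.map((c) => `s.${c}`).join(", ")})
`;
}

// Usage with a hypothetical context object.
const mergeSql = generateSingleMergeQuery({
  tabla_silver: "my_dataset.silver_orders",
  source_view: "my_dataset.bronze_orders_latest",
  pk_columns: ["order_id"],
  columns: ["order_id", "amount", "LOAD_BATCH", "DRIVER_ACTION_CODE"],
  TRANSACTION_COL: "DRIVER_ACTION_CODE",
});
console.log(mergeSql);
```

A key whose latest action is 'D' but which is not in the target matches no clause, so it is simply ignored.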
The idea is to perform exactly the same steps, but with an incremental SQLX action (or something similar).
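One possible direction, sketched as a Dataform JavaScript definitions file under several assumptions: the table, view, and column names in TABLES are hypothetical; the deduplicated view is assumed to already exist as its own action; Dataform's incremental type with uniqueKey is relied on to generate the 'I'/'U' upsert MERGE; and a pre-operation guarded by incremental() deletes keys whose latest action is 'D' before that merge runs. Looping over a config array is what would let one file cover many tables.

```javascript
// Hypothetical definitions file, e.g. definitions/silver_merge.js.
// All names below are illustrative assumptions, not the original project's.
const TABLES = [
  {
    name: "silver_orders",          // target incremental table
    source: "bronze_orders_latest", // dedup view: latest LOAD_BATCH per key
    pkColumns: ["order_id"],
    clusterBy: ["order_id"],
  },
];

// Pre-operation: on incremental runs only, delete keys flagged 'D'.
// On the first run the table does not exist yet, so nothing is emitted.
function deletePreOp(t) {
  return (ctx) =>
    ctx.when(
      ctx.incremental(),
      `DELETE FROM ${ctx.self()} AS t
WHERE EXISTS (
  SELECT 1 FROM ${ctx.ref(t.source)} AS s
  WHERE s.DRIVER_ACTION_CODE = 'D'
    AND ${t.pkColumns.map((pk) => `t.${pk} = s.${pk}`).join(" AND ")}
)`
    );
}

// Main query: only 'I'/'U' rows; Dataform merges them on uniqueKey.
function upsertQuery(t) {
  return (ctx) =>
    `SELECT * FROM ${ctx.ref(t.source)} WHERE DRIVER_ACTION_CODE IN ('I', 'U')`;
}

// `publish` is a global only inside Dataform; the guard lets plain Node load this file.
if (typeof publish === "function") {
  for (const t of TABLES) {
    publish(t.name, {
      type: "incremental",
      uniqueKey: t.pkColumns,
      bigquery: { clusterBy: t.clusterBy },
    })
      .preOps(deletePreOp(t))
      .query(upsertQuery(t));
  }
}
```

The SQLX equivalent would be a config block with type "incremental" and uniqueKey, plus a pre_operations block that wraps the DELETE in when(incremental(), ...). In both forms Dataform creates the table on the first run, which would replace generateCreateTableQuery.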
Looking forward to your responses.
Thank you in advance!