I have the following SQLX file, and its corresponding table already exists in my dataset:
config {
type: "incremental",
schema: "analytics_sanitized_test",
description: "...",
columns: {
...
},
bigquery: {
partitionBy: "DATE_TRUNC(created_at, MONTH)",
requirePartitionFilter: true,
clusterBy: ["created_at", "user_id"]
},
tags: ["daily"],
protected: true
}
-- Builds analytics_sanitized_test.user_banned from analytics.user_banned.
-- market and ban_reason are lower-cased and trimmed; userId and createdAt are
-- renamed to user_id and created_at.
-- Each when(incremental(), ...) below has two renderings. Dataform emits both
-- into the compiled script: the incremental one inside an INSERT INTO, and the
-- non-incremental one inside a CREATE TABLE IF NOT EXISTS. The branch that
-- runs is chosen when the script executes, based on whether the target exists.
SELECT
src.userId AS user_id,
TRIM(LOWER(src.market)) AS market,
TRIM(LOWER(src.banReason)) AS ban_reason,
src.createdAt AS created_at
FROM
${ref("analytics", "user_banned")} AS src
-- Incremental runs only: anti-join against rows already in this table whose
-- created_at date is within the last 3 days. This makes a same-day rerun
-- skip users that the previous run already inserted.
-- NOTE(review): requirePartitionFilter is true. Confirm BigQuery accepts the
-- DATE(trg.created_at) predicate in this ON clause as the partition filter
-- for the self() scan.
${
when(incremental(),
`LEFT JOIN ${self()} AS trg ON
(trg.user_id = src.userId
AND DATE(trg.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY))`)
}
-- Incremental: keep only unmatched source rows created yesterday. If a daily
-- run is skipped, that day is never selected by later incremental runs.
-- Full build: all source rows except those created today.
WHERE
${
when(incremental(),
` trg.user_id IS NULL
AND DATE(src.createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)`,
` DATE(src.createdAt) != CURRENT_DATE() `)
}
-- Keep the earliest selected source row per userId.
-- NOTE(review): the full build therefore keeps one row per user over all
-- history. An incremental run only deduplicates within yesterday and against
-- the last 3 days of the target, so a user already stored with an older
-- created_at can be inserted again. Confirm which behaviour is intended.
QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1
Whether I run it manually from the UI or schedule it through Workflow Configs, the logs always comment out the incremental version of the query.
See below:
-- Script generated by Dataform for the incremental action user_banned.
-- It contains BOTH renderings of the SQLX query:
--   * the incremental one, inside the INSERT INTO;
--   * the non-incremental one, inside the CREATE TABLE IF NOT EXISTS.
-- BigQuery chooses one at run time via the IF on dataform_table_type,
-- not at compile time.
--
-- Step 1: create the target dataset if needed.
-- These errors are ignored: dataset already exists, too many dataset metadata
-- update operations, missing bigquery.datasets.create permission.
-- Any other error is re-raised.
BEGIN
CREATE SCHEMA IF NOT EXISTS `xxx.analytics_sanitized_test` OPTIONS(location="us-central1");
EXCEPTION WHEN ERROR THEN
IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
THEN
RAISE USING MESSAGE = @@error.message;
END IF;
END;
-- Step 2: choose between the incremental path and the initial build.
BEGIN
-- table_type of any existing object named user_banned in the target dataset.
-- NULL when no such object exists.
DECLARE dataform_table_type DEFAULT (
SELECT ANY_VALUE(table_type)
FROM `xxx.analytics_sanitized_test.INFORMATION_SCHEMA.TABLES`
WHERE table_name = 'user_banned'
);
-- An existing VIEW or MATERIALIZED VIEW with the target name is dropped.
-- NOTE(review): dataform_table_type is not re-read after the drop, so in that
-- case the IF below would still take the INSERT branch.
IF dataform_table_type IS NOT NULL THEN
IF dataform_table_type = 'VIEW' THEN DROP VIEW IF EXISTS `xxx.analytics_sanitized_test.user_banned`;
ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN DROP MATERIALIZED VIEW IF EXISTS `xxx.analytics_sanitized_test.user_banned`;
END IF;
END IF;
-- Target already exists: INCREMENTAL path.
-- The incremental query (LEFT JOIN on the target, yesterday-only filter) is
-- embedded in the INSERT INTO below. It is not commented out.
IF dataform_table_type IS NOT NULL THEN
BEGIN
DECLARE dataform_columns ARRAY<STRING>;
DECLARE dataform_columns_list STRING;
-- Backtick-quoted names of the columns currently in the target table, joined
-- into one string. That string is used for both the INSERT column list and
-- the outer SELECT list, so values are matched to columns by name.
SET dataform_columns = (
SELECT
ARRAY_AGG(DISTINCT "`" || column_name || "`")
FROM `xxx.analytics_sanitized_test.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'user_banned'
);
SET dataform_columns_list = (
SELECT
STRING_AGG(column)
FROM UNNEST(dataform_columns) AS column);
-- The INSERT INTO ... SELECT runs inside a temporary procedure
-- (strict_mode=false) that is created, called once and then dropped.
-- The CREATE OR REPLACE in this statement applies to that PROCEDURE,
-- not to the user_banned table.
EXECUTE IMMEDIATE
"""
CREATE OR REPLACE PROCEDURE `xxx.analytics_sanitized_test.df_74f33f61c341912e584a78ac9ef2255f9b3477ea4e719e4aca2cd6be2a44a00c`() OPTIONS(strict_mode=false)
BEGIN
INSERT INTO `xxx.analytics_sanitized_test.user_banned`
(""" || dataform_columns_list || """)
SELECT """ || dataform_columns_list || """
FROM (
SELECT
src.userId AS user_id,
TRIM(LOWER(src.market)) AS market,
TRIM(LOWER(src.banReason)) AS ban_reason,
src.createdAt AS created_at
FROM
`xxx.analytics.user_banned` AS src
LEFT JOIN `xxx.analytics_sanitized_test.user_banned` AS trg ON
(trg.user_id = src.userId
AND DATE(trg.created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY))
WHERE
trg.user_id IS NULL
AND DATE(src.createdAt) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1
);
END;
""";
CALL `xxx.analytics_sanitized_test.df_74f33f61c341912e584a78ac9ef2255f9b3477ea4e719e4aca2cd6be2a44a00c`();
DROP PROCEDURE IF EXISTS `xxx.analytics_sanitized_test.df_74f33f61c341912e584a78ac9ef2255f9b3477ea4e719e4aca2cd6be2a44a00c`;
END;
ELSE
-- Target does not exist yet: INITIAL BUILD with the non-incremental query.
-- This is CREATE TABLE IF NOT EXISTS, not CREATE OR REPLACE TABLE. It applies
-- the partitioning, clustering and require_partition_filter settings shown in
-- the config block.
BEGIN
CREATE TABLE IF NOT EXISTS `xxx.analytics_sanitized_test.user_banned`
PARTITION BY DATE_TRUNC(created_at, MONTH)
CLUSTER BY created_at, user_id
OPTIONS(description='''Track user ban events''', require_partition_filter=true)
AS (
SELECT
src.userId AS user_id,
TRIM(LOWER(src.market)) AS market,
TRIM(LOWER(src.banReason)) AS ban_reason,
src.createdAt AS created_at
FROM
`xxx.analytics.user_banned` AS src
WHERE
DATE(src.createdAt) != CURRENT_DATE()
QUALIFY ROW_NUMBER() OVER (PARTITION BY src.userId ORDER BY src.createdAt) = 1
);
END;
END IF;
END;
The desired outcome is that, once the incremental table exists in the target BigQuery dataset, Dataform renders/compiles the incremental version of the query instead of the non-incremental one — or at least uses the INSERT INTO statement instead of CREATE OR REPLACE TABLE.
Further information:
- I’m on Dataform 2.9.0, and this happens with all the tables in my project.
- I have read the documentation on incremental tables (here).
- I have browsed the Google Cloud Community forum (here).
- I created a copy of the source table without a stream attached.
- I changed the `when(incremental(), ...)` definition to keep it at the bottom.
- I ran it with and without Full Refresh, from both the UI and Workflow Configs.
- I removed `protected: true`.
- I reverted from version 2.9.0 to 2.8.4.
- I kept regenerating the same incremental table over and over again from the Workspace.
Thanks.