To implement dynamic column selection in Dataform through Cloud Composer, the approach begins by storing column configurations in a BigQuery table that both Cloud Composer and Dataform can access. This table, keyed by the day of the week, defines which columns from Tables A and B should be selected on each day.
- Parameter Storage in BigQuery: The column configurations are stored in a BigQuery table as follows:
CREATE TABLE IF NOT EXISTS your_project.your_dataset.column_config (
day_of_week STRING,
table_a_columns ARRAY<STRING>,
table_b_columns ARRAY<STRING>
);
INSERT INTO your_project.your_dataset.column_config
VALUES
('Monday', ['col1', 'col2'], ['col7', 'col8']),
('Tuesday', ['col3', 'col4'], ['col9', 'col10']),
('Wednesday', ['col5', 'col6'], ['col11', 'col12']);
-- ... rest of the days
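With this table in place, retrieving a day's configuration is a single lookup; this is exactly the query the DAG below runs. For example, given the rows seeded above, the Monday lookup returns (['col1', 'col2'], ['col7', 'col8']):

-- Example lookup for Monday's configuration
SELECT table_a_columns, table_b_columns
FROM your_project.your_dataset.column_config
WHERE day_of_week = 'Monday';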
- Airflow DAG (Python): The Airflow DAG runs daily, determines the day of the week for the run, and queries the BigQuery table to retrieve the relevant columns. Because Dataform compilation variables must be strings, the column arrays are joined into comma-separated strings before being passed to Dataform. The following Python code demonstrates how this is done:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.dataform import DataformCreateCompilationResultOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1
}

dag = DAG('dataform_dynamic_columns', default_args=default_args, schedule_interval='@daily')

def get_column_config(**kwargs):
    # Use the DAG run's logical date (rather than wall-clock time) so backfills
    # pick up the configuration for the correct day
    today = kwargs['logical_date'].strftime('%A')  # Day of the week (e.g., 'Monday')
    hook = BigQueryHook(use_legacy_sql=False)  # The config table path requires standard SQL
    query = f"""
        SELECT table_a_columns, table_b_columns
        FROM your_project.your_dataset.column_config
        WHERE day_of_week = '{today}'
    """
    result = hook.get_pandas_df(query)
    # Dataform compilation vars must be strings, so join each column array
    # into a comma-separated string before pushing it to XCom
    kwargs['ti'].xcom_push(key='table_a_columns', value=', '.join(result['table_a_columns'].iloc[0]))
    kwargs['ti'].xcom_push(key='table_b_columns', value=', '.join(result['table_b_columns'].iloc[0]))

get_config_task = PythonOperator(
    task_id='get_column_config',
    python_callable=get_column_config,
    dag=dag
)

create_compilation_task = DataformCreateCompilationResultOperator(
    task_id='create_dataform_compilation',
    project_id='your_project',
    region='your_region',
    repository_id='your_dataform_repo_id',
    compilation_result={
        "git_commitish": "main",  # Or your branch name
        "code_compilation_config": {
            "vars": {
                "table_a_columns": "{{ ti.xcom_pull(task_ids='get_column_config', key='table_a_columns') }}",
                "table_b_columns": "{{ ti.xcom_pull(task_ids='get_column_config', key='table_b_columns') }}"
            }
        }
    },
    dag=dag
)

get_config_task >> create_compilation_task
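Creating a compilation result only compiles the project; it does not execute anything. To run the compiled graph, the DAG would typically chain a workflow invocation. A minimal sketch, assuming DataformCreateWorkflowInvocationOperator from the same Google provider, and that the compilation task's XCom output (the created CompilationResult as a dict) exposes the result's resource name under 'name':

from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkflowInvocationOperator

create_invocation_task = DataformCreateWorkflowInvocationOperator(
    task_id='create_dataform_invocation',
    project_id='your_project',
    region='your_region',
    repository_id='your_dataform_repo_id',
    workflow_invocation={
        # Reference the compilation result created by the upstream task;
        # its 'name' field is the fully qualified resource name the invocation needs
        "compilation_result": "{{ ti.xcom_pull(task_ids='create_dataform_compilation')['name'] }}"
    },
    dag=dag
)

create_compilation_task >> create_invocation_task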
- Dataform SQLX Query: The Dataform SQLX query injects the columns passed from Airflow. Note that Dataform resolves compilation variables with JavaScript-style ${...} interpolation through dataform.projectConfig.vars; it does not support Jinja filters such as join, which is why the DAG passes the columns as ready-made comma-separated strings:
config {
  type: "table",
  schema: "your_dataset"
}
-- The output table name comes from the file name (e.g., table_c.sqlx)
select
  ${dataform.projectConfig.vars.table_a_columns} -- Injected as a comma-separated string
from
  your_project.your_dataset.table_a
union all
select
  ${dataform.projectConfig.vars.table_b_columns} -- Injected as a comma-separated string
from
  your_project.your_dataset.table_b
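So that the project still compiles when Airflow does not supply the variables (for example, during local development or CI checks), it helps to declare defaults for them in the project's dataform.json. A sketch, using hypothetical defaults that mirror the Monday configuration from the config table above:

{
  "warehouse": "bigquery",
  "defaultDatabase": "your_project",
  "defaultSchema": "your_dataset",
  "vars": {
    "table_a_columns": "col1, col2",
    "table_b_columns": "col7, col8"
  }
}

Values passed through the compilation result's vars override these defaults at compile time, so the daily Airflow run always takes precedence.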