Passing Parameters to Dataform

Hi @ms4446

I have the following use case and shall be grateful if you can help.

a) I have an Airflow DAG which I am running through Composer
b) I have 2 static tables in BigQuery (let's call them Table A and Table B)
c) I am running the Airflow DAG daily
d) On each day, I will have a query in Dataform which will extract some columns from Table A and Table B and produce a new table called Table C
e) Table C is ultimately sent to GCS
f) On each day (Monday/Tuesday/Wednesday/Thursday/Friday/Saturday/Sunday), I have to extract different columns from Table A and Table B
g) So,

Monday columns are: col1, col2
Tuesday columns are: col3, col4
Wednesday columns are: col5, col6

And so on..

My question is:

  1. How do I pass these columns to Dataform? I can write a Python function and use the PythonOperator; I know the logic.
  2. But how do I call Dataform with the column names passed as parameters?
  3. How do I write the Dataform query using the parameters?

Can you please help/provide some sample syntax to do this?

To implement dynamic column selection in Dataform through Cloud Composer, the approach begins by storing the column configurations in a BigQuery table that both Cloud Composer and Dataform can access. This table, keyed by day of the week, defines which columns should be selected from Tables A and B on each day.

  1. Parameter Storage in BigQuery

The column configurations are stored in a BigQuery table as follows:
CREATE TABLE IF NOT EXISTS your_project.your_dataset.column_config (
  day_of_week STRING,
  table_a_columns ARRAY<STRING>,
  table_b_columns ARRAY<STRING>
);

INSERT INTO your_project.your_dataset.column_config
VALUES
  ('Monday', ['col1', 'col2'], ['col7', 'col8']),
  ('Tuesday', ['col3', 'col4'], ['col9', 'col10']),
  ('Wednesday', ['col5', 'col6'], ['col11', 'col12']);
  -- ... rest of the days
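As a quick sanity check, today's configuration can be looked up directly in BigQuery. FORMAT_DATE('%A', CURRENT_DATE()) returns the full weekday name, mirroring the strftime('%A') call used in the DAG below:

SELECT table_a_columns, table_b_columns
FROM your_project.your_dataset.column_config
WHERE day_of_week = FORMAT_DATE('%A', CURRENT_DATE());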
  2. Airflow DAG (Python)

The Airflow DAG runs daily, determines the current day of the week, and queries the BigQuery table to retrieve the relevant columns. The following Python code demonstrates how this is done:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.dataform import DataformCreateCompilationResultOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1
}

dag = DAG('dataform_dynamic_columns', default_args=default_args, schedule_interval='@daily', catchup=False)

def get_column_config(**kwargs):
    today = datetime.today().strftime('%A')  # Day of the week, e.g. 'Monday'
    hook = BigQueryHook(use_legacy_sql=False)  # The config query uses standard SQL
    query = f"""
        SELECT table_a_columns, table_b_columns
        FROM your_project.your_dataset.column_config
        WHERE day_of_week = '{today}'
    """
    result = hook.get_pandas_df(query)
    # Dataform compilation vars must be strings, so join each array
    # into a comma-separated list before pushing it to XCom.
    kwargs['ti'].xcom_push(key='table_a_columns', value=', '.join(result['table_a_columns'][0]))
    kwargs['ti'].xcom_push(key='table_b_columns', value=', '.join(result['table_b_columns'][0]))

get_config_task = PythonOperator(
    task_id='get_column_config',
    python_callable=get_column_config,
    dag=dag
)

create_compilation_task = DataformCreateCompilationResultOperator(
    task_id='create_dataform_compilation',
    project_id='your_project',
    region='your_region',  # Dataform repository region, e.g. 'us-central1'
    repository_id='your_dataform_repo_id',
    compilation_result={
        "git_commitish": "main",  # Or your branch name
        # Compilation variables live under code_compilation_config.vars
        "code_compilation_config": {
            "vars": {
                "table_a_columns": "{{ ti.xcom_pull(task_ids='get_column_config', key='table_a_columns') }}",
                "table_b_columns": "{{ ti.xcom_pull(task_ids='get_column_config', key='table_b_columns') }}"
            }
        }
    },
    dag=dag
)

get_config_task >> create_compilation_task
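Note that DataformCreateCompilationResultOperator only compiles the project; to actually run the compiled query you also need a workflow invocation. A minimal sketch extending the DAG above (same placeholder project, region, and repository IDs; the compilation result's resource name is pulled from the previous task's XCom):

from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkflowInvocationOperator

create_invocation_task = DataformCreateWorkflowInvocationOperator(
    task_id='create_dataform_workflow_invocation',
    project_id='your_project',
    region='your_region',
    repository_id='your_dataform_repo_id',
    workflow_invocation={
        # The compilation task's return value is a dict whose 'name' field
        # holds the full resource name of the compilation result.
        "compilation_result": "{{ task_instance.xcom_pull('create_dataform_compilation')['name'] }}"
    },
    dag=dag
)

create_compilation_task >> create_invocation_task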

  3. Dataform SQLX Query

The Dataform SQLX query dynamically constructs the SELECT statements using the columns passed from Airflow, injected via JavaScript interpolation:
config {
  type: "table",
  schema: "your_dataset",
  name: "table_c"
}

select
  ${dataform.projectConfig.vars.table_a_columns}  -- Columns injected as a comma-separated string
from
  your_project.your_dataset.table_a

union all

select
  ${dataform.projectConfig.vars.table_b_columns}  -- Columns injected as a comma-separated string
from
  your_project.your_dataset.table_b
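One caveat: dataform.projectConfig.vars only resolves for variables that are also declared with default values in the repository's dataform.json (or workflow_settings.yaml in newer Dataform versions); the values passed from Airflow override these defaults at compilation time. A minimal dataform.json sketch, with placeholder defaults:

{
  "warehouse": "bigquery",
  "defaultDatabase": "your_project",
  "defaultSchema": "your_dataset",
  "vars": {
    "table_a_columns": "col1, col2",
    "table_b_columns": "col7, col8"
  }
}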

@ayushmaheshwari also, feel free to utilize the following links for your future reference: