Composer - Trigger a Dataform repository with GCP Cloud Composer

Hi All,

I am performing a PoC to trigger Dataform via GCP Composer. I am using Dataform to perform ETL for multiple clients, depending on the client name I pass as an argument. My requirement is to be able to create a DAG which will trigger Dataform for a client by sending the client_name.

For example
Below is what my workflow_settings.yaml looks like in a Dataform repo. It performs the ETL operations for the client that is sent via the code variable; in the example below that would be client A.

defaultProject: project_id
defaultLocation: europe-west1
defaultDataset: dataform_tm_poc
defaultAssertionDataset: dataform_assertions
dataformCoreVersion: 3.0.0-beta.4
vars:
  code: client A

Can anybody please help me with setting up the Composer side so that I can trigger Dataform for a particular client by sending the client code as an argument?



Hi @TA3



Welcome to Google Cloud Community!



To trigger a Dataform workflow from Google Cloud Composer, you can follow these steps:

1. Set up your Cloud Composer environment: create or configure the environment and define the necessary parameters (project, region, service account).
2. Locate the DAGs bucket: each Cloud Composer environment comes with an associated Cloud Storage bucket, and DAG files placed in its dags/ folder are picked up automatically.
3. Create your workflow structure: Cloud Composer is based on Apache Airflow, so you define your workflow as a Python DAG.


Here’s a basic Python example of what your DAG might look like. It uses the Dataform operators from the Google Airflow provider: DataformCreateCompilationResultOperator compiles the repository with the client’s code variable, and DataformCreateWorkflowInvocationOperator runs the resulting compilation:



```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # DummyOperator is deprecated in Airflow 2
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

PROJECT_ID = "your_project_id"              # replace with your project
REGION = "europe-west1"                     # replace with your location
REPOSITORY_ID = "your_dataform_repository"  # replace with your Dataform repository name
GIT_COMMITISH = "main"                      # branch/tag/commit to compile

# Declare DAG
with DAG(
    # adjust to your needs
    dag_id="dataform_client_dag",
    default_args={"owner": "airflow"},
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # manual
    catchup=False,
    tags=["dataform", "client"],
) as dag:

    # Create the tasks that run Dataform for a specific client
    def run_dataform_for_client(client_name):
        task_suffix = client_name.replace(" ", "_")

        # Compile the repository, overriding the `code` variable with the client name
        compile_task = DataformCreateCompilationResultOperator(
            task_id=f"compile_dataform_{task_suffix}",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": {"vars": {"code": client_name}},
            },
        )

        # Run the workflow produced by that compilation
        run_task = DataformCreateWorkflowInvocationOperator(
            task_id=f"run_dataform_{task_suffix}",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": (
                    "{{ task_instance.xcom_pull('compile_dataform_"
                    + task_suffix
                    + "')['name'] }}"
                )
            },
        )

        compile_task >> run_task
        return compile_task, run_task

    # Example usage:
    client_a_compile, client_a_run = run_dataform_for_client("client A")
    client_b_compile, client_b_run = run_dataform_for_client("client B")

    # Define a start and end task
    start_task = EmptyOperator(task_id="start")
    end_task = EmptyOperator(task_id="end")

    # Define task dependencies
    start_task >> client_a_compile
    client_a_run >> end_task
    start_task >> client_b_compile
    client_b_run >> end_task
```
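
If you would rather have a single DAG that receives the client code as an argument when you trigger it (for example a run configuration of {"client_code": "client B"}), one option is a PythonOperator that calls the Dataform client library directly and reads the code from dag_run.conf. This is only a minimal sketch under a few assumptions: it lives inside the same with DAG(...) block and reuses the PROJECT_ID/REGION/REPOSITORY_ID/GIT_COMMITISH constants from above, the conf key client_code is just an illustrative name, and the google-cloud-dataform package is available (it is pulled in as a dependency of the Google provider).

```python
from airflow.operators.python import PythonOperator
from google.cloud import dataform_v1beta1


def _compile_and_run_for_client(**context):
    # Client code sent at trigger time, e.g. a run configuration of
    # {"client_code": "client B"}; falls back to "client A" when absent.
    client_code = (context["dag_run"].conf or {}).get("client_code", "client A")

    dataform_client = dataform_v1beta1.DataformClient()
    repo_path = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}"

    # Compile the repository, overriding the `code` variable for this run.
    compilation = dataform_client.create_compilation_result(
        parent=repo_path,
        compilation_result=dataform_v1beta1.CompilationResult(
            git_commitish=GIT_COMMITISH,
            code_compilation_config={"vars": {"code": client_code}},
        ),
    )

    # Invoke the workflow for that compilation result.
    dataform_client.create_workflow_invocation(
        parent=repo_path,
        workflow_invocation=dataform_v1beta1.WorkflowInvocation(
            compilation_result=compilation.name
        ),
    )


# Inside the same `with DAG(...)` block as the example above:
run_for_requested_client = PythonOperator(
    task_id="run_dataform_for_requested_client",
    python_callable=_compile_and_run_for_client,
)
```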




4. Upload the DAG file to your Composer environment. Place it in the dags/ folder of the environment's Cloud Storage bucket so Airflow picks it up.



Note:

- Trigger your DAG: you can trigger it manually (optionally passing the client code in the run configuration, as sketched after this list) or schedule it to run on a regular basis.
- Ensure that the service account used by your Composer environment has the necessary permissions in your GCP project to run Dataform (for example, the Dataform Editor role) and to interact with the other required resources.
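
If you want to trigger the DAG from outside Airflow while sending the client code along, one option is the stable Airflow REST API exposed by the Composer web server. This is a rough sketch for Composer 2, where WEB_SERVER_URL is a placeholder for your environment's Airflow web server URL (shown on the environment details page) and client_code is the same illustrative conf key as above; the caller needs IAM access to the environment (for example, the Composer User role).

```python
import google.auth
from google.auth.transport.requests import AuthorizedSession

# Placeholders: Airflow web server URL of your Composer environment and the DAG id.
WEB_SERVER_URL = "https://your-composer-airflow-web-server-url"
DAG_ID = "dataform_client_dag"

# Application Default Credentials are used to call the Composer-hosted Airflow REST API.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

# Create a DAG run, passing the client code in the run configuration.
response = session.post(
    f"{WEB_SERVER_URL}/api/v1/dags/{DAG_ID}/dagRuns",
    json={"conf": {"client_code": "client B"}},
    timeout=90,
)
response.raise_for_status()
print(response.json())
```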


I hope the above information is helpful.
