To trigger a Dataform workflow from Google Cloud Composer, you use Composer’s managed Apache Airflow environment to execute a Dataform job programmatically. This typically involves using Airflow’s HTTP operator to call the Dataform API, which in turn triggers the workflow.
Here’s a step-by-step guide on how to set this up:
1. Set Up Google Cloud Composer
Ensure that your Google Cloud Composer environment is up and running. You should have Apache Airflow installed as part of this environment.
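One quick way to confirm the environment is healthy is via the gcloud CLI; the environment name and region below are placeholders:

```shell
# Check the Composer environment's state (should print RUNNING).
gcloud composer environments describe example-composer-env \
    --location us-central1 \
    --format "value(state)"
```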
2. Obtain Dataform API Credentials
To trigger a Dataform workflow, you need to authenticate with the Dataform API. For the Google Cloud Dataform API this means an OAuth 2.0 access token, typically derived from a service account (such as the one attached to your Composer environment) rather than a static API key. Refer to Dataform’s documentation for the details of setting these credentials up.
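As a sketch, assuming the google-auth library (preinstalled in Composer) and Application Default Credentials are available, an access token can be fetched like this; `get_dataform_token` and `auth_header` are illustrative names, not part of any API:

```python
def get_dataform_token():
    """Fetch an OAuth 2.0 access token via Application Default Credentials."""
    # Imported inside the function so the rest of the module works
    # even where google-auth is not installed.
    import google.auth
    from google.auth.transport.requests import Request

    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(Request())  # populates credentials.token
    return credentials.token


def auth_header(token):
    """Build the Authorization header expected by Google Cloud APIs."""
    return {"Authorization": f"Bearer {token}"}
```

In a DAG you would call `get_dataform_token()` at task run time (not at DAG parse time) and pass the resulting header to the HTTP call.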
3. Create an Airflow DAG
You will create a Directed Acyclic Graph (DAG) in Airflow to define the workflow. This DAG will include a task to trigger the Dataform job.
Here’s a basic example of what the DAG might look like in Python:
import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

# Default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'trigger_dataform_workflow',
    default_args=default_args,
    description='Trigger a Dataform workflow',
    schedule_interval=timedelta(days=1),
)

# Task to trigger the Dataform workflow
trigger_dataform = SimpleHttpOperator(
    task_id='trigger_dataform',
    http_conn_id='dataform_api_connection',  # Replace with your connection ID
    endpoint='your/dataform/api/endpoint',   # Replace with your Dataform API endpoint
    method='POST',
    headers={
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_ACCESS_TOKEN",  # Replace with a valid token
    },
    data=json.dumps({"tag": "your_workflow_tag"}),  # Replace with your workflow tag
    dag=dag,
)
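Note that in the Google Cloud Dataform REST API (v1beta1), triggering a run is a two-step call: first create a compilation result, then create a workflow invocation that references it. If your Composer image includes the Google provider package, the dedicated operators DataformCreateCompilationResultOperator and DataformCreateWorkflowInvocationOperator can replace the raw HTTP call. As a minimal sketch of the endpoints involved (the helper names and placeholder values here are illustrative):

```python
DATAFORM_BASE = "https://dataform.googleapis.com/v1beta1"


def repo_path(project, location, repository):
    """Resource path of a Dataform repository."""
    return f"projects/{project}/locations/{location}/repositories/{repository}"


def compilation_url(project, location, repository):
    """Endpoint for step 1: create a compilation result."""
    return f"{DATAFORM_BASE}/{repo_path(project, location, repository)}/compilationResults"


def invocation_url(project, location, repository):
    """Endpoint for step 2: create a workflow invocation."""
    return f"{DATAFORM_BASE}/{repo_path(project, location, repository)}/workflowInvocations"
```

The POST body for step 1 typically names a Git branch or commit (e.g. `{"gitCommitish": "main"}`), and the step 2 body references the compilation result name returned by step 1.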
4. Set Up Airflow Connections
In the Airflow UI, set up a new HTTP connection (dataform_api_connection in the example) with the details of your Dataform API endpoint. This includes the API URL and any authentication headers required.
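Alternatively, the connection can be created from the Airflow CLI (in Composer, run it via `gcloud composer environments run`); the host below assumes you are calling the Google Dataform API directly, and the connection ID must match the `http_conn_id` used in the DAG:

```shell
# Create the HTTP connection referenced by the DAG.
airflow connections add dataform_api_connection \
    --conn-type http \
    --conn-host "https://dataform.googleapis.com"
```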
5. Deploy and Test the DAG
Deploy this DAG to your Airflow environment and test it to ensure it triggers the Dataform workflow as expected.
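Assuming an environment named example-composer-env in us-central1 (placeholders), deploying and test-triggering the DAG with gcloud looks roughly like this:

```shell
# Upload the DAG file to the environment's dags/ folder in GCS.
gcloud composer environments storage dags import \
    --environment example-composer-env \
    --location us-central1 \
    --source trigger_dataform_workflow.py

# Trigger a manual run to verify the Dataform workflow starts.
gcloud composer environments run example-composer-env \
    --location us-central1 \
    dags trigger -- trigger_dataform_workflow
```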