Business scenario
Healthcare Data Engine (HDE) is a popular GCP-based solution built on Cloud Healthcare APIs (CHC API) to help healthcare stakeholders transition to FHIR and promote interoperability between multiple data origination sources. HDE lets users run Mapping pipelines, which convert healthcare data into FHIR, and Reconciliation pipelines, which form a longitudinal patient record view.
Both the Mapping and Reconciliation pipelines use Mapping configs or reconciliation rules stored in GCS buckets. Every time these configs change, HDE users are required to re-create the pipelines for the new changes to take effect.
This article shows how to use Cloud Run Functions to automatically re-create the HDE pipelines whenever the configs stored in GCS buckets change.
What do we need
We will set up a Cloud Run Function that is triggered whenever a file is uploaded to the GCS bucket where the configs are stored. The function will delete the existing HDE pipeline and create a new one using the latest changes by making Cloud Healthcare API calls. In this article we use changes to Reconciliation rules as an example and re-create the Reconciliation pipeline, but the same approach works for Mapping changes (Whistle and Data Mapper) as well: delete the existing Mapping pipeline and create a new one using the latest mapping configs. The Cloud Run Function will be written in Python.
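To make the two API calls concrete, here is a minimal sketch of how the DELETE and POST URLs used by main.py in Step 3 are put together. The helper name `pipeline_endpoints` and the sample values (`my-project`, `us-central1`, `my-dataset`, `recon-pipeline`) are placeholders for illustration, not values from a real environment.

```python
BASE_URL = "https://healthcare.googleapis.com/v1alpha2"


def pipeline_endpoints(project_id, location, dataset, pipeline_name):
    """Return (delete_url, create_url) for one pipeline job.

    Hypothetical helper: it only mirrors the f-strings used in main.py.
    """
    parent = f"{BASE_URL}/projects/{project_id}/locations/{location}/datasets/{dataset}"
    delete_url = f"{parent}/pipelineJobs/{pipeline_name}"
    create_url = f"{parent}/pipelineJobs?pipelineJobId={pipeline_name}"
    return delete_url, create_url


# Placeholder values, for illustration only
delete_url, create_url = pipeline_endpoints(
    "my-project", "us-central1", "my-dataset", "recon-pipeline"
)
print("DELETE", delete_url)
print("POST  ", create_url)
```

Both URLs share the same dataset path; only the last segment differs, so a typo in any of the four config values breaks both calls in the same way.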
Assumptions
This article assumes that the user has CHC APIs enabled, HDE pipelines running and GCS buckets to store configs already created. Please refer to HDE docs for steps on enabling/creating any of these if not already created.
Steps
Step 1 - Enable Cloud Run Functions (if not already enabled) and click on ‘CREATE FUNCTION’
Step 2 - Use the snapshot below as reference and set up the ‘Cloud Run Function’ configuration so that it is triggered when a file is uploaded to the GCS bucket. The ‘Bucket’ field should point to the GCS bucket where the configs are stored; these configs are used when creating the pipeline.
Step 3 - Use the folder/file structure and code snippets below to create a Python-based Cloud Run Function
Cloud Run Function:
|_ requirements.txt
|_ config.py
|_ main.py
requirements.txt
functions-framework==3.*
requests
google-auth-oauthlib
google-auth
config.py
PROJECT_ID = ""
LOCATION = ""
DATASET_NAME = ""
PIPELINE_NAME = ""
URI = ""
IMPORT_URI_PREFIX = ""
MATCHING_URI_PREFIX = ""
FHIR_STORE_NAME = ""
TARGET_SUBFOLDER = ''
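Every value in config.py starts out empty, so a forgotten setting would only surface when an API call fails. The following is a minimal sketch of a fail-fast check, assuming a hypothetical helper `missing_settings` that is not part of the original function. The sample dictionary values are placeholders.

```python
def missing_settings(settings):
    """Return the sorted names of settings that are empty or whitespace-only."""
    return sorted(
        name for name, value in settings.items() if not str(value).strip()
    )


# Placeholder values, for illustration only
example = {"PROJECT_ID": "my-project", "LOCATION": "", "DATASET_NAME": "  "}
print(missing_settings(example))  # ['DATASET_NAME', 'LOCATION']
```

An empty TARGET_SUBFOLDER may be intentional (every upload to the bucket then matches), so it may make sense to leave that one out of such a check.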
main.py
from config import (
    PROJECT_ID, LOCATION, DATASET_NAME, PIPELINE_NAME, URI,
    IMPORT_URI_PREFIX, MATCHING_URI_PREFIX, FHIR_STORE_NAME, TARGET_SUBFOLDER,
)
import functions_framework
import json
import requests
import google.auth
import google.auth.transport.requests
from google.auth.exceptions import DefaultCredentialsError
from requests.exceptions import RequestException


# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def recreate_hde_pipelines(cloud_event):
    data = cloud_event.data
    file_path = data['name']

    # Check if the file was uploaded to the exact sub folder where mapping/recon rules are stored
    if file_path.startswith(TARGET_SUBFOLDER):
        DELETE_API_ENDPOINT = f"https://healthcare.googleapis.com/v1alpha2/projects/{PROJECT_ID}/locations/{LOCATION}/datasets/{DATASET_NAME}/pipelineJobs/{PIPELINE_NAME}"
        CREATE_API_ENDPOINT = f"https://healthcare.googleapis.com/v1alpha2/projects/{PROJECT_ID}/locations/{LOCATION}/datasets/{DATASET_NAME}/pipelineJobs?pipelineJobId={PIPELINE_NAME}"

        recon_config = {
            "name": PIPELINE_NAME,
            "disableLineage": "true",
            "reconciliationPipelineJob": {
                "mergeConfig": {
                    "whistleConfigSource": {
                        "uri": URI,
                        "importUriPrefix": IMPORT_URI_PREFIX
                    }
                },
                "matchingUriPrefix": MATCHING_URI_PREFIX,
                "fhirStoreDestination": "projects/"+PROJECT_ID+"/locations/"+LOCATION+"/datasets/"+DATASET_NAME+"/fhirStores/"+FHIR_STORE_NAME
            }
        }

        # Authentication (using Google's default credentials)
        try:
            print("Getting token...")
            creds, _ = google.auth.default()
            auth_req = google.auth.transport.requests.Request()
            creds.refresh(auth_req)
            bearer_token = creds.token
        except DefaultCredentialsError as e:
            return (f"Error obtaining credentials: {e}", 500)

        # Set headers for the API call
        headers = {
            "Authorization": f"Bearer {bearer_token}",
            "Content-Type": "application/json"  # Adjust if needed
        }

        # Delete the existing pipeline
        response = None  # Stays None if the request fails before a response arrives
        try:
            print("Making DELETE API call...")
            response = requests.delete(DELETE_API_ENDPOINT, headers=headers)
            print("DELETE API Response: ", response.content)
            response.raise_for_status()  # Raise exception for bad responses (4xx or 5xx)
        except RequestException as e:
            error_msg = f"DELETE API call failed: {e}"
            if response is not None:
                error_msg += f" (Status code: {response.status_code})"
            print(error_msg)
            return (error_msg, 500)  # Internal Server Error
        print(f"{PIPELINE_NAME} pipeline deleted")

        # Create a new pipeline
        response = None  # Reset so a failed POST does not report the DELETE response
        try:
            print("Making CREATE API call...")
            response = requests.post(CREATE_API_ENDPOINT, headers=headers, data=json.dumps(recon_config))
            print("CREATE API Response: ", response.content)
            response.raise_for_status()  # Raise exception for bad responses (4xx or 5xx)
        except RequestException as e:
            error_msg = f"CREATE API call failed: {e}"
            if response is not None:
                error_msg += f" (Status code: {response.status_code})"
            print(error_msg)
            return (error_msg, 500)  # Internal Server Error

        return ("Pipeline re-created", 200)  # OK
    else:
        return ('No Pipeline Recreation')
Code Explanation:
- All variables and configurable fields are stored in the config.py file.
- Required libraries are listed in the requirements.txt file.
- In main.py we check whether the file was uploaded to the subfolder we want. This is especially useful when the same GCS bucket also stores other content.
- We first make a DELETE API call to delete the existing pipeline and then make a POST API call to create a new pipeline pointing to the config defined in ‘recon_config’.
- To set this up for the Mapping pipeline, add the corresponding mapping variables to ‘config.py’ and use a variable similar to ‘recon_config’ to declare the mapping configs. This should be a separate Cloud Run Function.
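One detail of the subfolder check is worth a closer look. main.py uses a plain `startswith`, so a TARGET_SUBFOLDER of `recon` would also match an object under `recon_old/`, and an empty value matches every object in the bucket. Here is a minimal sketch of a stricter check, assuming a hypothetical helper `in_subfolder`. The folder and file names are made up for illustration.

```python
def in_subfolder(object_name, target_subfolder):
    """Return True only if object_name lies inside target_subfolder.

    Hypothetical helper: compares against "<folder>/" so that sibling
    folders sharing the same leading characters do not match.
    """
    prefix = target_subfolder.strip("/")
    if not prefix:
        return True  # Empty prefix: every object in the bucket matches
    return object_name.startswith(prefix + "/")


# Illustrative object names only
print("recon_old/rules.wstl".startswith("recon"))     # True (plain check matches)
print(in_subfolder("recon_old/rules.wstl", "recon"))  # False
print(in_subfolder("recon/rules.wstl", "recon/"))     # True
```

Setting TARGET_SUBFOLDER with a trailing slash (for example `recon/`) has the same effect with the existing `startswith` check and needs no code change.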
Step 4 - Once the above code has been added, deploy the Cloud Run Function. It should now detect any file changes (update/insert) in the GCS bucket folder and trigger the pipeline re-creation process.
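Before relying on the deployed function, the delete-then-create sequence can be exercised locally without touching the API. The sketch below is an illustration under stated assumptions. `recreate` is a hypothetical function, and `delete_call` and `create_call` are stand-in callables that return only an HTTP status code. Unlike main.py, which returns an error for any failed DELETE, this sketch assumes a 404 on DELETE means the pipeline does not exist yet and continues to the create step. Whether that is the right behavior for your environment is a design choice to verify.

```python
def recreate(delete_call, create_call):
    """Run delete then create and return a (message, status) tuple.

    delete_call and create_call are hypothetical zero-argument callables
    that return an HTTP status code, standing in for the two requests calls.
    """
    delete_status = delete_call()
    # Assumption: 404 means there is nothing to delete, so creation can proceed
    if delete_status >= 400 and delete_status != 404:
        return (f"DELETE failed with status {delete_status}", 500)

    create_status = create_call()
    if create_status >= 400:
        return (f"CREATE failed with status {create_status}", 500)
    return ("Pipeline re-created", 200)


# Stubbed status codes, for illustration only
print(recreate(lambda: 200, lambda: 200))  # ('Pipeline re-created', 200)
print(recreate(lambda: 404, lambda: 200))  # ('Pipeline re-created', 200)
print(recreate(lambda: 500, lambda: 200))  # ('DELETE failed with status 500', 500)
```

Keeping the HTTP calls behind small callables like this also makes it easier to unit test the sequencing separately from authentication and networking.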
Conclusion
HDE pipelines use Mapping and Reconciliation processes to convert healthcare data into FHIR and reconcile it into a Longitudinal Patient Record. Mapping or Reconciliation requirements can change over time and may require end users to re-create the corresponding pipelines. The simple steps above can automate the pipeline re-creation process or be plugged into a larger CI/CD workflow.

