I am using the Google-provided Dataflow template ‘Text Files on Cloud Storage to BigQuery’ to load some NDJSON files from Cloud Storage into BigQuery tables. The requirement is that once the files have been loaded successfully into BigQuery, a Cloud Function should move them from the source bucket to an archive bucket, and this move must happen only when the Dataflow run succeeds.

I tried creating an Eventarc trigger on the Cloud Function based on the Dataflow job status change event (the only direct option currently available). The problem is that the Dataflow job and the Cloud Function end up running at the same time, because the transition into the ‘running’ state already counts as a status change. The bucket move then completes before the Dataflow job finishes, and the job fails because it can no longer find the source files.
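For reference, the archive Cloud Function itself is simple; a minimal sketch of it (bucket names and prefix are placeholders, not my real values):

    from google.cloud import storage

    def archive_source_files(cloudevent):
        # Triggered by Eventarc; "moves" files by copying then deleting.
        client = storage.Client()
        src = client.bucket('my-source-bucket')    # placeholder
        dst = client.bucket('my-archive-bucket')   # placeholder
        for blob in client.list_blobs(src, prefix='incoming/'):
            src.copy_blob(blob, dst, blob.name)
            blob.delete()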
I am using the call below from a Cloud Function to define and start the Dataflow job:
service.projects().locations().templates().launch()
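In full, the launch looks roughly like this (a simplified sketch of my function; project, region, bucket names and most template parameters are placeholders):

    from googleapiclient.discovery import build

    def start_dataflow_job(event, context):
        service = build('dataflow', 'v1b3', cache_discovery=False)
        request = service.projects().locations().templates().launch(
            projectId='my-project',        # placeholder
            location='us-central1',        # placeholder
            gcsPath='gs://dataflow-templates/latest/GCS_Text_to_BigQuery',
            body={
                'jobName': 'ndjson-to-bq-load',
                'parameters': {
                    'inputFilePattern': 'gs://my-source-bucket/*.json',
                    # schema path, UDF path/function name, output table and
                    # temp directory omitted here for brevity
                },
            },
        )
        response = request.execute()
        # response['job']['id'] is the id of the job that was just launched
        return response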
After some research, I decided to have the Dataflow job write its status to a Pub/Sub topic and then trigger the bucket-movement Cloud Function from that topic. However, the configuration variable “--notificationsTopic” for writing the status to a topic does not seem to be supported by service.projects().templates().launch(). I looked at using projects.locations.jobs.create() instead, but that feels very complex to implement, particularly because we have a separate Python UDF for dropping any new fields in the input that are not present in the BigQuery schema. We are also not allowed to create the Docker containers in our project that the create() method would need.

I am new to GCP. Can someone please guide me to a simpler way of achieving this, for example by getting the Dataflow job status within the existing trigger options themselves instead of explicitly writing to Pub/Sub?
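To illustrate what I mean by that last option: inside the Eventarc-triggered function I imagine looking up the job’s state and only archiving once it has actually finished, along the lines of the sketch below (this assumes the trigger payload gives me the job id and region, which is part of what I am unsure about):

    from googleapiclient.discovery import build

    def archive_if_done(job_id, region):
        service = build('dataflow', 'v1b3', cache_discovery=False)
        job = service.projects().locations().jobs().get(
            projectId='my-project',   # placeholder
            location=region,
            jobId=job_id,
        ).execute()
        if job.get('currentState') != 'JOB_STATE_DONE':
            return  # ignore RUNNING/FAILED/etc. and do nothing
        # ...move the source files to the archive bucket here...

Is something along these lines workable with the current Eventarc trigger, or is there a cleaner pattern for “archive only after a successful load”?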