Hey everyone,
I’m currently working on scheduling a Dataflow pipeline, and I’m facing a challenge in parameterizing the source file path based on the latest received file in our Google Cloud Storage (GCS) bucket.
Every day, we receive a new source file with the naming convention “SALES_YYYY-MM-DD”. Since the date suffix changes daily according to the current date, I need to dynamically select the appropriate file for processing in the pipeline.
I’ve tried to set up parameters as shown in the screenshot, but I keep encountering an “object not found” error.
Could anyone provide guidance on the correct approach for this situation? I’d greatly appreciate any help or insights you can offer!
Thanks in advance,
Murali
3 Likes
The “Object not found” error likely arises because the wildcard (%Y-%m-%d) used directly within the Cloud Storage input file path doesn’t automatically select the latest file. While effective for pattern matching, it requires an additional mechanism to dynamically identify and retrieve the most recent file.
The Solution: Orchestration and Dynamic Parameterization
Step 1: Cloud Function Trigger
- Set up a Google Cloud Function to trigger on new file events in your GCS bucket (e.g., a Finalize/Create event). Ensure this also accounts for any subsequent file modifications if your process involves changes to files post-creation.
Step 2: Find the Latest File
- Within the Cloud Function, implement logic to:
- List objects in your GCS bucket with a prefix or pattern (e.g., ‘SALES_YYYY-MM-DD.csv’).
- Filter and sort this list by the
time_created property in descending order.
- Select the most recent file from this sorted list.
Step 3: Dynamic Parameter in Dataflow Template
- Modify your Dataflow template to accept an
input_file parameter, which will specify the path to the file to be processed.
Step 4: Launch Dataflow from Cloud Function
- Utilize the Google Cloud Dataflow API within the Cloud Function to launch your Dataflow job, passing the path of the latest file as the
input_file parameter.
Implementation Notes
- Cloud Function Runtime: Choose a runtime (e.g., Python) that is compatible with GCS interactions.
- GCS Client Libraries: Use these libraries to facilitate interactions with your storage bucket.
- Dataflow API: Employ the
projects.locations.templates.launch method from the Dataflow API to initiate your pipeline with the necessary parameters.
Example (Python Cloud Function):
from google.cloud import storage
from google.cloud import dataflow_v1beta3
import datetime
def launch_dataflow(event, context):
storage_client = storage.Client()
bucket = storage_client.bucket('your_bucket_name')
blobs = list(bucket.list_blobs(prefix='SALES_'))
if blobs: # Check if any blobs match the pattern
# Adjust for time zone (if needed)
latest_blob = max(blobs, key=lambda blob: blob.time_created)
dataflow_client = dataflow_v1beta3.DataflowServiceClient()
# Launch Dataflow job with latest_blob.name as input_file
# ... (rest of the Dataflow launch code)
else:
print("No matching files found.")
Additional Considerations:
- Concurrency: If you expect multiple files to be uploaded within a short period, you might want to explore ways to handle concurrency in your Cloud Function to avoid conflicts.
- Logging: Implementing comprehensive logging within your Cloud Function can be invaluable for debugging and monitoring the health of your workflow.
2 Likes