Cloud Run Functions with Eventrac

Hello Team,

I have one cloud function deployed. The main moto is to get a file as soon as it lands in the bucket convert in the csv and put in the same bucket in different folder. One main.py file is there which will be having the entry function and one that will call my transformation function.

I have 6 transformation functions for 6 different entities. All are running fine except one entity. For one entity I am getting repetative event generation from the same incoming file and it is producing output file every time.

I have tackeled the event generation, repetative event generation and all. But still facing the same issue.

The deployment is -

  • gcloud run deploy $CLOUD_SERVICE_NAME --source . --function handle_gcs_event --base-image python312 --region $REGION --memory 1Gi --service-account=$SERVICE_ACCOUNT_EMAIL --no-allow-unauthenticated --ingress all

  • gcloud eventarc triggers create $EVENT_SERVICE_NAME --location=eu --destination-run-service=$CLOUD_SERVICE_NAME --destination-run-region=$REGION --event-filters=“type=google.cloud.storage.object.v1.finalized” --event-filters=“bucket=$BUCKET_NAME” --service-account=$SERVICE_ACCOUNT_EMAIL || echo “Eventarc is already setup…”

smaple main.py is -

import functions_framework
import json
import os
import tempfile
from google.cloud import storage
from google.api_core.exceptions import TooManyRequests
from mycode import myfunction
from mycode1 import myfunction1

GCS Folder Mapping

FOLDER_MAPPING = {
“Transformed/raw_incoming1”: “Transformed/output1”,
“Transformed/raw_incoming2”: “Transformed/output2”,

}

@functions_framework.cloud_event
def handle_gcs_event(cloud_event):

data = cloud_event.data
bucket_name = data[“bucket”]
file_name = data[“name”]

print(f"Event Triggered: {file_name} in {bucket_name}“)
print(f"data is: {data}”)

Skip if already processed (look for metadata)

if file_name.endswith(“.json”) or file_name.endswith(“.csv”):
if “transformed” in file_name:
print(f"Skipping already processed file: {file_name}")
return “Skipped already processed file”, 200

output_folder = map_folder(file_name)
if not output_folder:
print(f"No matching folder for {file_name}")
return “No matching folder”, 400

storage_client = storage.Client()

with tempfile.TemporaryDirectory() as temp_input_dir, tempfile.TemporaryDirectory() as temp_output_dir:
local_file_path = os.path.join(temp_input_dir, os.path.basename(file_name))

try:
download_from_gcs(storage_client, bucket_name, file_name, local_file_path)
except TooManyRequests:
print(“Rate limit exceeded, retrying…”)
return “Rate limit exceeded, try again later”, 429
except Exception as e:
print(f"Error downloading file: {e}“)
return f"Error: {e}”, 500

Route to correct transformer

if file_name.startswith(“Transformed/raw_incoming”):
profile_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
product_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
appointment_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
voucher_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
store_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
order_trns(temp_input_dir, temp_output_dir, output_folder)

else:
print(f"No valid transformer found for: {file_name}")
return “No transformer matched”, 400

Upload transformed files and tag as processed

for output_file in os.listdir(temp_output_dir):
local_output_path = os.path.join(temp_output_dir, output_file)
destination_path = os.path.join(output_folder, output_file)
upload_to_gcs(storage_client, bucket_name, local_output_path, destination_path)
print(f"Uploaded {output_file} to {destination_path}")

return “Success”, 200

def map_folder(file_name):
folder_path = os.path.dirname(file_name)
return FOLDER_MAPPING.get(folder_path, None)

def download_from_gcs(storage_client, bucket_name, source_path, local_path):
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_path)
blob.download_to_filename(local_path)
print(f"Downloaded {source_path} to {local_path}")

def upload_to_gcs(storage_client, bucket_name, local_file, destination_path):
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_path)
blob.upload_from_filename(local_file)
blob.metadata = {“goog-reserved-file-type”: “processed”}
blob.patch()
print(f"Uploaded {local_file} to {destination_path}")

the mycode.py is ----

import json
import csv
import os
from datetime import datetime, timezone

Function to transform required value

def transform_data(value :disappointed_face:
return [value] if value else [“”]

Header mapping (JSON keys → CSV column names)

HEADER_MAPPING = {
json keys : csv columns
}

Columns requiring transformation

TRANSFORM_COLUMNS =

Final CSV headers

CSV_HEADERS = list(HEADER_MAPPING.values()) + [“Source”]

Function to convert JSON to CSV

def profile_trns(input_dir, output_dir, output_folder :disappointed_face:

json_files = [f for f in os.listdir(input_dir) if f.endswith(‘.json’)]

if not json_files:
print(f"No JSON files found in {input_dir}. Exiting…")
return
json_file = json_files[0]
final_csv_name = os.path.splitext(json_file)[0] + “.csv”

final_csv_path = os.path.join(output_dir, final_csv_name)

with open(final_csv_path, mode=‘w’, newline=‘’, encoding=‘utf-8’) as csv_file:
writer = csv.writer(csv_file)
writer.writerow(CSV_HEADERS) # Write CSV header

Process each JSON file

for json_file in json_files:
process_json_file(input_dir, json_file, writer)

print(f"CSV file created successfully: {final_csv_path}")

Function to process a single JSON file

def process_json_file(input_dir, json_file, writer :disappointed_face:

with open(os.path.join(input_dir, json_file), ‘r’, encoding=‘utf-8’) as file:
for line in file:
line = line.strip()
if not line:
continue # Skip empty lines

try:
data = json.loads(line) # Load each JSON object separately
except json.JSONDecodeError:
print(f"Skipping malformed JSON in file {json_file}: {line}")
continue # Skip invalid JSON lines

if isinstance(data, dict :disappointed_face:
data = [data]

for record in data:

Add extra column

Prepare CSV row

row =
for key in HEADER_MAPPING.keys():
value = record.get(key, “”)
if key in TRANSFORM_COLUMNS:
value = transform_data(value)
row.append(value)

Append extra fields and source filename

row.append(json_file)

Write row to CSV

writer.writerow(row)

print(f"Processed {json_file}")

in logs I can see the json file event is repeating. and the csv files are getting more in count.

Hi @ArnabBatabyal ,

Welcome to Google Cloud Community!

From your setup, it looks like you’re hitting a classic Cloud Storage + Eventarc behavior. This isn’t a bug, it’s expected: Cloud Run Functions events are delivered at-least-once — meaning the same finalized event for a file can trigger your Cloud Run multiple times. If your transformation outputs the CSV back into the same bucket, that new file (or any metadata update) can also sometimes trigger another event, especially if the path naming isn’t carefully separated or if metadata like blob.patch() triggers unintended updates. Google recommends making Cloud Run functions idempotent to handle this.

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.