Hello Team,
I have one cloud function deployed. The main moto is to get a file as soon as it lands in the bucket convert in the csv and put in the same bucket in different folder. One main.py file is there which will be having the entry function and one that will call my transformation function.
I have 6 transformation functions for 6 different entities. All are running fine except one entity. For one entity I am getting repetative event generation from the same incoming file and it is producing output file every time.
I have tackeled the event generation, repetative event generation and all. But still facing the same issue.
The deployment is -
-
gcloud run deploy $CLOUD_SERVICE_NAME --source . --function handle_gcs_event --base-image python312 --region $REGION --memory 1Gi --service-account=$SERVICE_ACCOUNT_EMAIL --no-allow-unauthenticated --ingress all
-
gcloud eventarc triggers create $EVENT_SERVICE_NAME --location=eu --destination-run-service=$CLOUD_SERVICE_NAME --destination-run-region=$REGION --event-filters=“type=google.cloud.storage.object.v1.finalized” --event-filters=“bucket=$BUCKET_NAME” --service-account=$SERVICE_ACCOUNT_EMAIL || echo “Eventarc is already setup…”
smaple main.py is -
import functions_framework
import json
import os
import tempfile
from google.cloud import storage
from google.api_core.exceptions import TooManyRequests
from mycode import myfunction
from mycode1 import myfunction1
GCS Folder Mapping
FOLDER_MAPPING = {
“Transformed/raw_incoming1”: “Transformed/output1”,
“Transformed/raw_incoming2”: “Transformed/output2”,
}
@functions_framework.cloud_event
def handle_gcs_event(cloud_event):
data = cloud_event.data
bucket_name = data[“bucket”]
file_name = data[“name”]
print(f"Event Triggered: {file_name} in {bucket_name}“)
print(f"data is: {data}”)
Skip if already processed (look for metadata)
if file_name.endswith(“.json”) or file_name.endswith(“.csv”):
if “transformed” in file_name:
print(f"Skipping already processed file: {file_name}")
return “Skipped already processed file”, 200
output_folder = map_folder(file_name)
if not output_folder:
print(f"No matching folder for {file_name}")
return “No matching folder”, 400
storage_client = storage.Client()
with tempfile.TemporaryDirectory() as temp_input_dir, tempfile.TemporaryDirectory() as temp_output_dir:
local_file_path = os.path.join(temp_input_dir, os.path.basename(file_name))
try:
download_from_gcs(storage_client, bucket_name, file_name, local_file_path)
except TooManyRequests:
print(“Rate limit exceeded, retrying…”)
return “Rate limit exceeded, try again later”, 429
except Exception as e:
print(f"Error downloading file: {e}“)
return f"Error: {e}”, 500
Route to correct transformer
if file_name.startswith(“Transformed/raw_incoming”):
profile_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
product_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
appointment_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
voucher_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
store_trns(temp_input_dir, temp_output_dir, output_folder)
elif file_name.startswith(“Transformed/raw_incoming”):
order_trns(temp_input_dir, temp_output_dir, output_folder)
else:
print(f"No valid transformer found for: {file_name}")
return “No transformer matched”, 400
Upload transformed files and tag as processed
for output_file in os.listdir(temp_output_dir):
local_output_path = os.path.join(temp_output_dir, output_file)
destination_path = os.path.join(output_folder, output_file)
upload_to_gcs(storage_client, bucket_name, local_output_path, destination_path)
print(f"Uploaded {output_file} to {destination_path}")
return “Success”, 200
def map_folder(file_name):
folder_path = os.path.dirname(file_name)
return FOLDER_MAPPING.get(folder_path, None)
def download_from_gcs(storage_client, bucket_name, source_path, local_path):
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_path)
blob.download_to_filename(local_path)
print(f"Downloaded {source_path} to {local_path}")
def upload_to_gcs(storage_client, bucket_name, local_file, destination_path):
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_path)
blob.upload_from_filename(local_file)
blob.metadata = {“goog-reserved-file-type”: “processed”}
blob.patch()
print(f"Uploaded {local_file} to {destination_path}")
the mycode.py is ----
import json
import csv
import os
from datetime import datetime, timezone
Function to transform required value
def transform_data(value ![]()
return [value] if value else [“”]
Header mapping (JSON keys → CSV column names)
HEADER_MAPPING = {
json keys : csv columns
}
Columns requiring transformation
TRANSFORM_COLUMNS =
Final CSV headers
CSV_HEADERS = list(HEADER_MAPPING.values()) + [“Source”]
Function to convert JSON to CSV
def profile_trns(input_dir, output_dir, output_folder ![]()
json_files = [f for f in os.listdir(input_dir) if f.endswith(‘.json’)]
if not json_files:
print(f"No JSON files found in {input_dir}. Exiting…")
return
json_file = json_files[0]
final_csv_name = os.path.splitext(json_file)[0] + “.csv”
final_csv_path = os.path.join(output_dir, final_csv_name)
with open(final_csv_path, mode=‘w’, newline=‘’, encoding=‘utf-8’) as csv_file:
writer = csv.writer(csv_file)
writer.writerow(CSV_HEADERS) # Write CSV header
Process each JSON file
for json_file in json_files:
process_json_file(input_dir, json_file, writer)
print(f"CSV file created successfully: {final_csv_path}")
Function to process a single JSON file
def process_json_file(input_dir, json_file, writer ![]()
with open(os.path.join(input_dir, json_file), ‘r’, encoding=‘utf-8’) as file:
for line in file:
line = line.strip()
if not line:
continue # Skip empty lines
try:
data = json.loads(line) # Load each JSON object separately
except json.JSONDecodeError:
print(f"Skipping malformed JSON in file {json_file}: {line}")
continue # Skip invalid JSON lines
if isinstance(data, dict ![]()
data = [data]
for record in data:
Add extra column
Prepare CSV row
row =
for key in HEADER_MAPPING.keys():
value = record.get(key, “”)
if key in TRANSFORM_COLUMNS:
value = transform_data(value)
row.append(value)
Append extra fields and source filename
row.append(json_file)
Write row to CSV
writer.writerow(row)
print(f"Processed {json_file}")
in logs I can see the json file event is repeating. and the csv files are getting more in count.