Hi all,
I’ve built a Python-based Google Cloud Function that is triggered whenever a CSV file is uploaded to Firebase Storage (GCS). The function is supposed to:
- Detect the upload
- Download and parse the CSV
- Connect to a PostgreSQL Cloud SQL database
- Insert the rows into a table
The function successfully reaches the early stages. Based on the logs, it does the following:
Function triggered
File detected: FolderName/
Downloading file: FolderName/
File downloaded successfully
CSV headers: […]
Establishing DB connection…
But it never reaches the log line after that:
DB connection established.
This leads me to think it’s hanging or failing silently at the connection step. (The full Python code is at the bottom of this post.)
Here’s the message appearing in the log (a 504 returned after 300 s, i.e. the request timed out):
2025-06-03 15:58:49.901 HAEC
POST504 72 octets 300 s APIs-Google; (+https://developers.google.com/webmasters/APIs-Google.html) https://csv-processor-qjaapsghda-ew.a.run.app/?__GCP_CloudEventsMode=GCS_NOTIFICATION
What I’ve already tried:
- Confirmed the file is valid and CSV parsing works
- Verified that environment variables are set and correct
- Added logs before connector.connect()
- Checked that the service account has the Cloud SQL Client role

I’m using Cloud SQL with a public IP and was able to push data from my local machine without any issues, so I can confirm that the database is up, running, and accessible.
I’ve been testing for several days now, including trying the serverless VPC connector with a private IP — which unfortunately made no difference.
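In case it helps, below is the kind of minimal, standalone check I’m trying to put together to isolate just the connection step, separately from the Storage trigger. It’s only a sketch: the env var names mirror my function, and the ip_type / IPTypes usage is what I understood from the Cloud SQL Python Connector docs, so it may not be exactly right.

# connection_check.py - standalone sketch to isolate the connector step
# (same env vars as my function; ip_type/IPTypes usage taken from the
# cloud-sql-python-connector docs - not yet verified on my setup)
import logging
import os

from google.cloud.sql.connector import Connector, IPTypes

logging.basicConfig(level=logging.INFO)


def main():
    connector = Connector()
    logging.info("Connecting…")
    conn = connector.connect(
        os.environ["CLOUD_SQL_CONNECTION_NAME"],
        "pg8000",
        user=os.environ["DB_USER"],
        password=os.environ["DB_PASS"],
        db=os.environ["DB_NAME"],
        ip_type=IPTypes.PUBLIC,  # or IPTypes.PRIVATE when going through the VPC connector
    )
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    logging.info(f"Connected, SELECT 1 returned: {cursor.fetchone()}")
    cursor.close()
    conn.close()
    connector.close()


if __name__ == "__main__":
    main()

Running this locally (where my direct connection already works) and then from a deployed function should at least tell me whether the connector call itself is what hangs.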
There must be something I’m missing or misconfiguring, and frankly, I’m still quite new to getting this type of setup to work.
I’d really appreciate it if someone could guide me step by step through the proper setup to get this working reliably. Any help would be hugely appreciated!
Thanks in advance.
import csv
import io
import logging
import os

import functions_framework
import pg8000
from google.cloud import storage
from google.cloud.sql.connector import Connector

# Set up logging
logging.basicConfig(level=logging.INFO)

# Create connector instance (shared across invocations)
connector = Connector()

# Get DB credentials from environment variables
INSTANCE_CONNECTION_NAME = os.environ.get("CLOUD_SQL_CONNECTION_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASS = os.environ.get("DB_PASS")
DB_NAME = os.environ.get("DB_NAME")


# Securely get a connection using the Cloud SQL Python Connector
def get_connection():
    logging.info("Establishing DB connection…")
    try:
        conn = connector.connect(
            INSTANCE_CONNECTION_NAME,
            "pg8000",
            user=DB_USER,
            password=DB_PASS,
            db=DB_NAME,
        )
        logging.info("DB connection established.")
        return conn
    except Exception as e:
        logging.error(f"DB connection failed: {e}", exc_info=True)
        raise


@functions_framework.cloud_event
def process_csv(cloud_event):
    logging.info("Function triggered")
    bucket_name = cloud_event.data["bucket"]
    file_name = cloud_event.data["name"]
    logging.info(f"File detected: {file_name} in bucket: {bucket_name}")

    # Ignore anything that is not a CSV file
    if not file_name.lower().endswith(".csv"):
        logging.info(f"Skipping non-CSV file: {file_name}")
        return

    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(file_name)

        logging.info(f"Downloading file: {file_name}")
        csv_data = blob.download_as_text(encoding="utf-8")
        logging.info("File downloaded successfully")

        reader = csv.reader(io.StringIO(csv_data))
        headers = next(reader, None)
        logging.info(f"CSV headers: {headers}")

        conn = get_connection()
        cursor = conn.cursor()
        inserted_count = 0

        logging.info("Processing CSV rows…")
        for idx, row in enumerate(reader, start=1):
            if not row:
                continue
            # Convert empty strings to NULLs
            row = [None if val == "" else val for val in row]
            try:
                cursor.execute(
                    """
                    INSERT INTO IMP_CSV_DAT (
                        IMP_DTE, IMP_INV, IMP_CMP, IMP_GAD, IMP_UID,
                        IMP_BAR, IMP_BOS, IMP_URL, IMP_RTN, IMP_TRF
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """,
                    row,
                )
                inserted_count += 1
            except Exception as row_err:
                logging.warning(f"Row {idx} insert failed: {row_err}")
                continue

        conn.commit()
        cursor.close()
        conn.close()
        logging.info(f"Inserted {inserted_count} rows from {file_name} into IMP_CSV_DAT")

        blob.delete()
        logging.info(f"Deleted processed file: {file_name}")
    except Exception as e:
        logging.error(f"Error processing file {file_name}: {e}", exc_info=True)
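One other idea I’m considering (untested) so a hang shows up in the logs quickly instead of only as a 504 after 300 s: giving the connect call a hard deadline using just the standard library. Rough sketch, reusing the same globals as the function above:

# Sketch only (untested in the deployed function): give connector.connect()
# a hard deadline so a hang is logged early rather than running into the
# 300 s request timeout.
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def get_connection_with_deadline(timeout_s=30):
    logging.info("Establishing DB connection (with deadline)…")
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(
        connector.connect,
        INSTANCE_CONNECTION_NAME,
        "pg8000",
        user=DB_USER,
        password=DB_PASS,
        db=DB_NAME,
    )
    try:
        conn = future.result(timeout=timeout_s)
    except FutureTimeout:
        logging.error(f"DB connection still not established after {timeout_s}s")
        raise
    finally:
        # Don't block on a possibly hung connect; let the worker thread end on its own
        executor.shutdown(wait=False)
    logging.info("DB connection established.")
    return conn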