Trigger Postgres stored procedure using PySpark

Hi Folks,

I have created a stored procedure in PostgreSQL named **"source".add_missing_columns**. I want to trigger it using PySpark or Python code.

Below is the code I'm trying to execute, but I'm getting an error:

```python
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Trigger PostgreSQL Stored Procedure") \
    .getOrCreate()

# Define PostgreSQL connection properties
url = "jdbc:postgresql://[HOST]:[PORT]/[DATABASE]"
properties = {
    "user": "[USERNAME]",
    "password": "[PASSWORD]",
    "driver": "org.postgresql.Driver"
}

# Define the name of the stored procedure
procedure_name = '"Stg".add_missing_columns'

# Define the call to the stored procedure with parameters
procedure_call = "CALL " + procedure_name + "('InLd', 'InStg', 'AnalytEDW')"

# Print the SQL query statement
print("SQL Query Statement:", procedure_call)

# Execute the stored procedure using a SQL query
spark.read.jdbc(url=url, table="(SELECT " + procedure_call + ")", properties=properties)

# Stop the SparkSession
spark.stop()
```

Output:

```
SQL Query Statement: CALL "StgEntr".add_missing_columns('InLd', 'InStg', 'AnalytEDW')
```

Error:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o69.jdbc.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "."
```

Has anyone run into this? Any leads are much appreciated!

Thanks,

Vigneswar Jeyaraj

The error message you're encountering, `ERROR: syntax error at or near "."`, indicates that the procedure name and call are not being parsed the way you intend. Two things work against you here: mixed-case schema names like "Stg" must be double-quoted or Postgres folds them to lower case, and `spark.read.jdbc` wraps its `table` argument in a subquery (effectively `SELECT * FROM (...) alias`), so a `CALL` statement can never execute through that API. A `CALL` has to go over a plain JDBC (or Python driver) connection instead.
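To make the identifier quoting robust, you can quote each part of the schema-qualified name explicitly. A small sketch — the helper name is mine, not from the original post:

```python
def quote_qualified_name(schema: str, name: str) -> str:
    """Double-quote each identifier part so mixed-case names such as
    "Stg" survive Postgres's default folding to lower case."""
    def q(ident: str) -> str:
        # Escape any embedded double quotes per SQL rules, then wrap
        return '"' + ident.replace('"', '""') + '"'
    return f"{q(schema)}.{q(name)}"

procedure_call = (
    f"CALL {quote_qualified_name('Stg', 'add_missing_columns')}"
    "('InLd', 'InStg', 'AnalytEDW')"
)
print(procedure_call)
# CALL "Stg"."add_missing_columns"('InLd', 'InStg', 'AnalytEDW')
```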

Whichever route you take, be mindful of connection management: make sure connections are always closed to avoid resource leaks, for example with a `try`/`finally` block (or a pooling library). See the following code as an example:

```python
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Trigger PostgreSQL Stored Procedure") \
    .getOrCreate()

# Define PostgreSQL connection properties
url = "jdbc:postgresql://[HOST]:[PORT]/[DATABASE]"
properties = {
    "user": "[USERNAME]",
    "password": "[PASSWORD]",
    "driver": "org.postgresql.Driver"
}

# Correctly quote the schema and procedure name if necessary
procedure_name = '"source".add_missing_columns'

# Construct the SQL command to call the stored procedure with parameters
procedure_call = f"CALL {procedure_name}('InLd', 'InStg', 'AnalytEDW')"

# Display the SQL command to be executed
print("SQL Query Statement:", procedure_call)

# Try executing the stored procedure using Spark SQL
try:
    spark.sql(procedure_call)
    print("Procedure executed successfully via Spark SQL")
except Exception as e:
    print("Failed via Spark SQL. Error:", e)

# If Spark SQL fails, attempt direct JDBC execution.
# Note: java.sql objects reached through py4j are not Python context
# managers, so close them explicitly rather than using `with`.
conn = None
try:
    conn = spark._jvm.java.sql.DriverManager.getConnection(
        url, properties["user"], properties["password"])
    stmt = conn.prepareCall(procedure_call)
    stmt.execute()  # Execute the procedure without fetching results
    stmt.close()
    print("Procedure executed successfully via direct JDBC")
except Exception as jdbc_error:
    print("Failed via direct JDBC. Error:", jdbc_error)
finally:
    if conn is not None:
        conn.close()

# Clean up by stopping the Spark session
spark.stop()
```
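If Spark isn't strictly required for this step, a plain Postgres driver such as psycopg2 avoids the JDBC detour entirely. A minimal sketch, assuming psycopg2 is installed and connection details arrive via hypothetical `PG_*` environment variables — adjust both to your setup:

```python
import os

# Statement with %s placeholders; the driver substitutes the
# arguments safely, so no manual quoting of the values is needed.
call_sql = 'CALL "source".add_missing_columns(%s, %s, %s)'
args = ("InLd", "InStg", "AnalytEDW")

# Only attempt the call when connection details are actually provided.
if os.environ.get("PG_HOST"):
    import psycopg2  # third-party driver: pip install psycopg2-binary
    conn = psycopg2.connect(
        host=os.environ["PG_HOST"],
        port=os.environ.get("PG_PORT", "5432"),
        dbname=os.environ["PG_DATABASE"],
        user=os.environ["PG_USER"],
        password=os.environ["PG_PASSWORD"],
    )
    try:
        with conn.cursor() as cur:
            cur.execute(call_sql, args)
        conn.commit()  # persist whatever the procedure changed
    finally:
        conn.close()
```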