DEV Community

Shaine Ismail
Shaine Ismail

Posted on

Calling a stored Procedure SQL Server stored procedure from Spark

Not really a regular thing people need to do and there are options to insert the record set into a temp table which means that you can go directly into data frame. But that is an option that you need your DBA's to switch on.

the following uses a jdbc connection and a result set into a RDD then into a dataframe

import java.sql.DriverManager
import java.sql.Connection

import org.apache.spark.sql.types.{StructType, StructField, StringType,IntegerType};
import java.sql.ResultSet

//driver = 'com.microsoft.sqlserver.jdbc.SQLServerDriver' 

val username = ""
val pass = ""
val url = "jdbc:sqlserver://<host>:<port>;databaseName=<db>"



val columns = Seq ("DateCreated", "Action", "ServerName", "IPAddress", "Domain")

val schema = StructType(List(
    StructField("DateCreated", StringType, nullable = true),
    StructField("Action", StringType,  nullable = true),
    StructField("ServerName", StringType,  nullable = true),
    StructField("IPAddress", StringType,  nullable = true),
    StructField("Domain", StringType,  nullable = true)
))


def parseResultSet(rs: ResultSet): Row = {
    val resultSetRecord = columns.map(c => rs.getString(c))
    Row(resultSetRecord:_*)
}

def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] = new Iterator[Row] {
    def hasNext: Boolean = rs.next()
    def next(): Row = f(rs)
}

def paralleliseResultSet(rs: ResultSet, spark:SparkSession): DataFrame = {
  val rdd = spark.sparkContext.parallelize(resultSetToIter(rs(parseResultSet).toSeq)     
  spark.createDataFrame(rdd, schema)
}

val conn = DriverManager.getConnection(url, username, pass)
val rs = conn.createStatement.executeQuery("exec <sp name>")
val df: DataFrame = paralleliseResultSet(rs,spark)
df.createOrReplaceTempView("df")

spark.sql("""select * from df""").show(10, False)

Top comments (0)