This isn't something people need to do very often, and there are alternatives: if the stored procedure can write its results into a temp table, Spark can read that table straight into a DataFrame. That option may need your DBAs to switch it on, though.
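For comparison, if the results do land in a table, Spark's built-in JDBC reader gets you a DataFrame directly. A minimal sketch, assuming the procedure populates a table first; the table name ##SpResults and the connection details are placeholders:

val tableDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:sqlserver://<host>:<port>;databaseName=<db>")
  .option("dbtable", "##SpResults") // placeholder: table the procedure populated
  .option("user", "<user>")
  .option("password", "<password>")
  .load()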
The following uses a raw JDBC connection instead: it executes the stored procedure, wraps the ResultSet in an RDD, and converts that into a DataFrame.
import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
// JDBC driver: com.microsoft.sqlserver.jdbc.SQLServerDriver (auto-registered when the mssql-jdbc jar is on the classpath)
val username = ""
val pass = ""
val url = "jdbc:sqlserver://<host>:<port>;databaseName=<db>"
// Columns to pull from the result set, and the matching DataFrame schema (everything read as a string)
val columns = Seq("DateCreated", "Action", "ServerName", "IPAddress", "Domain")
val schema = StructType(List(
  StructField("DateCreated", StringType, nullable = true),
  StructField("Action", StringType, nullable = true),
  StructField("ServerName", StringType, nullable = true),
  StructField("IPAddress", StringType, nullable = true),
  StructField("Domain", StringType, nullable = true)
))
// Map the ResultSet's current row to a Spark Row, reading each column as a string
def parseResultSet(rs: ResultSet): Row = {
  val resultSetRecord = columns.map(c => rs.getString(c))
  Row(resultSetRecord: _*)
}
// Wrap the forward-only ResultSet in an Iterator[Row].
// hasNext advances the cursor, so it must be called exactly once before each next().
def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] = new Iterator[Row] {
  def hasNext: Boolean = rs.next()
  def next(): Row = f(rs)
}
// Drain the ResultSet on the driver and parallelise the rows into an RDD.
// .toList consumes the cursor eagerly (Iterator.toSeq can be lazy on Scala 2.12),
// so the connection can safely be closed once this returns.
def paralleliseResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
  val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toList)
  spark.createDataFrame(rdd, schema)
}
// Open the connection, execute the stored procedure and build the DataFrame
val conn = DriverManager.getConnection(url, username, pass)
val rs = conn.createStatement.executeQuery("exec <sp name>")
val df: DataFrame = paralleliseResultSet(rs, spark)
df.createOrReplaceTempView("df")
spark.sql("""select * from df""").show(10, false)
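Because .toList in paralleliseResultSet drains the cursor on the driver eagerly, the JDBC resources can be released once the DataFrame exists. The same detail is the main caveat of this approach: the whole result set passes through the driver, so it has to fit in driver memory.

rs.close()
conn.close()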