A simple DataFrame export application
In this example we will export a synthetic DataFrame from Spark into a new (not yet existing) table in MariaDB ColumnStore. The full code for this can be found in the ExportDataFrame.scala file in the mcsapi codebase.
First, all needed libraries are imported and the SparkSession is created.
import org.apache.spark.sql.{SparkSession, DataFrame}
import java.util.Properties
import java.sql.{DriverManager, Connection, PreparedStatement, SQLException}
import com.mariadb.columnstore.api.connector.ColumnStoreExporter

object DataFrameExportExample {
  def main(args: Array[String]) {
    // get a Spark session
    val spark = SparkSession.builder.appName("DataFrame export into MariaDB ColumnStore").getOrCreate()
Second, an example DataFrame that is going to be exported into ColumnStore is created. The DataFrame consists of two columns: one containing the numbers 0–127 and the other containing their ASCII character representations.
    // generate a sample DataFrame to be exported
    import spark.implicits._
    val df = spark.sparkContext.makeRDD(0 to 127).map(i => (i, i.toChar.toString)).toDF("number", "ASCII_representation")
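The number-to-character pairing built by the RDD above can be checked in plain Scala, without a Spark session. This is just a quick sketch for inspection, not part of the example program:

```scala
// Build the same (number, ASCII string) pairs that the RDD produces,
// here as a plain Scala collection.
val pairs = (0 to 127).map(i => (i, i.toChar.toString))

// Index 65 holds the pair for the letter 'A', index 97 for 'a'.
println(pairs(65)) // (65,A)
println(pairs(97)) // (97,a)
```

Each tuple becomes one row of the DataFrame once `toDF("number", "ASCII_representation")` names the columns.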
Third, the JDBC connection needed to execute the DDL statement that creates the target table is set up.
    // set the variables for a JDBC connection
    var host = "um1"
    var user = "root"
    var password = ""
    val url = "jdbc:mysql://" + host + ":3306/"
    val connectionProperties = new Properties()
    connectionProperties.put("user", user)
    connectionProperties.put("password", password)
    connectionProperties.put("driver", "org.mariadb.jdbc.Driver")
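Hardcoded host and credentials are fine for a demo, but in practice they might be read from the environment instead. A minimal sketch of that idea; the variable names `MCS_HOST`, `MCS_USER`, and `MCS_PASSWORD` are assumptions for illustration, not defined by mcsapi:

```scala
// Read connection settings from environment variables, falling back to the
// demo defaults used above. MCS_HOST / MCS_USER / MCS_PASSWORD are
// illustrative names chosen for this sketch, not part of mcsapi.
val host = sys.env.getOrElse("MCS_HOST", "um1")
val user = sys.env.getOrElse("MCS_USER", "root")
val password = sys.env.getOrElse("MCS_PASSWORD", "")
val url = s"jdbc:mysql://$host:3306/"
```

This keeps credentials out of the source file while preserving the same defaults when the variables are unset.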
Fourth, the target table “spark_export” is created in the database “test”.
Note that ColumnStoreExporter.generateTableStatement
infers a suitable DDL statement from the DataFrame’s structure.
    // generate the target table
    val createTableStatement = ColumnStoreExporter.generateTableStatement(df, "test", "spark_export")
    var connection: Connection = null
    try {
      connection = DriverManager.getConnection(url, connectionProperties)
      val statement = connection.createStatement
      // DDL statements return no result set, so use execute rather than executeQuery
      statement.execute("CREATE DATABASE IF NOT EXISTS test")
      statement.execute(createTableStatement)
    } catch {
      case e: Exception => System.err.println("error during create table statement execution: " + e)
    } finally {
      // guard against a failed getConnection leaving connection null
      if (connection != null) connection.close()
    }
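ColumnStoreExporter.generateTableStatement performs the actual schema-to-DDL mapping internally. The following is only a simplified illustration of the idea; the helper name and the hand-written type mapping are assumptions for this sketch, not mcsapi's real logic:

```scala
// Simplified illustration: build a CREATE TABLE statement from
// (column name, SQL type) pairs. The real mapping from Spark SQL types
// to ColumnStore column types lives inside ColumnStoreExporter.
def sketchTableStatement(db: String, table: String,
                         columns: Seq[(String, String)]): String = {
  val cols = columns.map { case (name, tpe) => s"$name $tpe" }.mkString(", ")
  s"CREATE TABLE $db.$table ($cols) ENGINE=columnstore"
}

// For the example DataFrame, an integer column and a one-character string column:
println(sketchTableStatement("test", "spark_export",
  Seq("number" -> "INT", "ASCII_representation" -> "VARCHAR(1)")))
```

The generated statement names the ColumnStore engine so that the table is created as a ColumnStore table rather than with the server's default engine.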
Finally, the DataFrame is exported into MariaDB ColumnStore, bypassing the SQL layer and injecting the data directly through MariaDB’s Bulk Write SDK.
    // export the DataFrame
    ColumnStoreExporter.export("test", "spark_export", df)
    spark.stop()
  }
}