A simple DataFrame export application

In this example we will export a synthetic DataFrame from Spark into a newly created table in MariaDB ColumnStore. The full code for this example can be found in the ExportDataFrame.scala file in the mcsapi codebase.

First, all needed libraries are imported and the SparkSession is created.

ExportDataFrame.scala
import org.apache.spark.sql.{SparkSession,DataFrame}
import java.util.Properties
import java.sql.{DriverManager,Connection,PreparedStatement,SQLException}
import com.mariadb.columnstore.api.connector.ColumnStoreExporter

object DataFrameExportExample {
	def main(args: Array[String]) {
		// get a Spark session
		val spark = SparkSession.builder.appName("DataFrame export into MariaDB ColumnStore").getOrCreate()
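
Note that getOrCreate picks up the master URL from the spark-submit environment. For a quick local test one could, hypothetically, set the master explicitly; this variant is not part of ExportDataFrame.scala:

		// hypothetical variant for local testing only (not in ExportDataFrame.scala)
		val spark = SparkSession.builder.appName("DataFrame export into MariaDB ColumnStore").master("local[*]").getOrCreate()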

Second, an example DataFrame that is going to be exported into ColumnStore is created. The DataFrame consists of two columns: one containing the numbers 0 to 127, the other containing their ASCII character representations.

ExportDataFrame.scala
		// generate a sample DataFrame to be exported
		import spark.implicits._
		val df = spark.sparkContext.makeRDD(0 to 127).map(i => (i, i.toChar.toString)).toDF("number", "ASCII_representation")
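
Before exporting, it can be worth checking that the DataFrame has the expected shape. The following optional lines are an addition to the example; the schema shown in the comments is what Spark derives for the tuple above:

		// optional addition: inspect schema and a few rows before exporting
		df.printSchema()
		// root
		//  |-- number: integer (nullable = false)
		//  |-- ASCII_representation: string (nullable = true)
		df.show(3)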

Third, the JDBC connection used to execute the DDL statements that create the target table is set up.

ExportDataFrame.scala
		// set the variables for a JDBC connection
		val host = "um1"
		val user = "root"
		val password = ""

		val url = "jdbc:mysql://" + host + ":3306/"
		val connectionProperties = new Properties()
		connectionProperties.put("user", user)
		connectionProperties.put("password", password)
		connectionProperties.put("driver", "org.mariadb.jdbc.Driver")

Fourth, the target table “spark_export” is created in the database “test”. Note that ColumnStoreExporter.generateTableStatement infers a suitable CREATE TABLE (DDL) statement from the DataFrame’s schema.

ExportDataFrame.scala
		// generate the target table
		val createTableStatement = ColumnStoreExporter.generateTableStatement(df, "test", "spark_export")

		var connection: Connection = null
		try {
			connection = DriverManager.getConnection(url, connectionProperties)
			val statement = connection.createStatement
			// DDL statements return no result set, so execute is used instead of executeQuery
			statement.execute("CREATE DATABASE IF NOT EXISTS test")
			statement.execute(createTableStatement)
		} catch {
			case e: Exception => System.err.println("error during create table statement execution: " + e)
		} finally {
			// guard against a failed getConnection, which would leave connection null
			if (connection != null) connection.close()
		}
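
To inspect the statement that generateTableStatement produced, it can simply be printed before execution. The commented output below is only an illustration; the exact column types and options the connector chooses may differ:

		// optional: print the generated DDL before executing it
		println(createTableStatement)
		// illustrative output (actual types/options may differ):
		// CREATE TABLE test.spark_export (number INT, ASCII_representation VARCHAR(...)) ENGINE=columnstore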

Finally, the DataFrame is exported into MariaDB ColumnStore, bypassing the SQL layer and injecting the data directly through MariaDB’s Bulk Write SDK.

ExportDataFrame.scala
		// export the DataFrame
		ColumnStoreExporter.export("test", "spark_export", df)
		spark.stop()
	}
}
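
As a final sanity check, the number of exported rows can be queried over the same JDBC settings, for instance right before spark.stop() is called. This snippet is an addition to the original example and reuses the url and connectionProperties values defined above:

		// addition: verify the export by counting rows in the target table
		val verify = DriverManager.getConnection(url, connectionProperties)
		try {
			val rs = verify.createStatement.executeQuery("SELECT COUNT(*) FROM test.spark_export")
			if (rs.next()) println("rows exported: " + rs.getInt(1)) // expected: 128
		} finally {
			verify.close()
		}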