A simple DataFrame export application
In this example we export a synthetic DataFrame from Spark into a not-yet-existing table in MariaDB ColumnStore. The full code can be found in the file ExportDataFrame.py in the mcsapi codebase.
First, all needed libraries are imported and the SparkSession is created.
import columnStoreExporter
from pyspark.sql import SparkSession, Row
import mysql.connector as mariadb

# Create the Spark session
spark = SparkSession.builder.appName("DataFrame export into MariaDB ColumnStore").getOrCreate()
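When the script is launched through spark-submit, the session picks up its master and resources from the submit configuration. For experimenting on a single machine, a local master can be pinned explicitly; the following is a hedged variation, not part of the original example:

# Variant for local experimentation (assumption: no cluster configuration
# is available); the original example takes the master from spark-submit.
spark = SparkSession.builder \
    .appName("DataFrame export into MariaDB ColumnStore") \
    .master("local[*]") \
    .getOrCreate()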
Second, the example DataFrame that is going to be exported into ColumnStore is created. It consists of two columns: one containing the numbers 0-127, the other containing their ASCII character representations.
# Generate the DataFrame to export
df = spark.createDataFrame(spark.sparkContext.parallelize(range(0, 128)).map(lambda i: Row(number=i, ASCII_representation=chr(i))))
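Before exporting, the generated data can be sanity-checked with standard PySpark calls; a minimal sketch (note that the column order of a keyword-constructed Row can vary between Spark versions):

# Quick inspection of the generated data (not part of the original example)
df.printSchema()  # two columns: number (long) and ASCII_representation (string)
df.show(5)        # the first few numbers together with their chr() values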
Third, the database connection parameters needed to execute the DDL statement that creates the target table are set up. Note that the Python variant connects through MySQL Connector/Python rather than JDBC.
# MariaDB connection parameters
user = "root"
host = "um1"
password = ""
Fourth, the target table “pyspark_export” is created in the database “test”. Note that columnStoreExporter.generateTableStatement infers a suitable DDL statement from the DataFrame’s structure.
# Create the target table
createTableStatement = columnStoreExporter.generateTableStatement(df, "test", "pyspark_export")
conn = None
cursor = None
try:
    conn = mariadb.connect(user=user, database='', host=host, password=password)
    cursor = conn.cursor()
    cursor.execute("CREATE DATABASE IF NOT EXISTS test")
    cursor.execute(createTableStatement)
except mariadb.Error as err:
    print("Error during table creation: ", err)
finally:
    # conn and cursor are pre-initialised to None so that this block
    # cannot raise a NameError if connect() failed
    if cursor: cursor.close()
    if conn: conn.close()
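It can be instructive to print the inferred statement before executing it; the exact column types emitted depend on the mcsapi version in use:

# Inspect the inferred CREATE TABLE statement (typically a
# CREATE TABLE ... ENGINE=columnstore definition; exact types may vary)
print(createTableStatement)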
Finally, the DataFrame is exported into MariaDB ColumnStore. The export bypasses the SQL layer and injects the data directly through MariaDB’s Bulk Write SDK.
# Export the DataFrame into ColumnStore
columnStoreExporter.export("test", "pyspark_export", df)
spark.stop()
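Once the export has finished, the result can be verified over the regular SQL interface. A minimal sketch, reusing the connection parameters from above; the expected count of 128 follows from the generated range:

# Verify the export via SQL (not part of the original example)
conn = mariadb.connect(user=user, database='test', host=host, password=password)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM pyspark_export")
print("rows exported:", cursor.fetchone()[0])  # expected: 128
cursor.close()
conn.close()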