ColumnStore Bulk Data Loading

You are viewing an old version of this article. View the current version here.

Overview

cpimport is a high-speed bulk load utility that imports data into ColumnStore tables in a fast and efficient manner. It accepts as input any flat file containing data that contains a delimiter between fields of data (i.e. columns in a table). The default delimiter is the pipe (‘|’) character, but other delimiters such as commas may be used as well. cpimport – performs the following operations when importing data into a MariaDB ColumnStore database:

  • Data is read from specified flat files.
  • Data is transformed to fit ColumnStore’s column-oriented storage design.
  • Redundant data is tokenized and logically compressed.
  • Data is written to disk.

It is important to note that:

  • The bulk loads are an append operation to a table so they allow existing data to be read and remain unaffected during the process.
  • The bulk loads do not write their data operations to the transaction log; they are not transactional in nature but are considered an atomic operation at this time. Information markers, however, are placed in the transaction log so the DBA is aware that a bulk operation did occur.
  • Upon completion of the load operation, a high water mark in each column file is moved in an atomic operation that allows for any subsequent queries to read the newly loaded data. This append operation provides for consistent read but does not incur the overhead of logging the data.

There are two primary steps to using the cpimport utility:

  1. Optionally create a job file that is used to load data from a flat file into multiple tables.
  2. Run the cpimport utility to perform the data import.

Syntax

The simplest form of cpimport command is

cpimport dbName tblName [loadFile]

The full syntax is like this:

cpimport dbName tblName [loadFile]
[-h] [-m mode] [-f filepath] [-d DebugLevel]
[-c readBufferSize] [-b numBuffers] [-r numReaders]
[-e maxErrors] [-B libBufferSize] [-s colDelimiter] [-E EnclosedByChar]
[-C escChar] [-j jobID] [-p jobFilePath] [-w numParsers]
[-n nullOption] [-P pmList] [-i] [-S] [-q batchQty]

positional parameters:
	dbName     Name of the database to load
	tblName    Name of table to load
	loadFile   Optional input file name in current directory,
			unless a fully qualified name is given.
			If not given, input read from STDIN.
Options:
	-b	Number of read buffers
	-c	Application read buffer size(in bytes)
	-d	Print different level(1-3) debug message
	-e	Max number of allowable error per table per PM
	-f	Data file directory path.
			Default is current working directory.
			In Mode 1, -f represents the local input file path.
			In Mode 2, -f represents the PM based input file path.
			In Mode 3, -f represents the local input file path.
	-l	Name of import file to be loaded, relative to -f path,
	-h	Print this message.
	-q	Batch Quantity, Number of rows distributed per batch in Mode 1
	-i	Print extended info to console in Mode 3.
	-j	Job ID. In simple usage, default is the table OID.
			unless a fully qualified input file name is given.
	-n	NullOption (0-treat the string NULL as data (default);
			1-treat the string NULL as a NULL value)
	-p	Path for XML job description file.
	-r	Number of readers.
	-s	'c' is the delimiter between column values.
	-B	I/O library read buffer size (in bytes)
	-w	Number of parsers.
	-E	Enclosed by character if field values are enclosed.
	-C	Escape character used in conjunction with 'enclosed by'
			character, or as part of NULL escape sequence ('\N');
			default is '\'
	-I	Import binary data; how to treat NULL values:
			1 - import NULL values
			2 - saturate NULL values
	-P	List of PMs ex: -P 1,2,3. Default is all PMs.
	-S	Treat string truncations as errors.
	-m	mode
			1 - rows will be loaded in a distributed manner across PMs.
			2 - PM based input files loaded onto their respective PM.
			3 - input files will be loaded on the local PM.

cpimport modes

Mode 1: Bulk Load from a central location with single data source file

In this mode, you run the cpimport from a central location(either UM or PM). The source file is located at this central location and the data from cpimport is distributed across all the PM nodes. If no mode is specified, then this is the default for cpimport mode. The central location where cpimport is being run from could be UM or any one of the PM.

cpimport-mode1

Example
cpimport -m1 mytest mytable mytable.tbl

Mode 2: Bulk load from central location with distributed data source files

In this mode, you run the cpimport from a central location(either UM or PM). The source data is in already partitioned data files residing on the PMs. Each PM should have the source data file of the same name but containing the partitioned data for the PM

cpimport-mode2

Example
cpimport -m2 mytest mytable /home/mydata/mytable.tbl

Mode 3: Parallel distributed bulk load

In this mode, you run cpimport from the individual PM nodes independently, which will import the source file that exists on that PM. Concurrent imports can be executed on every PM for the same table.

cpimport-mode3

Example
cpimport -m3 mytest mytable /home/mydata/mytable.tbl

Note:

  • The bulk loads are an append operation to a table so they allow existing data to be read and remain unaffected during the process.
  • The bulk loads do not write their data operations to the transaction log; they are not transactional in nature but are considered an atomic operation at this time. Information markers, however, are placed in the transaction log so the DBA is aware that a bulk operation did occur.
  • Upon completion of the load operation, a high water mark in each column file is moved in an atomic operation that allows for any subsequent queries to read the newly loaded data. This append operation provides for consistent read but does not incur the overhead of logging the data.

Bulking loading data from STDIN

Data can be loaded from STDIN into ColumnStore by simply not including the loadFile parameter

Example:

cpimport db1 table1

Bulk loading output of SELECT FROM Table(s)

Standard in can also be used to directly pipe the output from an arbitrary SELECT statement into cpimport. The select statement may select from non-columnstore tables such as MyISAM or InnoDB. In the example below, the db2.source_table is selected from, using the -N flag to remove non-data formatting.

mcsmysql -e 'select * from source_table;' -N db2 | 
/usr/local/mariadb/columnstore/cpimport -j501 -s '\t' -f STDIN

Bulk loading into multiple tables

There are two ways multiple tables can be loaded in parallel.

  1. Run multiple cpimport jobs simultaneously. Tables per import should be unique or PMs for each import should be unique if using mode 3.
  2. Use colxml utility : colxml creates an XML job file for your database schema before you can import data. Multiple tables may be imported by either importing all tables within a schema or listing specific tables using the -t option in colxml. Then, using cpimport, that uses the job file generated by colxml. Here is an example of how to use colxml and cpimport to import data into all the tables in a database schema
colxml mytest -j299
cpimport -m1 -j299

colxml syntax

Usage: colxml [options] dbName

Options: 
   -d Delimiter (default '|')
   -e Maximum allowable errors (per table)
   -h Print this message
   -j Job id (numeric)
   -l Load file name
   -n "name in quotes"
   -p Path for XML job description file that is generated
   -s "Description in quotes"
   -t Table name
   -u User
   -r Number of read buffers
   -c Application read buffer size (in bytes)
   -w I/O library buffer size (in bytes), used to read files
   -x Extension of file name (default ".tbl")
   -E EnclosedByChar (if data has enclosed values)
   -C EscapeChar
   -b Debug level (1-3)

Example usage of colxml

The following tables comprise a database name ‘tpch2’:

MariaDB[tpch2]> show tables;
+---------------+
| Tables_in_tpch2 |
+--------------+
| customer    |
| lineitem    |
| nation      |
| orders      |
| part        |
| partsupp    |
| region      |
| supplier    |
+--------------+
8 rows in set (0.00 sec)
  1. First, put delimited input data file for each table in /usr/local/mariadb/columnstore/data/bulk/data/import. Each file should be named <tblname>.tbl.
  2. Run colxml for the load job for the ‘tpch2’ database as shown here:
/usr/local/mariadb/columnstore/bin/colxml tpch2 -j500
Running colxml with the following parameters:
2015-10-07 15:14:20 (9481) INFO :
Schema: tpch2
Tables:
Load Files:
-b 0
-c 1048576
-d |
-e 10
-f CSV
-j 500
-n
-p /usr/local/mariadb/columnstore/data/bulk/job/
-r 5
-s
-u
-w 10485760
-x tbl
File completed for tables:
tpch2.customer
tpch2.lineitem
tpch2.nation
tpch2.orders
tpch2.part
tpch2.partsupp
tpch2.region
tpch2.supplier
Normal exit.

Now actually run cpimport to use the job file generated by the colxml execution

/usr/local/mariadb/columnstore/bin/cpimport -j 500
Bulkload root directory : /usr/local/mariadb/columnstore/data/bulk
job description file : Job_500.xml
2015-10-07 15:14:59 (9952) INFO : successfully load job file /usr/local/mariadb/columnstore/data/bulk/job/Job_500.xml
2015-10-07 15:14:59 (9952) INFO : PreProcessing check starts
2015-10-07 15:15:04 (9952) INFO : PreProcessing check completed
2015-10-07 15:15:04 (9952) INFO : preProcess completed, total run time : 5 seconds
2015-10-07 15:15:04 (9952) INFO : No of Read Threads Spawned = 1
2015-10-07 15:15:04 (9952) INFO : No of Parse Threads Spawned = 3
2015-10-07 15:15:06 (9952) INFO : For table tpch2.customer: 150000 rows processed and 150000 rows inserted.
2015-10-07 15:16:12 (9952) INFO : For table tpch2.nation: 25 rows processed and 25 rows inserted.
2015-10-07 15:16:12 (9952) INFO : For table tpch2.lineitem: 6001215 rows processed and 6001215 rows inserted.
2015-10-07 15:16:31 (9952) INFO : For table tpch2.orders: 1500000 rows processed and 1500000 rows inserted.
2015-10-07 15:16:33 (9952) INFO : For table tpch2.part: 200000 rows processed and 200000 rows inserted.
2015-10-07 15:16:44 (9952) INFO : For table tpch2.partsupp: 800000 rows processed and 800000 rows inserted.
2015-10-07 15:16:44 (9952) INFO : For table tpch2.region: 5 rows processed and 5 rows inserted.
2015-10-07 15:16:45 (9952) INFO : For table tpch2.supplier: 10000 rows processed and 10000 rows inserted.

Comments

Comments loading...
Content reproduced on this site is the property of its respective owners, and this content is not reviewed in advance by MariaDB. The views, information and opinions expressed by this content do not necessarily represent those of MariaDB or any other party.