ColumnStoreへの一括データロード
Contents
概要
cpimportはColumnStoreのテーブルへ高速で効率的なデータ挿入を実現する一括ロードユーティリティです。このユーティリティはデータをデリミタ(区切り文字)で区切ったテキストファイルを入力とします。既定の区切り文字は('|')ですが、コンマなどを使うことも可能です。データは、CREATE TABLE文と同じ順番に並んでいる必要があり、日付の値は'yyyy-mm-dd'形式で入力する必要があります。
cpimport – MariaDB ColumnStoreデータベースへデータをインポートする際に以下の操作を行います。
- データを指定されたフラットファイルから読み込む。
- データをColumnStoreのカラム型ストレージに合うように変換する。
- 冗長データは符号化され、論理的に圧縮される。
- データがディスクに書き込まれる。
以下が重要なポイントとなります。
- 一括ロードはテーブルにに対する追加操作となるため、既存のデータは読み込み可能で、処理中は影響を受けません。
- 一括ロードはトランザクションログに出力しません。これらはトランザクション処理ではありませんが、アトミックな操作とみなすことは可能です。しかし、バルク操作が行われたという目印はログに出力されるため、バルク操作が行われたことを判別することは可能です。
- ロード処理が終了すると、それぞれのカラムのハイウォータマークはアトミックな操作の中で移動され、新たに読み込んだデータを読み込むことが可能となります。この追加処理は、一貫性のある読み込みをサポートしつつ、データロギングのオーバーヘッドを発生させません。
cpimportユーティリティを使用するには二つの主なステップがあります。
- (オプション) ジョブファイルを作成し、一つのフラットファイルから複数テーブルへロードするようにします。
- cpimportを実行しデータをインポートします。
構文
最もシンプルなcpimportコマンドの実行方法は以下の通りです。
cpimport dbName tblName [loadFile]
cpimportのオプションの詳細を以下に示します。
cpimport dbName tblName [loadFile] [-h] [-m mode] [-f filepath] [-d DebugLevel] [-c readBufferSize] [-b numBuffers] [-r numReaders] [-e maxErrors] [-B libBufferSize] [-s colDelimiter] [-E EnclosedByChar] [-C escChar] [-j jobID] [-p jobFilePath] [-w numParsers] [-n nullOption] [-P pmList] [-i] [-S] [-q batchQty] positional parameters: dbName Name of the database to load tblName Name of table to load loadFile Optional input file name in current directory, unless a fully qualified name is given. If not given, input read from STDIN. Options: -b Number of read buffers -c Application read buffer size(in bytes) -d Print different level(1-3) debug message -e Max number of allowable error per table per PM -f Data file directory path. Default is current working directory. In Mode 1, -f represents the local input file path. In Mode 2, -f represents the PM based input file path. In Mode 3, -f represents the local input file path. -l Name of import file to be loaded, relative to -f path, -h Print this message. -q Batch Quantity, Number of rows distributed per batch in Mode 1 -i Print extended info to console in Mode 3. -j Job ID. In simple usage, default is the table OID. unless a fully qualified input file name is given. -n NullOption (0-treat the string NULL as data (default); 1-treat the string NULL as a NULL value) -p Path for XML job description file. -r Number of readers. -s 'c' is the delimiter between column values. -B I/O library read buffer size (in bytes) -w Number of parsers. -E Enclosed by character if field values are enclosed. -C Escape character used in conjunction with 'enclosed by' character, or as part of NULL escape sequence ('\N'); default is '\' -I Import binary data; how to treat NULL values: 1 - import NULL values 2 - saturate NULL values -P List of PMs ex: -P 1,2,3. Default is all PMs. -S Treat string truncations as errors. -m mode 1 - rows will be loaded in a distributed manner across PMs. 2 - PM based input files loaded onto their respective PM. 3 - input files will be loaded on the local PM.
cpimportモード
モード1:一つのデータファイルから対象モジュールに一括読み込みを行います。
このモードでは、対象モジュール(ユーザーモジュールかパフォーマンスモジュールのいずれか)の場所でcpimportを実行します。ソースファイルはそのモジュールの場所にあり、cpimportは全てのパフォーマンスモジュールに分散されます。もしモードの指定がない場合、このモード1が規定の動作となります。 cpimportが実行される場所は、ユーザーモジュールかパフォーマンスモジュールのいずれかになります。
Example cpimport -m1 mytest mytable mytable.tbl
モード2:複数の分散されたデータファイルから対象モジュールに一括読み込みを行います。
このモードでは、対象モジュール(ユーザーモジュールかパフォーマンスモジュールのいずれか)の場所でcpimportを実行します。ソースファイルは既に分割された状態でパフォーマンスモジュール上に存在しています。それぞれのパフォーマンスモジュールは同名のファイルを持ち、パフォーマンスモジュールごとに分割されたデータを持っています。
Example cpimport -m2 mytest mytable /home/mydata/mytable.tbl
モード3:並行分散一括読み込みを行います。
このモードでは、cpimportをそれぞれのパフォーマンスモジュール上で個別に実行します。ソースファイルはパフォーマンスモジュール上に存在し、並列して、パフォーマンスモジュール上の同じテーブルにデータがロードされます。
Example cpimport -m3 mytest mytable /home/mydata/mytable.tbl
メモ:
- 一括ロードはテーブルへの追記動作のため、実行中も、既存のデータ読み込みは可能で、処理中も影響を受けません。
- 一括ロードは処理中動作をトランザクションログに出力しません。
Note:
- The bulk loads are an append operation to a table so they allow existing data to be read and remain unaffected during the process.
- The bulk loads do not write their data operations to the transaction log; they are not transactional in nature but are considered an atomic operation at this time. Information markers, however, are placed in the transaction log so the DBA is aware that a bulk operation did occur.
- Upon completion of the load operation, a high water mark in each column file is moved in an atomic operation that allows for any subsequent queries to read the newly loaded data. This append operation provides for consistent read but does not incur the overhead of logging the data.
Bulking loading data from STDIN
Data can be loaded from STDIN into ColumnStore by simply not including the loadFile parameter
Example:
cpimport db1 table1
Bulk loading output of SELECT FROM Table(s)
Standard in can also be used to directly pipe the output from an arbitrary SELECT statement into cpimport. The select statement may select from non-columnstore tables such as MyISAM or InnoDB. In the example below, the db2.source_table is selected from, using the -N flag to remove non-data formatting. The -q flag tells the mysql client to not cache results which will avoid possible timeouts causing the load to fail.
mcsmysql -q -e 'select * from source_table;' -N db2 | /usr/local/mariadb/columnstore/cpimport -j501 -s '\t' -f STDIN
Bulk loading into multiple tables
There are two ways multiple tables can be loaded in parallel.
- Run multiple cpimport jobs simultaneously. Tables per import should be unique or PMs for each import should be unique if using mode 3.
- Use colxml utility : colxml creates an XML job file for your database schema before you can import data. Multiple tables may be imported by either importing all tables within a schema or listing specific tables using the -t option in colxml. Then, using cpimport, that uses the job file generated by colxml. Here is an example of how to use colxml and cpimport to import data into all the tables in a database schema
colxml mytest -j299 cpimport -m1 -j299
colxml syntax
Usage: colxml [options] dbName Options: -d Delimiter (default '|') -e Maximum allowable errors (per table) -h Print this message -j Job id (numeric) -l Load file name -n "name in quotes" -p Path for XML job description file that is generated -s "Description in quotes" -t Table name -u User -r Number of read buffers -c Application read buffer size (in bytes) -w I/O library buffer size (in bytes), used to read files -x Extension of file name (default ".tbl") -E EnclosedByChar (if data has enclosed values) -C EscapeChar -b Debug level (1-3)
Example usage of colxml
The following tables comprise a database name ‘tpch2’:
MariaDB[tpch2]> show tables; +---------------+ | Tables_in_tpch2 | +--------------+ | customer | | lineitem | | nation | | orders | | part | | partsupp | | region | | supplier | +--------------+ 8 rows in set (0.00 sec)
- First, put delimited input data file for each table in /usr/local/mariadb/columnstore/data/bulk/data/import. Each file should be named <tblname>.tbl.
- Run colxml for the load job for the ‘tpch2’ database as shown here:
/usr/local/mariadb/columnstore/bin/colxml tpch2 -j500 Running colxml with the following parameters: 2015-10-07 15:14:20 (9481) INFO : Schema: tpch2 Tables: Load Files: -b 0 -c 1048576 -d | -e 10 -f CSV -j 500 -n -p /usr/local/mariadb/columnstore/data/bulk/job/ -r 5 -s -u -w 10485760 -x tbl File completed for tables: tpch2.customer tpch2.lineitem tpch2.nation tpch2.orders tpch2.part tpch2.partsupp tpch2.region tpch2.supplier Normal exit.
Now actually run cpimport to use the job file generated by the colxml execution
/usr/local/mariadb/columnstore/bin/cpimport -j 500 Bulkload root directory : /usr/local/mariadb/columnstore/data/bulk job description file : Job_500.xml 2015-10-07 15:14:59 (9952) INFO : successfully load job file /usr/local/mariadb/columnstore/data/bulk/job/Job_500.xml 2015-10-07 15:14:59 (9952) INFO : PreProcessing check starts 2015-10-07 15:15:04 (9952) INFO : PreProcessing check completed 2015-10-07 15:15:04 (9952) INFO : preProcess completed, total run time : 5 seconds 2015-10-07 15:15:04 (9952) INFO : No of Read Threads Spawned = 1 2015-10-07 15:15:04 (9952) INFO : No of Parse Threads Spawned = 3 2015-10-07 15:15:06 (9952) INFO : For table tpch2.customer: 150000 rows processed and 150000 rows inserted. 2015-10-07 15:16:12 (9952) INFO : For table tpch2.nation: 25 rows processed and 25 rows inserted. 2015-10-07 15:16:12 (9952) INFO : For table tpch2.lineitem: 6001215 rows processed and 6001215 rows inserted. 2015-10-07 15:16:31 (9952) INFO : For table tpch2.orders: 1500000 rows processed and 1500000 rows inserted. 2015-10-07 15:16:33 (9952) INFO : For table tpch2.part: 200000 rows processed and 200000 rows inserted. 2015-10-07 15:16:44 (9952) INFO : For table tpch2.partsupp: 800000 rows processed and 800000 rows inserted. 2015-10-07 15:16:44 (9952) INFO : For table tpch2.region: 5 rows processed and 5 rows inserted. 2015-10-07 15:16:45 (9952) INFO : For table tpch2.supplier: 10000 rows processed and 10000 rows inserted.