Customized MySQL LOAD DATA LOCAL INFILE handlers with libmysqlclient

One of the lesser known (and lesser used) MySQL client API calls seems to be mysql_set_local_infile_handler(), which allows overriding the default LOAD DATA LOCAL INFILE behavior on the client side. This makes it possible to import data in text form, e.g. in CSV format, from sources other than actual local files. I searched for more detailed information than the reference page in the manual, or for example code using this mechanism, but couldn’t really find any, so this post tries to close that gap.

In a way this allows for things similar to PostgreSQL’s COPY FROM STDIN mechanism, but there is a substantial difference between the two DBMSs’ approaches. With COPY FROM STDIN an application enters a sort of push mode where text data can be sent using specific library functions. The MySQL client library API implements a pull model instead: the client library takes control and requests application data via callbacks that need to be registered up front with mysql_set_local_infile_handler() before executing a LOAD DATA LOCAL INFILE query.

Setting things up

mysql_set_local_infile_handler() expects four callback function pointers: one for initialization, one for fetching a chunk of text data, one for cleaning up, and an error handler. It also expects a MySQL connection handle as its first parameter, and a pointer to user defined per-connection data as its last parameter, which will be passed to the init function later. This data pointer may point to data common to all invocations of the custom handler. In the example code I’m just using it to point to a descriptive name.

Call flow

The call flow for the various infile callbacks triggered while processing a LOAD DATA LOCAL INFILE query looks like this, with the upper part being the regular case: init() is called once, then read() repeatedly until it can’t provide any more data, and finally the end() callback. The lower paths show the error handling case that is triggered by init() returning a non-zero value, or by read() returning a negative length to indicate an error. In this case the error() callback is called, followed by a call to end().

                           +-- > 0 --+
                           v         |
  +--------+           +--------+    |               +-------+
  | init() |-- == 0 -->| read() |----+---- == 0 ---->| end() |
  +--------+           +--------+    |               +-------+
      |                              |                   ^
    != 0                            < 0                  |
      |                              |    +---------+    |
      +------------------------------+--->| error() |----+
                                          +---------+
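
To make the ordering concrete, here is a small stand-alone driver that walks through the same sequence without any server involved. This is only my sketch of the call order described above, not the actual libmysqlclient code; run_infile_callbacks() and the demo_* handlers are hypothetical names made up for illustration.

```c
#include <stdio.h>
#include <string.h>

/* Callback types matching the four handler signatures */
typedef int  (*init_fn)(void **, const char *, void *);
typedef int  (*read_fn)(void *, char *, unsigned int);
typedef void (*end_fn)(void *);
typedef int  (*error_fn)(void *, char *, unsigned int);

/* Hypothetical driver: calls the handlers in the order described above.
   Returns 0 on success or the code reported by the error() handler. */
int run_infile_callbacks(init_fn init, read_fn rd, end_fn end, error_fn err,
                         const char *filename, void *handler_data,
                         char *errbuf, unsigned int errbuf_len)
{
  void *instance = NULL;
  char buf[64];
  int len, result = 0;

  if (init(&instance, filename, handler_data) != 0) {
    result = err(instance, errbuf, errbuf_len);   /* init() failed */
  } else {
    while ((len = rd(instance, buf, sizeof(buf))) > 0) {
      /* the real client would send these bytes to the server */
      fwrite(buf, 1, (size_t)len, stdout);
    }
    if (len < 0) {
      result = err(instance, errbuf, errbuf_len); /* read() failed */
    }
  }
  end(instance);   /* always called last, on both paths */
  return result;
}

/* Minimal demo handlers: the instance data is a counter of read() calls */
static int demo_calls;

int demo_init(void **instance, const char *filename, void *handler_data)
{
  (void)filename; (void)handler_data;
  demo_calls = 0;
  *instance = &demo_calls;
  return 0;
}

int demo_read(void *instance, char *buf, unsigned int buf_len)
{
  int *calls = instance;
  if (++(*calls) > 2) return 0;          /* end of data after two chunks */
  return snprintf(buf, buf_len, "%d,row\n", *calls);
}

void demo_end(void *instance) { (void)instance; }

int demo_error(void *instance, char *msg, unsigned int msg_len)
{
  (void)instance;
  snprintf(msg, msg_len, "demo error");
  return 1;
}
```

Calling run_infile_callbacks() with the four demo handlers prints two rows and then stops on the third read() call, which returns zero. Swapping in an init() that returns non-zero exercises the lower path, where error() is followed directly by end().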

init()

int local_infile_init(void **instance_data, const char *filename, void *handler_data)

The init function is called first whenever a LOAD DATA LOCAL query is issued. It receives a pointer-pointer where it can store the pointer to local state data to be used for this LOAD operation, the filename used in the LOAD statement, and the per-connection user data pointer that was passed into mysql_set_local_infile_handler() earlier. In the example code below I’m keeping track of line numbers in the local state data allocated here.
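
As a variation on what init() can do with its three parameters, here is a sketch that keeps an input stream in the per-LOAD state. The struct load_state, the function name file_or_stdin_init() and the convention that a file name of "-" means standard input are all my own assumptions for illustration; none of this is part of the MySQL API.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical per-LOAD state; one instance per LOAD DATA LOCAL statement */
struct load_state {
  FILE *in;              /* where read() will fetch data from */
  unsigned long lines;   /* progress counter for error messages */
  const char *label;     /* per-connection handler data, not owned */
};

int file_or_stdin_init(void **instance_data, const char *filename,
                       void *handler_data)
{
  struct load_state *st = calloc(1, sizeof(*st));

  *instance_data = st;   /* NULL on allocation failure, error() can test for it */
  if (st == NULL) {
    return 1;
  }

  st->label = handler_data;
  /* Assumption for this sketch: "-" means "read from standard input" */
  st->in = (strcmp(filename, "-") == 0) ? stdin : fopen(filename, "rb");

  /* non-zero tells the client library to call error() and then end() */
  return st->in == NULL ? 1 : 0;
}
```

Note that the state stays allocated when fopen() fails, so a matching end() handler still has to free it, and it should only fclose() a stream that is neither NULL nor stdin.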

read()

int local_infile_read(void *instance_data, char *buf, unsigned int buf_len)

The read() function gets called repeatedly until no more data is available. It receives the instance data pointer you stored in init(), a pointer to a buffer to store INFILE data into, and the maximum number of bytes that can be put into this buffer. You don’t have to pass a complete single line of data at a time. You can pass multiple input lines, or just part of a line, as long as you make sure not to exceed the buffer size. Data passed from the read() handler will simply be transferred over to the server, and all parsing, including splitting it into lines, happens on that side. The read() handler shall return the number of bytes that have been put into the buffer. A value of zero indicates that all data has been read and that read() should not be called again. A negative value indicates an error and will also terminate reading. There is no way to generate warnings here, just errors.
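
Since chunk boundaries don’t have to match line boundaries, a read() handler can simply hand out whatever fits. The following sketch serves an in-memory CSV document in buffer sized pieces; struct mem_source and mem_infile_read() are hypothetical names of mine, and the instance data is assumed to have been set up by a matching init() handler.

```c
#include <string.h>

/* Hypothetical instance data: an in-memory CSV document plus read position */
struct mem_source {
  const char *data;   /* complete input, may contain many lines */
  size_t len;         /* total number of bytes in data */
  size_t pos;         /* bytes already handed to the client library */
};

int mem_infile_read(void *instance_data, char *buf, unsigned int buf_len)
{
  struct mem_source *src = instance_data;
  size_t remaining = src->len - src->pos;
  size_t chunk = remaining < buf_len ? remaining : buf_len;

  if (chunk == 0) {
    return 0;                     /* end of data, read() is not called again */
  }

  /* Copy raw bytes; chunks may end in the middle of a line,
     the server reassembles and parses the stream */
  memcpy(buf, src->data + src->pos, chunk);
  src->pos += chunk;

  return (int)chunk;
}
```

The handler uses memcpy() with an explicit length instead of string functions, so it never writes a terminating zero byte beyond buf_len and also copes with data that contains embedded zero bytes.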

error()

int local_infile_error(void *instance_data, char *error_msg, unsigned int error_msg_len)

The error() handler is called after returning a non-zero value from init() or a negative number from read(). It receives the data pointer you’ve set up in init() and a buffer pointer plus length to write an error message to. A numeric error code can be passed as the return value. There’s no direct way to set the error number and error message right away when hitting an error in init() or read(). If you need to pass on something descriptive from where the error happened to the error() handler, you have to take care of doing so using the data pointer (or via global variables).
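
One way to carry such details over is to reserve room for them in the instance data. The sketch below is just one possible arrangement: struct err_state, remember_error() and detailed_infile_error() are hypothetical names, and reusing C errno values as the numeric error code is my own choice for the example. The fallback value 2000 is what CR_UNKNOWN_ERROR is defined as in errmsg.h.

```c
#include <stdio.h>
#include <string.h>
#include <errno.h>

#define FALLBACK_ERRNO 2000   /* value of CR_UNKNOWN_ERROR in errmsg.h */

/* Hypothetical instance data that carries error details to error() */
struct err_state {
  int code;            /* 0 = no error recorded yet */
  char text[128];      /* description captured where the error happened */
};

/* Helper to be called from init() or read() right before returning failure */
void remember_error(struct err_state *st, int code, const char *what)
{
  st->code = code;
  snprintf(st->text, sizeof(st->text), "%s: %s", what, strerror(code));
}

int detailed_infile_error(void *instance_data, char *error_msg,
                          unsigned int error_msg_len)
{
  struct err_state *st = instance_data;

  /* no instance data or nothing recorded: report a generic error */
  if (st == NULL || st->code == 0) {
    snprintf(error_msg, error_msg_len, "unknown error in infile handler");
    return FALLBACK_ERRNO;
  }

  /* snprintf() truncates instead of overflowing the message buffer */
  snprintf(error_msg, error_msg_len, "%s", st->text);
  return st->code;
}
```

A read() handler would call something like remember_error(st, errno, "read input") right before returning -1, and the message then shows up via mysql_error() on the calling side.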

end()

void local_infile_end(void *instance_data)

The end() handler is called after read() has returned a zero length to indicate “end of data”, or right after the error handler. Its sole purpose is to free any resources that you may have allocated in init().

Summary

While local infile handlers allow for some interesting alternatives to simple client-side local file imports, the current implementation feels a bit complicated. It is not a good fit for applications that want to avoid the SQL parsing overhead on bulk imports but want or need to drive the process themselves instead of passing control to the client library. To summarize this in an itemized list:

  • Pull model: works well for reading data from a stream other than a simple local file, but less so for applications that just want to avoid the SQL parsing overhead (and its synchronous nature) when importing bulk data. Those are a better match for a push model like the one PostgreSQL’s libpq library is using
  • No insight into the LOAD DATA statement or the related table / column meta data apart from the file name parameter
  • No flow control in the protocol beyond what TCP provides. In particular, there’s no way to keep the connection alive if the input stream stalls for longer than net_read_timeout
  • Error handling: I’d personally prefer the more direct way error handling is done in the UDF (User Defined Functions) API on the server side where all callbacks that may want to report an error can do so via an extra error message buffer passed as a parameter right away, but it is probably way too late to complain about this some ten years after the fact …

Protocol-wise the PostgreSQL and MySQL implementations are not that different though, so maybe an alternative call interface in libmysqlclient, similar to the PQputCopyData() / PQputCopyEnd() approach in libpq, could be a nice addition to support both the push and pull approaches. ( … to be continued …)

Example

The following example code can be compiled using

gcc `mysql_config --cflags` infile_handler.c -o infile_handler `mysql_config --libs`

/* Copyright (C) 2014 Hartmut Holzgraefe <hartmut@skysql.com>

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

/*
 * MySQL C client API example: mysql_set_local_infile_handler()
 *
 */

/* configure server access here,
   needs the following table in the
   given MYSQL_DB:

   CREATE TABLE local_infile_test(
     id  INT,
     msg VARCHAR(100)
   ) ENGINE=myisam;

*/

#define MYSQL_HOST "127.0.0.1"
#define MYSQL_DB   "test"
#define MYSQL_USER "root"
#define MYSQL_PWD  ""

/* config part ends here */

/* standard C headers */
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

/* MySQL specific headers */
#include <mysql.h>
#include <errmsg.h>

/* init() handler

   void ** instance_data -> put pointer to allocated instance data here
   const char * filename -> filename as given in LOAD DATA LOCAL statement
   void * handler_data   -> as given as last parameter to mysql_set_local_infile_handler()

   return int: 0 on success, non-zero on errors
*/
int local_infile_init(void **instance_data, const char *filename, void *handler_data)
{
  int *line_counter;

  printf("INIT '%s' ('%s')\n", filename, (char *)handler_data);

  line_counter = malloc(sizeof(int));
  if (NULL == line_counter) {
    fprintf(stderr, "malloc failed in local_infile_init()\n");
    *instance_data = NULL;
    return 1;
  }

  *line_counter = 0;
  *instance_data = (void *)line_counter;

  return 0;
}

/* read() handler

   void *instance_data  -> as set in init()
   char *buf            -> store read data here
   unsigned int buf_len -> but not more than this number of bytes

   return int:    number of bytes stored in buf
               OR 0 on end of data
               OR negative number on errors
*/
int local_infile_read(void *instance_data, char *buf, unsigned int buf_len)
{
  int *line_counter = (int *)instance_data;

  printf("READ line %d (buf_len: %u)\n", ++(*line_counter), buf_len);

  switch (*line_counter) {
  case 1:
    /* snprintf() never writes more than buf_len bytes,
       including the terminating zero byte */
    snprintf(buf, buf_len, "23,bar\n");
    break;

  case 2:
    snprintf(buf, buf_len, "42,bar\n");
    break;

  default:
    /* 50% chance of clean exit
       or triggering the error handler */
    if (time(NULL) % 2 == 0) {
      printf("no more data\n");
      return 0;
    }
    printf("forcing error\n");
    return -1;
  }

  return (int)strlen(buf);
}

/* end() handler

   void * instance_data -> as set in init() handler
*/
void local_infile_end(void *instance_data)
{
  printf("END\n");

  /* free(NULL) is a no-op, so no extra check is needed */
  free(instance_data);
}

/* error() handler

   void * instance_data       -> as set in init() handler
   char * error_msg           -> store terminated textual error message here
   unsigned int error_msg_len -> but no more than this many bytes, including
                                 the terminator

   return int: numeric error code
*/
int local_infile_error(void *instance_data, char *error_msg, unsigned int error_msg_len)
{
  printf("ERROR\n");

  /* instance data uninitialized -> init failed */
  if (NULL == instance_data) {
    fprintf(stderr, "allocation failure in init()\n");
    snprintf(error_msg, error_msg_len, "allocation failure in init()");
    return CR_OUT_OF_MEMORY;
  }

  /* otherwise: read failed */
  snprintf(error_msg, error_msg_len,
           "read() error on reading line %d",
           *((int *)instance_data));

  return CR_UNKNOWN_ERROR;
}



int main(void)
{
  MYSQL *mysql = NULL;
  unsigned int opt_local_infile = 1;
  int stat;

  /* initialize client connection handle */
  mysql = mysql_init(mysql);
  if (!mysql) {
    puts("Init failed, out of memory?");
    return EXIT_FAILURE;
  }

  /* enable local infile handling */
  mysql_options(mysql, MYSQL_OPT_LOCAL_INFILE, (char*) &opt_local_infile);

  /* connect to the server */
  if (!mysql_real_connect(mysql,              /* MYSQL structure to use */
                          MYSQL_HOST,         /* server hostname or IP address */
                          MYSQL_USER,         /* mysql user */
                          MYSQL_PWD,          /* password */
                          MYSQL_DB,           /* default database to use, NULL for none */
                          0,                  /* port number, 0 for default */
                          NULL,               /* socket file or named pipe name */
                          0                   /* client flags */
                          )) {
    fprintf(stderr, "Connect failed: %s\n", mysql_error(mysql));
    mysql_close(mysql);
    exit(EXIT_FAILURE);
  } else {
    printf("Connect OK\n");
  }

  /* register our own infile handler */
  mysql_set_local_infile_handler(mysql,
                                 local_infile_init,
                                 local_infile_read,
                                 local_infile_end,
                                 local_infile_error,
                                 "testhandler");

  /* now trigger infile handling */
  stat = mysql_query(mysql, "LOAD DATA LOCAL INFILE 'some_file' "
                            "     INTO TABLE local_infile_test "
                            "   FIELDS TERMINATED BY ','");
  if (stat) {
    /* errno and error should be those set by local_infile_error() */
    fprintf(stderr, "stat: %u %s\n", mysql_errno(mysql), mysql_error(mysql));
  }

  /* cleanup */
  mysql_close(mysql);
  return EXIT_SUCCESS;
}