Query Pipelining with MariaDB Connector/Node.js (Callback API)

Overview

Node.js developers can connect to MariaDB database products using MariaDB Connector/Node.js through the Callback API to pipeline queries.

Query Pipelining

Query Pipelining is available to the Callback API through the pipelining connection option in the createConnection() function.

When pipelining queries, MariaDB Connector/Node.js uses optimistic send, where all SQL statements within a transaction are sent together in FIFO (First In First Out) order, without waiting for a response to indicate whether or not the preceding statement was successful.

Query pipelining is useful when sending query requests to a remote server at a great network distance.

Transactions are recommended when using query pipelining, as an individual SQL statement cannot be prevented from running if another statement fails or encounters an error condition, since the statements are sent in bulk.

Code Example: Query Pipelining

The following example shows how to use query pipelining to add and update data to the example table created in example setup:

const mariadb = require("mariadb/callback");

function main() {
   let conn;

   try {
      // Establish Connection (with query pipelining set)
      conn = mariadb.createConnection({
         host: "192.0.2.50",
         user: "db_user",
         password: "db_user_password",
         database: "test",
         pipelining: true,
      });

      const contact = {
         a
         first_name: "John",
         last_name: "Smith",
         email: "john.smith@example.com",
      };

      const update = {
         first_name: "John",
         email: "johnsmith@example.com",
      };

       addContactsInPipeline(conn, contact, update);
   } catch (err) {
      // Manage Errors
      console.error("Error connecting to the database with pipeline: ", err);
   } finally {
      if (conn) conn.end(err => {
         if(err) {
            console.log("SQL error in closing connection: ", err);
         }
      })
   }
}

function addContactsInPipeline(conn, data, update) {
   // Start Transaction
   conn.beginTransaction(error => {
      if (error){
         console.log("SQL error in starting a transaction: ", error);
      } else {

         let hasError = false;

         // Contact
         conn.query(
            "INSERT INTO test.contacts(first_name, last_name, email) VALUES(?, ?, ?)",
            [data.first_name,data.last_name,data.email],
            (err,res,meta) => {
               if (err) {
                  console.error("Error loading data, reverting changes: ", err);
                  hasError = true;
                } else {
                   console.log(res);
                   console.log(meta);
                }
             }
          );

         // Update
         conn.query(
            "UPDATE test.contacts SET email = ? WHERE first_name = ?",
            [update.email, update.first_name],
            (err,res,meta) => {
               if (err) {
                  console.error("Error loading data, reverting changes: ", err);
                  hasError = true;
               } else {
                  console.log(res);
                  console.log(meta);
               }
               if (hasError) {
                  console.error("Error adding user, reverting changes: ", err);
                  conn.rollback(error => {
                     if (error) {
                        console.log("SQL error in rolling back a transaction: ", error);
                     }
                  });
               } else {
                  conn.commit(error => {
                     if (error) {
                        console.log("SQL error in committing a transaction: ", error);
                     }
                  });
               }
            }
         );
      }
   });
}

main();
const mariadb = require("mariadb/callback");

// Certificate Authority (CA)",
var serverCert = [fs.readFileSync(process.env.SKYSQL_CA_PEM, "utf8")];

function main() {
   let conn;

   try {
      // Establish Connection (with query pipelining set)
      conn = mariadb.createConnection({
         host: "example.skysql.net",
         port: 5009,
         ssl: { ca: serverCert },
         user: "db_user",
         password: "db_user_password",
         database: "test",
         pipelining: true,
      });

      const contact = {
         a
         first_name: "John",
         last_name: "Smith",
         email: "john.smith@example.com",
      };

      const update = {
         first_name: "John",
         email: "johnsmith@example.com",
      };

       addContactsInPipeline(conn, contact, update);
   } catch (err) {
      // Manage Errors
      console.error("Error connecting to the database with pipeline: ", err);
   } finally {
      if (conn) conn.end(err => {
         if(err) {
            console.log("SQL error in closing connection: ", err);
         }
      })
   }
}

function addContactsInPipeline(conn, data, update) {
   // Start Transaction
   conn.beginTransaction(error => {
      if (error){
         console.log("SQL error in starting a transaction: ", error);
      } else {

         let hasError = false;

         // Contact
         conn.query(
            "INSERT INTO test.contacts(first_name, last_name, email) VALUES(?, ?, ?)",
            [data.first_name,data.last_name,data.email],
            (err,res,meta) => {
               if (err) {
                  console.error("Error loading data, reverting changes: ", err);
                  hasError = true;
                } else {
                   console.log(res);
                   console.log(meta);
                }
             }
          );

         // Update
         conn.query(
            "UPDATE test.contacts SET email = ? WHERE first_name = ?",
            [update.email, update.first_name],
            (err,res,meta) => {
               if (err) {
                  console.error("Error loading data, reverting changes: ", err);
                  hasError = true;
               } else {
                  console.log(res);
                  console.log(meta);
               }
               if (hasError) {
                  console.error("Error adding user, reverting changes: ", err);
                  conn.rollback(error => {
                     if (error) {
                        console.log("SQL error in rolling back a transaction: ", error);
                     }
                  });
               } else {
                  conn.commit(error => {
                     if (error) {
                        console.log("SQL error in committing a transaction: ", error);
                     }
                  });
               }
            }
         );
      }
   });
}

main();
  • Two SQL statements are pipelined: one to add data and the other to update data.

  • The beginTransaction() method begins a transaction.

  • In the body of the transaction, an if statement catches any errors.

  • MariaDB Connector/Node.js adds data to the database using the query() function with an INSERTINSERT statement.

  • The body of the query() function checks for and reports any errors, then prints the returned results and metadata.

  • MariaDB Connector/Node.js updates data in the database using the query() function with an UPDATEUPDATE statement.

  • The body of the query() function checks for and reports any errors, then logs the results. If it encounters an error, it calls the rollback() functions to revert the changes in the transaction. If it encounters no errors, it calls the commit() function to commit the transactions.

  • The callback function outputs the result, and the metadata fields to the console.

  • MariaDB Connector/Node.js closes the connection with conn.end() to free up resources.

The output from the script lists number of affectedRows for each of the DML statements. Metadata is undefined.

OkPacket { affectedRows: 1, insertId: 6, warningStatus: 0 }
undefined
OkPacket { affectedRows: 2, insertId: 0, warningStatus: 0 }
undefined