Query Pipelining with MariaDB Connector/Node.js (Promise API)

Overview

Node.js developers can connect to MariaDB database products using MariaDB Connector/Node.js through the Promise API to pipeline queries.

Query Pipeline

Query Pipelining is available to the Promise API through the pipelining connection option in the createConnection() function.

When pipelining queries, MariaDB Connector/Node.js uses optimistic send, where all SQL statements within a transaction are sent together in FIFO (First In First Out) order, without waiting for a response to indicate whether or not the preceding statement was successful.

Query pipelining is useful when sending query requests to a remote server at a great network distance.

Transactions are recommended when using query pipelining, as an individual SQL statement cannot be prevented from running if another statement fails or encounters an error condition, since the statements are sent in bulk.

Code Example: Query Pipelining

The following example shows how to use query pipelining to add and update data to the example table:

const mariadb = require("mariadb");

// Main Function
/**
 * Entry point for the example: opens a connection with pipelining enabled,
 * then inserts the sample contacts and updates one contact's email via
 * addContactsInPipeline().
 *
 * Any error raised while connecting or while running the transaction is
 * logged here and not rethrown, so the returned promise always resolves
 * unless closing the connection itself throws.
 *
 * @returns {Promise<void>}
 */
async function main() {
   let conn;

   try {

      // Establish Connection (with query pipelining set)
      conn = await mariadb.createConnection({
         host: "192.0.2.50",
         user: "db_user",
         password: "db_user_password",
         database: "test",
         pipelining: true,
      });

      // Rows handed to the batch INSERT: [first_name, last_name, email]
      const contacts = [
         ["John", "Smith", "john.smith@example.com"],
         ["Jon", "Smith", "jon.smith@example.com"],
         ["Johnny", "Smith", "johnny.smith@example.com"],
      ];

      // New email to set for every contact whose first_name matches
      const update = {
         first_name: "John",
         email: "johnsmith@example.com",
      };

      await addContactsInPipeline(conn, contacts, update);

   } catch (err) {
      // Manage Errors
      // NOTE: this handler also receives errors rethrown by
      // addContactsInPipeline(), not only connection failures.
      console.error("Error connecting to the database with pipeline: ", err);
   }  finally {
      // Close Connection (only if createConnection() succeeded)
      if (conn) conn.close();
   }
}

/**
 * Inserts a batch of contacts and applies an email update inside a single
 * transaction. Both statements are issued before either result is awaited,
 * so they can be pipelined on the connection.
 *
 * @param {object} conn - Connection exposing beginTransaction(), batch(),
 *    query(), commit() and rollback(), each returning a promise.
 * @param {Array<Array<string>>} data - Rows to insert, each as
 *    [first_name, last_name, email].
 * @param {{first_name: string, email: string}} update - The email to set and
 *    the first_name selecting which rows receive it.
 * @returns {Promise<void>} Resolves once the transaction has been committed.
 * @throws Rethrows the error from beginTransaction(), or the first error from
 *    the INSERT / UPDATE / COMMIT after a rollback has been attempted.
 */
async function addContactsInPipeline(conn, data, update) {
   // Start Transaction
   try {
      await conn.beginTransaction();
   } catch (err) {
      console.error("Error starting a transaction: ", err);
      throw err;
   }

   try {
      // Both calls run synchronously while the array is built, so the
      // INSERT and UPDATE are queued back-to-back (pipelined). Awaiting them
      // together ensures a failed INSERT is observed before COMMIT instead
      // of being lost as an unhandled rejection.
      await Promise.all([
         // Contact
         conn.batch(
            "INSERT INTO test.contacts(first_name, last_name, email) VALUES(?, ?, ?)",
            data
         ),

         // Update
         conn.query(
            "UPDATE test.contacts SET email = ? WHERE first_name = ?",
            [update.email, update.first_name]
         ),
      ]);

      // Commit Changes
      await conn.commit();
   } catch (err) {
      console.error("Error adding user, reverting changes: ", err);
      await conn.rollback();
      throw err;
   }
}

// Run the example; main() logs its own errors in its catch block.
main();
const fs = require("fs");
const mariadb = require("mariadb");

// Certificate Authority (CA) chain: the PEM file whose path is given by the
// SKYSQL_CA_PEM environment variable, read once at startup as UTF-8 text.
const serverCert = [fs.readFileSync(process.env.SKYSQL_CA_PEM, "utf8")];

// Main Function
/**
 * Entry point for the example: opens a TLS connection with pipelining
 * enabled, then inserts the sample contacts and updates one contact's email
 * via addContactsInPipeline().
 *
 * Any error raised while connecting or while running the transaction is
 * logged here and not rethrown, so the returned promise always resolves
 * unless closing the connection itself throws.
 *
 * @returns {Promise<void>}
 */
async function main() {
   let conn;

   try {

      // Establish Connection (with query pipelining set)
      conn = await mariadb.createConnection({
         host: "example.skysql.net",
         port: 5009,
         ssl: { ca: serverCert },
         user: "db_user",
         password: "db_user_password",
         database: "test",
         pipelining: true,
      });

      // Rows handed to the batch INSERT: [first_name, last_name, email]
      const contacts = [
         ["John", "Smith", "john.smith@example.com"],
         ["Jon", "Smith", "jon.smith@example.com"],
         ["Johnny", "Smith", "johnny.smith@example.com"],
      ];

      // New email to set for every contact whose first_name matches
      const update = {
         first_name: "John",
         email: "johnsmith@example.com",
      };

      await addContactsInPipeline(conn, contacts, update);

   } catch (err) {
      // Manage Errors
      // NOTE: this handler also receives errors rethrown by
      // addContactsInPipeline(), not only connection failures.
      console.error("Error connecting to the database with pipeline: ", err);
   }  finally {
      // Close Connection (only if createConnection() succeeded)
      if (conn) conn.close();
   }
}

/**
 * Inserts a batch of contacts and applies an email update inside a single
 * transaction. Both statements are issued before either result is awaited,
 * so they can be pipelined on the connection.
 *
 * @param {object} conn - Connection exposing beginTransaction(), batch(),
 *    query(), commit() and rollback(), each returning a promise.
 * @param {Array<Array<string>>} data - Rows to insert, each as
 *    [first_name, last_name, email].
 * @param {{first_name: string, email: string}} update - The email to set and
 *    the first_name selecting which rows receive it.
 * @returns {Promise<void>} Resolves once the transaction has been committed.
 * @throws Rethrows the error from beginTransaction(), or the first error from
 *    the INSERT / UPDATE / COMMIT after a rollback has been attempted.
 */
async function addContactsInPipeline(conn, data, update) {
   // Start Transaction
   try {
      await conn.beginTransaction();
   } catch (err) {
      console.error("Error starting a transaction: ", err);
      throw err;
   }

   try {
      // Both calls run synchronously while the array is built, so the
      // INSERT and UPDATE are queued back-to-back (pipelined). Awaiting them
      // together ensures a failed INSERT is observed before COMMIT instead
      // of being lost as an unhandled rejection.
      await Promise.all([
         // Contact
         conn.batch(
            "INSERT INTO test.contacts(first_name, last_name, email) VALUES(?, ?, ?)",
            data
         ),

         // Update
         conn.query(
            "UPDATE test.contacts SET email = ? WHERE first_name = ?",
            [update.email, update.first_name]
         ),
      ]);

      // Commit Changes
      await conn.commit();
   } catch (err) {
      console.error("Error adding user, reverting changes: ", err);
      await conn.rollback();
      throw err;
   }
}

// Run the example; main() logs its own errors in its catch block.
main();
  • Two SQL statements are pipelined; one to add data and the other to update data.

  • MariaDB Connector/Node.js uses the beginTransaction() function to initiate a transaction.

  • MariaDB Connector/Node.js uses the batch() function with an INSERT statement to add a series of rows to the database.

  • MariaDB Connector/Node.js uses the query() function with an UPDATE statement to update the database; it then waits for the update operation to complete before moving on.

  • MariaDB Connector/Node.js commits the transaction then waits for the operation to complete.

  • The catch() block catches any errors raised by the operation. If an error occurs, the Connector reports it and rolls back the transaction.

  • MariaDB Connector/Node.js closes the connection with conn.close() to free up resources.

Confirm the data was properly added and updated by using MariaDB Client to execute a SELECT statement:

SELECT * FROM test.contacts;
+----+------------+-----------+--------------------------+
| id | first_name | last_name | email                    |
+----+------------+-----------+--------------------------+
|  1 | John       | Smith     | johnsmith@example.com    |
|  2 | Jon        | Smith     | jon.smith@example.com    |
|  3 | Johnny     | Smith     | johnny.smith@example.com |
+----+------------+-----------+--------------------------+