binary=Trueimport mariadb
# Simple URI
conn = mariadb.connect("mariadb://user:password@localhost:3306/mydb")
# URI with query parameters
conn = mariadb.connect("mariadb://user:password@localhost/mydb?autocommit=true&binary=true")
# Keyword arguments override URI values
conn = mariadb.connect("mariadb://user:password@localhost/mydb", database="otherdb")import mariadb
# URI connection (recommended)
with mariadb.connect("mariadb://example_user:GHbe_Su3B8@localhost/test") as connection:
print(connection.character_set)
# Keyword arguments (still supported)
with mariadb.connect(user="example_user", host="localhost", database="test", password="GHbe_Su3B8") as connection:
print(connection.character_set)
# Binary protocol enabled at connection level
with mariadb.connect("mariadb://localhost/test?binary=true") as connection:
cursor = connection.cursor() # Uses binary protocol by default
cursor.execute("SELECT * FROM users WHERE id = ?", (1,))utf8mb4import asyncio
import mariadb
async def main():
    """Show both ways of opening an async connection, then run one query.

    The URI form and the keyword-argument form are alternatives; the
    URI-based connection is closed before ``conn`` is rebound so it is
    not leaked.
    """
    # URI connection
    conn = await mariadb.asyncConnect("mariadb://user:password@localhost/mydb")
    # Release the first connection before rebinding the name
    await conn.close()
    # Or with keyword arguments
    conn = await mariadb.asyncConnect(
        host="localhost",
        user="user",
        password="password",
        database="mydb"
    )
    try:
        cursor = await conn.cursor()
        try:
            await cursor.execute("SELECT * FROM users WHERE id = ?", (1,))
            row = await cursor.fetchone()
        finally:
            # Close the cursor even if the query fails
            await cursor.close()
    finally:
        # Close the connection even if the query fails
        await conn.close()
asyncio.run(main())pip install mariadb[pool]import mariadb
pool = mariadb.create_pool(
host="localhost",
user="user",
password="password",
database="mydb",
min_size=5,
max_size=20
)
with pool.acquire() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT 1")import asyncio
import mariadb
async def main():
    """Create an async pool, run one query on a pooled connection, close the pool."""
    pool = await mariadb.create_async_pool(
        host="localhost",
        user="user",
        password="password",
        database="mydb",
        min_size=10,
        max_size=50
    )
    try:
        async with await pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("SELECT 1")
    finally:
        # Close the pool even if acquiring a connection or the query fails
        await pool.close()
asyncio.run(main())import mariadb
from mariadb.constants import FIELD_TYPE
print(FIELD_TYPE.GEOMETRY == mariadb.BINARY)
print(FIELD_TYPE.DATE == mariadb.DATE)
print(FIELD_TYPE.VARCHAR == mariadb.BINARY)
# Output:
# True
# True
# False
from mariadb.constants import *
import mariadb
from mariadb.constants import *
# connection parameters
conn_params= {
"user" : "example_user",
"password" : "GHbe_Su3B8",
"host" : "localhost"
}
with mariadb.connect(**conn_params) as connection:
# test if LOAD DATA LOCAL INFILE is supported
if connection.server_capabilities & CAPABILITY.LOCAL_FILES:
print("Server supports LOCAL INFILE")
# Output: Server supports LOCAL INFILE
import mariadb
from mariadb.constants import *
# connection parameters
conn_params= {
"user" : "example_user",
"password" : "wrong_password",
"host" : "localhost"
}
# try to establish a connection
try:
connection= mariadb.connect(**conn_params)
except mariadb.OperationalError as Err:
if Err.errno == ERR.ER_ACCESS_DENIED_ERROR:
print("Access denied. Wrong password!")
# Output: Access denied. Wrong password!
cursor.callproc("my_storedprocedure", (1,"foo"))
import mariadb
from mariadb.constants import EXT_FIELD_TYPE
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT id, data, location FROM test_table")
metadata = cursor.metadata
if metadata:
for i, ext_type in enumerate(metadata['ext_type_or_format']):
if ext_type == EXT_FIELD_TYPE.JSON:
print(f"Column {i} is JSON type")
elif ext_type == EXT_FIELD_TYPE.UUID:
print(f"Column {i} is UUID type")
elif ext_type == EXT_FIELD_TYPE.POINT:
print(f"Column {i} is POINT geometry type")
cursor.close()
conn.close()Noneboolimport mariadb
conn = mariadb.connect(
host='localhost',
port=3306,
user='myuser',
password='mypassword',
database='mydb'
)conn = mariadb.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydb',
ssl_ca='/path/to/ca-cert.pem',
ssl_cert='/path/to/client-cert.pem',
ssl_key='/path/to/client-key.pem',
ssl_verify_cert=True
)# Basic URI
conn = mariadb.connect("mariadb://myuser:mypassword@localhost:3306/mydb")
# URI with SSL parameters
conn = mariadb.connect(
"mariadb://myuser:mypassword@localhost/mydb",
ssl_ca='/path/to/ca-cert.pem',
ssl_verify_cert=True
)conn = mariadb.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydb',
connect_timeout=5.0,
socket_timeout=60.0,
compress=True
)# Return rows as dictionaries
conn = mariadb.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydb',
dictionary=True
)
cursor = conn.cursor()
cursor.execute("SELECT id, name FROM users LIMIT 1")
row = cursor.fetchone()
print(row['name']) # Access by column name# Multiple hosts for automatic failover
conn = mariadb.connect(
host='primary.example.com,secondary.example.com,tertiary.example.com',
port=3306,
user='myuser',
password='mypassword',
database='mydb'
)import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
# Create a transaction ID for distributed transaction
xid = conn.xid(1, "global_tx_12345", "branch_001")
print(f"XID: {xid}") # Output: (1, 'global_tx_12345', 'branch_001')
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
# Start explicit transaction
conn.begin()
cursor = conn.cursor()
cursor.execute("INSERT INTO accounts (name, balance) VALUES (?, ?)", ("Alice", 1000))
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE name = ?", ("Alice",))
# Commit the transaction
conn.commit()
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)",
("John Doe", "john@example.com"))
cursor.execute("INSERT INTO logs (action) VALUES (?)",
("User created",))
# Commit both inserts as a single transaction
conn.commit()
print("Transaction committed successfully")
except mariadb.Error as e:
# Rollback on error
conn.rollback()
print(f"Error: {e}")
finally:
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
print(f"Current user: {conn.user}")
print(f"Current database: {conn.database}")
# Switch to a different user and database
try:
conn.change_user("app_user", "app_pass", "app_db")
print(f"Changed to user: {conn.user}")
print(f"Changed to database: {conn.database}")
except mariadb.Error as e:
print(f"Failed to change user: {e}")
finally:
conn.close()import mariadb
# Using context manager (recommended - auto-closes)
with mariadb.connect("mariadb://user:password@localhost/mydb") as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
cursor.close()
# Connection automatically closed here
# Manual close
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
try:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM users")
count = cursor.fetchone()[0]
print(f"Total users: {count}")
finally:
conn.close() # Always close in finally block# Default cursor (unbuffered, text protocol, returns tuples)
cursor = conn.cursor()
# Buffered cursor (stores entire result set in memory)
cursor = conn.cursor(buffered=True)
# Binary protocol cursor (prepared statements)
cursor = conn.cursor(binary=True)
# Dictionary cursor (access columns by name)
cursor = conn.cursor(dictionary=True)
row = cursor.fetchone()
print(row['column_name'])
# Named tuple cursor (access columns as attributes)
cursor = conn.cursor(named_tuple=True)
row = cursor.fetchone()
print(row.column_name)import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
try:
# Dump debug information to server log
# Requires SUPER privilege
conn.dump_debug_info()
print("Debug info dumped to server log")
except mariadb.Error as e:
print(f"Error: {e}")
# Output: Error: Access denied; you need (at least one of) the SUPER privilege(s)
conn.close()# connection parameters
conn_params= {
"user" : "example_user",
"password" : "GHbe_Su3B8",
"host" : "localhost"
}
with mariadb.connect(**conn_params) as connection:
string = 'This string contains the following special characters: \\,"'
print(connection.escape_string(string))
# Output: This string contains the following special characters: \\,\"
import mariadb
import time
# Example: list the server's active connections, then try to kill one by id.
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Get list of active connections
cursor.execute("SHOW PROCESSLIST")
processes = cursor.fetchall()
print("Active connections:")
for proc in processes:
    # Unpack only the leading columns. MariaDB returns an extra trailing
    # Progress column, so unpacking a fixed count of 8 raises ValueError.
    conn_id, user, host, db, command, *_ = proc
    print(f"ID: {conn_id}, User: {user}, DB: {db}, Command: {command}")
# Kill a specific connection (requires SUPER privilege)
target_connection_id = 123
try:
    conn.kill(target_connection_id)
    print(f"Connection {target_connection_id} killed successfully")
except mariadb.Error as e:
    print(f"Error killing connection: {e}")
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
try:
# Check if connection is alive
conn.ping()
print("Connection is alive")
except mariadb.InterfaceError:
print("Connection lost")
finally:
conn.close()import mariadb
import time
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
try:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.close()
# Simulate connection timeout or network issue
# In real scenario, connection might be lost
# Reconnect with same credentials
conn.reconnect()
print("Reconnected successfully")
cursor = conn.cursor()
cursor.execute("SELECT 2")
cursor.close()
except mariadb.Error as e:
print(f"Error: {e}")
finally:
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Set some session variables
cursor.execute("SET @my_var = 100")
cursor.execute("SELECT @my_var")
print(cursor.fetchone()) # Output: (100,)
# Reset connection - clears session state
conn.reset()
# Previous cursor is now invalid, create new one
cursor = conn.cursor()
cursor.execute("SELECT @my_var")
print(cursor.fetchone()) # Output: (None,) - variable cleared
cursor.close()
conn.close()import mariadb
conn = mariadb.connect(
host="localhost",
user="root",
password="secret"
)
print(f"Current database: {conn.database}") # Output: None
# Select a database
conn.select_db("mydb")
print(f"Current database: {conn.database}") # Output: mydb
cursor = conn.cursor()
cursor.execute("SELECT DATABASE()")
print(cursor.fetchone()) # Output: ('mydb',)
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Initially no warnings
print(conn.show_warnings()) # Output: None
# Generate a warning by inserting value out of range
cursor.execute("SET session sql_mode=''")
cursor.execute("CREATE TEMPORARY TABLE test_warn (a tinyint)")
cursor.execute("INSERT INTO test_warn VALUES (300)") # Value too large for tinyint
# Get warnings
warnings = conn.show_warnings()
if warnings:
for level, code, message in warnings:
print(f"{level} ({code}): {message}")
# Output: Warning (1264): Out of range value for column 'a' at row 1
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
# Create transaction ID
xid = conn.xid(0, "global_tx_001", "branch_001")
# Begin distributed transaction
conn.tpc_begin(xid)
cursor = conn.cursor()
cursor.execute("INSERT INTO orders (product, quantity) VALUES (?, ?)", ("Widget", 10))
cursor.close()
# Single-phase commit (no prepare)
conn.tpc_commit(xid)
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
# Create transaction ID
xid = conn.xid(0, "global_tx_002", "branch_002")
# Begin distributed transaction
conn.tpc_begin(xid)
cursor = conn.cursor()
cursor.execute("UPDATE inventory SET quantity = quantity - 5 WHERE product = ?", ("Widget",))
cursor.close()
# Prepare transaction (phase 1)
conn.tpc_prepare()
# Commit transaction (phase 2)
conn.tpc_commit()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
xid = conn.xid(0, "global_tx_003", "branch_003")
conn.tpc_begin(xid)
cursor = conn.cursor()
cursor.execute("INSERT INTO transactions (amount) VALUES (?)", (100.00,))
cursor.close()
# Prepare the transaction (phase 1 of 2PC)
conn.tpc_prepare()
# At this point, transaction is prepared but not committed
# Can now commit or rollback
conn.tpc_commit()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
# Get list of pending prepared transactions
pending_xids = conn.tpc_recover()
if pending_xids:
print(f"Found {len(pending_xids)} pending transactions")
for xid_data in pending_xids:
print(f"Pending XID: {xid_data}")
# Can commit or rollback these transactions
# xid = conn.xid(*xid_data)
# conn.tpc_commit(xid)
else:
print("No pending transactions")
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
xid = conn.xid(0, "global_tx_004", "branch_004")
conn.tpc_begin(xid)
cursor = conn.cursor()
try:
cursor.execute("UPDATE accounts SET balance = balance - 1000 WHERE id = ?", (1,))
cursor.execute("UPDATE accounts SET balance = balance + 1000 WHERE id = ?", (2,))
# Check for errors
cursor.execute("SELECT balance FROM accounts WHERE id = 1")
balance = cursor.fetchone()[0]
if balance < 0:
# Rollback the distributed transaction
conn.tpc_rollback()
print("Transaction rolled back - insufficient funds")
else:
conn.tpc_prepare()
conn.tpc_commit()
print("Transaction committed")
except mariadb.Error as e:
conn.tpc_rollback()
print(f"Error: {e}")
finally:
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
if conn.server_mariadb:
print("Connected to MariaDB server")
print(f"Version: {conn.server_info}")
# Use MariaDB-specific features
cursor = conn.cursor()
cursor.execute("SELECT JSON_DETAILED('{\"a\": 1}')")
else:
print("Connected to MySQL server")
print(f"Version: {conn.server_info}")
# Use MySQL-compatible features only
conn.close()boolrow.column_nameFalseimport mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT id, name FROM users")
for row in cursor:
print(f"ID: {row[0]}, Name: {row[1]}")
cursor.close()
conn.close()# Stream results to reduce memory usage
cursor = conn.cursor(buffered=False)
cursor.execute("SELECT * FROM large_table")
for row in cursor:
process_row(row) # Process one row at a time
cursor.close()# Access columns by name
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, name, email FROM users WHERE id = ?", (123,))
user = cursor.fetchone()
print(f"Name: {user['name']}, Email: {user['email']}")
cursor.close()# Access columns as attributes
cursor = conn.cursor(named_tuple=True)
cursor.execute("SELECT id, name, email FROM users WHERE id = ?", (123,))
user = cursor.fetchone()
print(f"Name: {user.name}, Email: {user.email}")
cursor.close()# Use prepared statements for this cursor
cursor = conn.cursor(binary=True)
# First execution prepares the statement
cursor.execute("SELECT * FROM users WHERE id = ?", (1,))
# Subsequent executions reuse the prepared statement
cursor.execute("SELECT * FROM users WHERE id = ?", (2,))
cursor.close()# Dictionary cursor with streaming results
cursor = conn.cursor(dictionary=True, buffered=False)
cursor.execute("SELECT * FROM large_table")
for row in cursor:
print(f"Processing: {row['name']}")
cursor.close()>>>cursor.execute("CREATE PROCEDURE p1(IN i1 VAR CHAR(20), OUT o2 VARCHAR(40))"
"BEGIN"
" SELECT 'hello'"
" o2:= 'test'"
"END")
>>>cursor.callproc('p1', ('foo', 0))
>>> cursor.sp_outparams
False
>>> cursor.fetchone()
('hello',)
>>> cursor.nextset()
True
>>> cursor.sp_outparams
True
>>> cursor.fetchone()
('test',)
# Rows to insert in one batch. Each tuple must be separated by a comma,
# otherwise Python tries to call the preceding tuple and raises TypeError.
data = [
    (1, 'Michael', 'Widenius'),
    (2, 'Diego', 'Dupin'),
    (3, 'Lawrin', 'Novitsky'),
]
cursor.executemany("INSERT INTO colleagues VALUES (?, ?, ?)", data)
import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT id, name, email FROM users")
# Fetch all rows at once
rows = cursor.fetchall()
for row in rows:
print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}")
print(f"Total rows: {len(rows)}")
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT id, name FROM users ORDER BY id")
# Fetch rows in batches of 10
while True:
rows = cursor.fetchmany(10)
if not rows:
break
print(f"Processing batch of {len(rows)} rows")
for row in rows:
print(f" ID: {row[0]}, Name: {row[1]}")
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT id, name FROM users WHERE id = ?", (1,))
# Fetch single row
row = cursor.fetchone()
if row:
print(f"User found: ID={row[0]}, Name={row[1]}")
else:
print("User not found")
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Set arraysize for batch processing
cursor.arraysize = 100
cursor.execute("SELECT * FROM large_table")
# fetchmany() will now fetch 100 rows at a time by default
while True:
rows = cursor.fetchmany()
if not rows:
break
print(f"Processing {len(rows)} rows")
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
# Unbuffered cursor (default) - streams results, low memory usage
cursor1 = conn.cursor(buffered=False)
print(f"Buffered: {cursor1.buffered}") # Output: False
cursor1.execute("SELECT * FROM large_table") # 1 million rows
# Rows are streamed one at a time, not all loaded into memory
for row in cursor1:
process_row(row) # Memory efficient
cursor1.close()
# Buffered cursor - fetches all results immediately into memory
cursor2 = conn.cursor(buffered=True)
print(f"Buffered: {cursor2.buffered}") # Output: True
cursor2.execute("SELECT * FROM small_table") # 100 rows
rows = cursor2.fetchall() # All rows loaded into memory at once
# Connection is now free for other operations
cursor2.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
# Use unbuffered for large result sets to avoid memory issues
cursor = conn.cursor(buffered=False)
cursor.execute("SELECT * FROM huge_table") # Millions of rows
# Process rows one at a time without loading all into memory
for row in cursor:
# Each row is fetched on demand
process_large_row(row)
cursor.close()
# Use buffered for small result sets when you need connection freedom
cursor = conn.cursor(buffered=True)
cursor.execute("SELECT * FROM config WHERE active = 1") # Few rows
config = cursor.fetchall() # Safe to load all into memory
cursor.close()
# Can now use connection for other operations immediately
cursor2 = conn.cursor()
cursor2.execute("SELECT COUNT(*) FROM users")
count = cursor2.fetchone()[0]
cursor2.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
# Using context manager (recommended - auto-closes)
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
count = cursor.fetchone()[0]
print(f"Total users: {count}")
# Cursor automatically closed here
# Manual close
cursor = conn.cursor()
try:
cursor.execute("SELECT * FROM users LIMIT 5")
rows = cursor.fetchall()
finally:
cursor.close() # Always close in finally block
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Access connection from cursor
print(f"Database: {cursor.connection.database}")
print(f"User: {cursor.connection.user}")
print(f"Connection ID: {cursor.connection.connection_id}")
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT id, name, email, created_at FROM users LIMIT 1")
# Get column information
if cursor.description:
print("Column Information:")
for i, col in enumerate(cursor.description):
print(f"\nColumn {i}:")
print(f" Name: {col[0]}")
print(f" Type: {col[1]}")
print(f" Nullable: {col[6]}")
print(f" Table: {col[8]}")
print(f" Original Name: {col[9]}")
# Example output:
# Column 0:
# Name: id
# Type: 3
# Nullable: 0
# Table: users
# Original Name: id
cursor.close()
conn.close()import mariadb
from mariadb.constants import FIELD_TYPE, FIELD_FLAG
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT content FROM documents LIMIT 1")
if cursor.description[0][1] == FIELD_TYPE.BLOB:
if cursor.description[0][7] & FIELD_FLAG.BINARY:
print("Column is BLOB")
else:
print("Column is TEXT")
cursor.close()
conn.close()import mariadb
from mariadb.constants import FIELD_TYPE, FIELD_FLAG, EXT_FIELD_TYPE
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
cursor.execute("""
SELECT
id,
name AS user_name,
email,
created_at
FROM users
LIMIT 1
""")
# Get complete metadata
metadata = cursor.metadata
if metadata:
print("Complete Column Metadata:")
print(f"Number of columns: {len(metadata['field'])}\n")
for i in range(len(metadata['field'])):
print(f"Column {i}:")
print(f" Field (alias): {metadata['field'][i]}")
print(f" Original field: {metadata['org_field'][i]}")
print(f" Table (alias): {metadata['table'][i]}")
print(f" Original table: {metadata['org_table'][i]}")
print(f" Schema: {metadata['schema'][i]}")
print(f" Type: {metadata['type'][i]}")
print(f" Charset: {metadata['charset'][i]}")
print(f" Length: {metadata['length'][i]}")
print(f" Max length: {metadata['max_length'][i]}")
print(f" Decimals: {metadata['decimals'][i]}")
print(f" Flags: {metadata['flags'][i]}")
print()
# Example output:
# Column 0:
# Field (alias): id
# Original field: id
# Table (alias): users
# Original table: users
# Schema: mydb
# Type: 3
# Charset: binary
# Length: 11
# Max length: 1
# Decimals: 0
# Flags: 16899
cursor.close()
conn.close()import mariadb
from mariadb.constants import FIELD_TYPE, EXT_FIELD_TYPE
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Create table with extended types
cursor.execute("""
CREATE TEMPORARY TABLE test_types (
data JSON,
user_id UUID,
ip_addr INET4,
location POINT
)
""")
cursor.execute("SELECT data, user_id, ip_addr, location FROM test_types")
metadata = cursor.metadata
# Check extended types
for i, field_name in enumerate(metadata['field']):
ext_type = metadata['ext_type_or_format'][i]
base_type = metadata['type'][i]
print(f"{field_name}:")
print(f" Base type: {base_type}")
if ext_type == EXT_FIELD_TYPE.JSON:
print(f" Extended type: JSON")
elif ext_type == EXT_FIELD_TYPE.UUID:
print(f" Extended type: UUID")
elif ext_type == EXT_FIELD_TYPE.INET4:
print(f" Extended type: INET4")
elif ext_type == EXT_FIELD_TYPE.POINT:
print(f" Extended type: POINT (Geometry)")
print()
cursor.close()
conn.close()import mariadb
# Example: call a stored procedure whose only result is an OUT parameter.
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Create a stored procedure with OUT parameter
cursor.execute("DROP PROCEDURE IF EXISTS calculate_total")
# The IN parameter is named p_user_id so it does not shadow the user_id
# column. With identical names the routine parameter takes precedence, so
# "WHERE user_id = user_id" is always true and sums every order.
cursor.execute("""
CREATE PROCEDURE calculate_total(
IN p_user_id INT,
OUT total_amount DECIMAL(10,2)
)
BEGIN
SELECT SUM(amount) INTO total_amount
FROM orders
WHERE user_id = p_user_id;
END
""")
# Call the procedure with OUT parameter
cursor.callproc("calculate_total", (123, 0))
# First check if current result set contains output parameters
print(f"Has output params: {cursor.sp_outparams}") # Output: True
# Fetch the output parameter value
result = cursor.fetchone()
total = result[0]
print(f"Total amount: {total}")
cursor.close()
conn.close()import mariadb
# Example: a procedure that returns a result set and also has an OUT parameter.
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Create procedure that returns data AND has OUT parameter
cursor.execute("DROP PROCEDURE IF EXISTS get_user_stats")
# The IN parameter is named p_user_id so it does not shadow orders.user_id.
# With identical names the routine parameter takes precedence, so
# "WHERE user_id = user_id" is always true and counts every order.
cursor.execute("""
CREATE PROCEDURE get_user_stats(
IN p_user_id INT,
OUT order_count INT
)
BEGIN
-- First result set: user details
SELECT id, name, email FROM users WHERE id = p_user_id;
-- Set OUT parameter
SELECT COUNT(*) INTO order_count
FROM orders
WHERE user_id = p_user_id;
END
""")
# Call the procedure
cursor.callproc("get_user_stats", (123, 0))
# First result set: user details
print(f"Has output params: {cursor.sp_outparams}") # Output: False
user = cursor.fetchone()
print(f"User: {user}")
# Move to next result set (OUT parameters)
cursor.nextset()
print(f"Has output params: {cursor.sp_outparams}") # Output: True
out_params = cursor.fetchone()
order_count = out_params[0]
print(f"Order count: {order_count}")
cursor.execute("DROP PROCEDURE IF EXISTS get_user_stats")
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor(binary=True)
# Call procedure using CALL statement with binary protocol
cursor.execute("CALL calculate_total(?, ?)", (123, 0))
# Check if result contains output parameters
if cursor.sp_outparams:
result = cursor.fetchone()
print(f"Total: {result[0]}")
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor(buffered=False) # Default
# Execute SELECT
cursor.execute("SELECT * FROM users")
# Rowcount is -1 until all rows are fetched
print(f"Rowcount before fetch: {cursor.rowcount}") # Output: -1
# Fetch all rows
rows = cursor.fetchall()
# Now rowcount is available
print(f"Rowcount after fetch: {cursor.rowcount}") # Output: 150 (actual count)
print(f"Rows fetched: {len(rows)}") # Output: 150
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor(buffered=True)
# Execute SELECT
cursor.execute("SELECT * FROM users WHERE active = 1")
# Rowcount is immediately available for buffered cursors
print(f"Rowcount: {cursor.rowcount}") # Output: 42 (immediately)
# Fetch the rows
rows = cursor.fetchall()
print(f"Rows fetched: {len(rows)}") # Output: 42
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# INSERT - rowcount shows affected rows
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)",
("John Doe", "john@example.com"))
print(f"Rows inserted: {cursor.rowcount}") # Output: 1
# UPDATE - rowcount shows affected rows
cursor.execute("UPDATE users SET active = 1 WHERE created_at < NOW()")
print(f"Rows updated: {cursor.rowcount}") # Output: 25
# DELETE - rowcount shows affected rows
cursor.execute("DELETE FROM users WHERE active = 0")
print(f"Rows deleted: {cursor.rowcount}") # Output: 10
# UPDATE with no matching rows
cursor.execute("UPDATE users SET active = 1 WHERE id = 99999")
print(f"Rows updated: {cursor.rowcount}") # Output: 0 (no rows matched)
conn.commit()
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Insert multiple rows
data = [
("Alice", "alice@example.com"),
("Bob", "bob@example.com"),
("Charlie", "charlie@example.com")
]
cursor.executemany("INSERT INTO users (name, email) VALUES (?, ?)", data)
print(f"Total rows inserted: {cursor.rowcount}") # Output: 3
conn.commit()
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Update a specific user
user_id = 123
cursor.execute("UPDATE users SET last_login = NOW() WHERE id = ?", (user_id,))
if cursor.rowcount == 0:
print(f"Warning: User {user_id} not found or not updated")
elif cursor.rowcount == 1:
print(f"User {user_id} updated successfully")
conn.commit()
else:
print(f"Error: Multiple rows affected ({cursor.rowcount})")
conn.rollback()
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Execute a query
cursor.execute("SELECT * FROM users WHERE id = ?", (123,))
# Get the executed statement
print(f"Last statement: {cursor.statement}")
# Output: Last statement: SELECT * FROM users WHERE id = ?
# Execute another query
cursor.execute("UPDATE users SET last_login = NOW() WHERE id = ?", (123,))
print(f"Last statement: {cursor.statement}")
# Output: Last statement: UPDATE users SET last_login = NOW() WHERE id = ?
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Execute statement that may generate warnings
cursor.execute("SET session sql_mode=''")
cursor.execute("CREATE TEMPORARY TABLE test_warn (a tinyint)")
cursor.execute("INSERT INTO test_warn VALUES (300)") # Out of range
# Check warning count from cursor
if cursor.warnings > 0:
print(f"Number of warnings: {cursor.warnings}")
# Get detailed warnings from connection
warnings = conn.show_warnings()
for level, code, message in warnings:
print(f"{level} ({code}): {message}")
# Output: Warning (1264): Out of range value for column 'a' at row 1
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT id, name FROM users ORDER BY id LIMIT 5")
print(f"Initial rownumber: {cursor.rownumber}") # Output: 0
# Fetch rows one by one
row1 = cursor.fetchone()
print(f"After 1st fetch: {cursor.rownumber}") # Output: 1
row2 = cursor.fetchone()
print(f"After 2nd fetch: {cursor.rownumber}") # Output: 2
# Fetch remaining rows
remaining = cursor.fetchall()
print(f"After fetchall: {cursor.rownumber}") # Output: 5
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
# Before executing any query
print(f"Field count: {cursor.field_count}") # Output: 0
# Execute SELECT with 3 columns
cursor.execute("SELECT id, name, email FROM users LIMIT 1")
print(f"Field count: {cursor.field_count}") # Output: 3
# Execute SELECT with all columns
cursor.execute("SELECT * FROM users LIMIT 1")
print(f"Field count: {cursor.field_count}") # Output: (number of columns in users table)
# Execute non-SELECT statement
cursor.execute("UPDATE users SET active = 1 WHERE id = 1")
print(f"Field count: {cursor.field_count}") # Output: 0 (no result set)
cursor.close()
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
print(f"Cursor closed: {cursor.closed}") # Output: False
# Execute a query
cursor.execute("SELECT 1")
print(f"Cursor closed: {cursor.closed}") # Output: False
# Close the cursor
cursor.close()
print(f"Cursor closed: {cursor.closed}") # Output: True
# Trying to use a closed cursor raises an error
try:
cursor.execute("SELECT 2")
except mariadb.ProgrammingError as e:
print(f"Error: {e}") # Output: Error: Cursor is closed
conn.close()import mariadb
conn = mariadb.connect("mariadb://user:password@localhost/mydb")
cursor = conn.cursor()
print(f"Cursor closed: {cursor.closed}") # Output: False
# Close the connection
conn.close()
# Cursor is now also considered closed
print(f"Cursor closed: {cursor.closed}") # Output: True