This is the fourth part of the series where we create service to interact with mysql server in rails using mysql2 gem.
Requirements
- [x] Service to connect with external mysql server
- [x] Perform basic query: select, insert and update
- [x] Prepared statement
- [ ] Perform transactions
- [ ] Perform join query
In previous three articles, we created a service, added methods to help us perform select, insert and update operations and also added method to help us in performing prepared statements. Today we will be looking at performing transactions in mysql server using mysql2 gem.
In this blog
We will be learning the following in this blog:
- Perform transactions
Transaction
A transaction helps us in performing multiple queries to database. Though each query is performed one by one, the concept of transaction is either perform all queries or none at all which means even if one query fails, changes made by all other queries will be undone from the database.
Transaction is very helpful when we have to make sure that all queries are performed successfully. The most famous example for this is money transfer via bank, i.e. when one person transfers amount to another persons' account, amount from first account should be decreased and amount from second account should be increased. This can't be failed as this affects one/both person severely. In this case transaction is used to ensure that decrease and increase of amount is made on both side or transfer is failed as a whole.
Performing Transaction
Here is what we will do for supporting transactions in our service:
- Accept
transaction_attributes_arrayparameter in bothinsertandupdatemethod.transaction_attributesis an array of hashes which includes name of a table for the query, it's primary column and finally attribute hash needed to perform the operation. - Create new method
prepare_transaction_querieswhich will taketransaction_attributes_arrayas params and return array of prepared queries. - In
insertandupdate, we will push existing/main query to transaction queries array for performing transactions. - For performing transactions we will add a method
perform_transactionwhich will accept a block i.e. queries here. -
perform_transactionmethod will then call another method calledtransactionwhich will wrap all queries inside BEGIN and COMMIT and execute them one by one. This is a standard way of performing transactions in MySQL. Also we will rescue and execute ROLLBACK in case any of the query in the array fails to execute.
Code
INSERT_QUERY_TYPE = 'insert'.freeze
UPDATE_QUERY_TYPE = 'update'.freeze
def insert(attributes, transaction_attributes_array = [])
query = prepare_query(attributes, INSERT_QUERY_TYPE)
transaction_queries = prepare_transaction_queries(transaction_attributes_array, INSERT_QUERY_TYPE)
transaction_queries.push(query)
perform_mysql_operation do
perform_transaction(INSERT_QUERY_TYPE, transaction_queries)
puts 'Record inserted!'
end
end
def update(id, attributes, transaction_attributes_array = [])
query = prepare_query(attributes, UPDATE_QUERY_TYPE)
transaction_queries = prepare_transaction_queries(transaction_attributes_array, UPDATE_QUERY_TYPE)
transaction_queries.push(query)
perform_mysql_operation do
perform_transaction(UPDATE_QUERY_TYPE, transaction_queries, id)
puts 'Record Updated!'
end
end
private
def prepare_insert_query(keys, transaction_table = nil)
columns = keys.join(', ')
substituted_columns = keys.map { '?' }.join(', ')
table_name = transaction_table || table
"INSERT INTO #{table_name} (#{columns}) VALUES (#{substituted_columns})"
end
def prepare_update_query(keys, transaction_table = nil, transaction_primary_column = nil)
columns = keys.map { |key| "#{key} = ?" }.join(', ')
table_name = transaction_table || table
primary_column_name = transaction_primary_column || primary_column
"UPDATE #{table_name} SET #{columns} WHERE #{primary_column_name} = ?"
end
def primary_column_hash(query_type, primary_column, attributes)
return {} if primary_column.nil? || query_type == INSERT_QUERY_TYPE
column_hash = {}
primary_column_symbol = primary_column.to_sym
column_hash[primary_column_symbol] = attributes[primary_column_symbol]
{
**column_hash,
primary_column_name: primary_column
}
end
def prepared_query_by_type(query_type, keys, transaction_table = nil, transaction_primary_column = nil)
if query_type == INSERT_QUERY_TYPE
prepare_insert_query(keys, transaction_table)
else
prepare_update_query(keys, transaction_table, transaction_primary_column)
end
end
def prepare_query(attributes, type, transaction_table = nil, transaction_primary_column = nil)
raise 'Attributes cannot be empty' if attributes.empty?
keys = attributes.keys
values = attributes.values
{
prepared_query: prepared_query_by_type(type, keys, transaction_table, transaction_primary_column),
values: values
}
end
def params_for_prepare_query(query_type, transaction_attribute)
attributes = transaction_attribute[:attributes]
transaction_table = transaction_attribute[:table]
default_params = [attributes, query_type, transaction_table]
return default_params if query_type == INSERT_QUERY_TYPE
transaction_primary_column = transaction_attribute[:primary_column]
default_params.push(transaction_primary_column)
end
def prepare_transaction_queries(attributes_array, type)
attributes_array.map do |transaction_attribute|
params = params_for_prepare_query(type, transaction_attribute)
{
**primary_column_hash(type, transaction_attribute[:primary_column], transaction_attribute[:attributes]),
**prepare_query(*params)
}
end
end
def transaction
raise ArgumentError, 'No block was given' unless block_given?
begin
mysql_client.query('BEGIN')
yield
mysql_client.query('COMMIT')
rescue StandardError => e
mysql_client.query('ROLLBACK')
raise e
end
end
def perform_insert_transaction(transaction_queries)
transaction_queries.each do |transaction_query|
statement = mysql_client.prepare(transaction_query[:prepared_query])
statement.execute(*transaction_query[:values])
end
end
def perform_update_transaction(transaction_queries, main_table_id)
transaction_queries.each do |transaction_query|
values = transaction_query[:values]
primary_column_name = transaction_query[:primary_column_name]
record_id = primary_column_name && transaction_query[primary_column_name.to_sym] || main_table_id
values.push(record_id)
statement = mysql_client.prepare(transaction_query[:prepared_query])
statement.execute(*values)
end
end
def perform_transaction(query_type, transaction_queries, main_table_id = nil)
transaction do
if query_type == INSERT_QUERY_TYPE
perform_insert_transaction(transaction_queries)
else
perform_update_transaction(transaction_queries, main_table_id)
end
end
end
Explanation
There's a lot of refactoring going on here. Don't get overwhelmed just yet, we will go through each one of them. We had to refactor existing methods to support transactions. Let's now go through each methods and understand the refactor as well as transactions process.
- insert, update
-
insertandupdateis taking additional paramtransaction_attributes_arraywhich is an array of hashes with required information for each query needed to perform transactions. Following is happening inside these methods: -
transaction_attributes_arrayis sent toprepare_transaction_querieswhich converts each transaction query to prepared query and returns array of prepared transaction queries. - We are pushing main query to the array since all queries have to be performed in same transaction.
- Finally we are performing transactions by calling
perform_transactionmethod and sending all transaction queries.
- prepare_transaction_queries
-
prepare_transaction_queriesis taking paramsattributes_arrayandtype.transaction_attributes_arrayis sent toattributes_arraywhile nature of query i.e. insert or update is sent totype. - Each transaction attribute is iterated one by one to get required query for transaction.
- params_for_prepare_query
-
params_for_prepare_queryis taking paramsquery_typeandtransaction_attributes.transaction_attributesis a hash withattributes,table_nameandprimary_columnrequired for preparing single query. - If
query_typeis insert then params returned are[attributes, query_type, transaction_table]whereattributesis a hash of attributes of the transaction query.transaction_tableis the name of the table to perform query on. - If
query_typeis update, we are pushingprimary_columnto thedefault_params.primary_columnwhich helps us in specifying the record we need to update. You can view methodprepare_update_querymethod to see how theprimary_columnis being used for that purpose.
- primary_column_hash
-
primary_column_hashis receiving paramsquery_type,primary_columnandattributes - Params description is same as above method
params_for_prepare_query - Empty hash is returned if query type is
insertelse primary column attribute of the transaction query is returned together with the name of primary column inprimary_column_name - This is required when pushing value of primary_column to other attributes' values while updating the record. You can view method
perform_update_transactionto see how we are usingprimary_column_nameand pushing the primary column attribute value to other attribute values.
- prepare_query
-
prepare_queryis taking additional paramstransaction_tableandtransaction_primary_columnrequired for preparing transaction queries based on the query type.
- prepared_query_by_type
- Responsibility of
prepared_query_by_typeis to call eitherprepare_insert_queryorprepare_update_querybased on paramsquery_typei.e. insert or update and return prepared query for performing transactions
- prepare_insert_query
- For supporting transactions,
prepare_insert_queryis taking additional paramtransaction_table -
transaction_tableis the name of table where queries need to be performed on.
- prepare_update_query
-
prepare_update_queryis taking additional two paramstransaction_tableandtransaction_primary_columnfor supporting transactions -
transaction_primary_columnis the column name for the primary key of the table where transaction needs to be performed on.
- perform_transaction
-
perform_transactiontakes three params;query_type,transaction_queriesandmain_table_id -
transaction_queriesis an array of queries for performing transactions. -
main_table_idis the id of the record for the main table. You can seeperform_update_transactionon how it is being used.
- transaction
- `transaction` takes a block and perform **transactions**.
- **BEGIN** tells mysql to begin the transaction for performing multiple queries to database.
- **yield** is supporting block of code, inside the block, each query in an array is executed one by one with a loop.
- Finally **COMMIT** tells mysql to commit all transactions to database and persist all of it.
- We are rescuing and rolling back all the performed queries in case error occurs with **ROLLBACK** i.e. if even one query fails, all other queries count as failed and nothing is persisted to the database
- perform_insert_transaction
- `perform_insert_transaction` is taking param `transaction_queries`
- Each query inside transaction is prepared and executed one by one in a loop
- perform_update_transaction
- `perform_update_transaction` is taking additional param `main_table_id` apart from `transaction_queries`
- `main_table_id` is the id of a record for the main table in our service.
- As with insert, we are processing each query in a loop.
- We are storing all values of the operation inside `values`
- If query is not the main one, i.e. is related transaction query, we are extracting name of its primary column stored inside key **primary_column_name** to variable `primary_column_name`
- If the query is not the main, we are storing `main_table_id` else we are extracting value of the key **primary_column_name** and storing it to variable `record_id`
- We are then pushing the id of the record to the existing values
- Finally, we are preparing and executing the query in and to the database.
Practically:
transaction_attributes_array contains
# For insert transactions
[
{
table: 'users',
attributes: {
first_name: 'John',
last_name: 'Doe'
},
primary_column: 'id',
},
{
table: 'users',
attributes: {
first_name: 'Jane',
last_name: 'Doe'
},
primary_column: 'id',
}
]
# For update transactions
[
{
table: 'users',
attributes: {
id: 115,
first_name: 'John'
},
primary_column: 'id',
},
{
table: 'users',
attributes: {
id: 116,
last_name: 'Doe'
},
primary_column: 'id',
}
]
- As discussed previously in last article,
prepare_queryconverts primary table attributes to prepared statement. - We are sending transaction_attributes_array to
prepare_transaction_queriesfor receiving array of queries. - This is what we will receive back depending on the nature of operation we are performing i.e. insert or update
# insert
[
{
:prepared_query=>"INSERT INTO users (first_name, last_name) VALUES (?, ?)",
:values=>["John", "Doe"]
},
{
:prepared_query=>"INSERT INTO users (first_name, last_name) VALUES (?, ?)", :values=>["Jane", "Doe"]
}
]
# update
[
{
:id => 115,
:primary_column_name => "id",
:prepared_query => "UPDATE users SET id = ?, first_name = ? WHERE id = ?",
:values => [115, "John"]
},
{
:id => 116,
:primary_column_name => "id",
:prepared_query => "UPDATE users SET id = ?, last_name = ? WHERE id = ?",
:values => [116, "Doe"]
}
]
- Then we will push main query to the transaction queries since we will have to perform all queries in one transactions and roll all back if error occurs.
-
perform_transactionmethod wraps all queries in one single transaction - Finally all queries in the array are executed one by one and inserted or updated to and in mysql database using mysql2 gem.
Final Code
If you have been following the tutorial from part 1, you will have following in your service file:
require 'mysql2'
module MySqlServer
module Database
class Connect
INSERT_QUERY_TYPE = 'insert'.freeze
UPDATE_QUERY_TYPE = 'update'.freeze
attr_reader :mysql_client, :table, :primary_column
def initialize(table, primary_column)
@table = table
@primary_column = primary_column
end
def fetch_all
perform_mysql_operation do
result = mysql_client.query("SELECT ce_id, ce_peername from #{table}")
puts result.entries
end
end
def fetch_one(id)
perform_mysql_operation do
result = mysql_client.query("SELECT * from #{table} WHERE #{primary_column}=#{id}")
puts result.entries
end
end
def insert(attributes, transaction_attributes_array = [])
query = prepare_query(attributes, INSERT_QUERY_TYPE)
transaction_queries = prepare_transaction_queries(transaction_attributes_array, INSERT_QUERY_TYPE)
transaction_queries.push(query)
perform_mysql_operation do
perform_transaction(INSERT_QUERY_TYPE, transaction_queries)
puts 'Record inserted!'
end
end
def update(id, attributes, transaction_attributes_array = [])
query = prepare_query(attributes, UPDATE_QUERY_TYPE)
transaction_queries = prepare_transaction_queries(transaction_attributes_array, UPDATE_QUERY_TYPE)
transaction_queries.push(query)
perform_mysql_operation do
perform_transaction(UPDATE_QUERY_TYPE, transaction_queries, id)
puts 'Record Updated!'
end
end
private
def connect_to_db
host = ENV['MYSQL_SERVER_IP']
database = ENV['MYSQL_DB_NAME']
username = ENV['MYSQL_USERNAME']
password = ENV['MYSQL_PASSWORD']
Mysql2::Client.new(username: username, password: password, database: database, host: host)
end
def perform_mysql_operation
raise ArgumentError, 'No block was given' unless block_given?
begin
@mysql_client = connect_to_db
yield
rescue StandardError => e
raise e
ensure
mysql_client&.close
end
end
def prepare_insert_query(keys, transaction_table = nil)
columns = keys.join(', ')
substituted_columns = keys.map { '?' }.join(', ')
table_name = transaction_table || table
"INSERT INTO #{table_name} (#{columns}) VALUES (#{substituted_columns})"
end
def prepare_update_query(keys, transaction_table = nil, transaction_primary_column = nil)
columns = keys.map { |key| "#{key} = ?" }.join(', ')
table_name = transaction_table || table
primary_column_name = transaction_primary_column || primary_column
"UPDATE #{table_name} SET #{columns} WHERE #{primary_column_name} = ?"
end
def primary_column_hash(query_type, primary_column, attributes)
return {} if primary_column.nil? || query_type == INSERT_QUERY_TYPE
column_hash = {}
primary_column_symbol = primary_column.to_sym
column_hash[primary_column_symbol] = attributes[primary_column_symbol]
{
**column_hash,
primary_column_name: primary_column
}
end
def prepared_query_by_type(query_type, keys, transaction_table = nil, transaction_primary_column = nil)
if query_type == INSERT_QUERY_TYPE
prepare_insert_query(keys, transaction_table)
else
prepare_update_query(keys, transaction_table, transaction_primary_column)
end
end
def prepare_query(attributes, type, transaction_table = nil, transaction_primary_column = nil)
raise 'Attributes cannot be empty' if attributes.empty?
keys = attributes.keys
values = attributes.values
{
prepared_query: prepared_query_by_type(type, keys, transaction_table, transaction_primary_column),
values: values
}
end
def params_for_prepare_query(query_type, transaction_attribute)
attributes = transaction_attribute[:attributes]
transaction_table = transaction_attribute[:table]
default_params = [attributes, query_type, transaction_table]
return default_params if query_type == INSERT_QUERY_TYPE
transaction_primary_column = transaction_attribute[:primary_column]
default_params.push(transaction_primary_column)
end
def prepare_transaction_queries(attributes_array, type)
attributes_array.map do |transaction_attribute|
params = params_for_prepare_query(type, transaction_attribute)
{
**primary_column_hash(type, transaction_attribute[:primary_column], transaction_attribute[:attributes]),
**prepare_query(*params)
}
end
end
def transaction
raise ArgumentError, 'No block was given' unless block_given?
begin
mysql_client.query('BEGIN')
yield
mysql_client.query('COMMIT')
rescue StandardError => e
mysql_client.query('ROLLBACK')
raise e
end
end
def perform_insert_transaction(transaction_queries)
transaction_queries.each do |transaction_query|
statement = mysql_client.prepare(transaction_query[:prepared_query])
statement.execute(*transaction_query[:values])
end
end
def perform_update_transaction(transaction_queries, main_table_id)
transaction_queries.each do |transaction_query|
values = transaction_query[:values]
primary_column_name = transaction_query[:primary_column_name]
record_id = primary_column_name && transaction_query[primary_column_name.to_sym] || main_table_id
values.push(record_id)
statement = mysql_client.prepare(transaction_query[:prepared_query])
statement.execute(*values)
end
end
def perform_transaction(query_type, transaction_queries, main_table_id = nil)
transaction do
if query_type == INSERT_QUERY_TYPE
perform_insert_transaction(transaction_queries)
else
perform_update_transaction(transaction_queries, main_table_id)
end
end
end
end
end
end
After this, our service should be able to perform all basic, prepared operations and transactions in and to the external mysql server using mysql2 gem. Next week we will learn how to perform join operations using mysql2 gem. Yes we will be joining a lot of tables next week and next article will be the final one in the series. Thank you and stay tuned!
Image Credits: Cover Image by Pierre Borthiry on Unsplash
Top comments (0)