分布式事务解决方案-基于XA协议的两阶段提交2PC简述

前言

1、事务的具体定义
事务提供一种机制,将一个活动涉及的所有操作纳入到一个不可分割的执行单元,组成事务的所有操作只有在所有操作均能正常执行的情况下方能提交,只要其中任一操作执行失败,都将导致整个事务的回滚。简单地说,事务提供一种“要么什么都不做,要么做全套(All or Nothing)”机制。

2、什么是分布式事务
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。就是说,对一个“大事务”处理的时候,可能会出现多个”小事务”。这些小事务在整体大事务里面,如何保证“All or Nothing”?

3、分布式事务引发的问题

一个看似简单的功能,内部可能需要调用多个服务并操作多个数据库实现,服务调用的分布式事务问题变的非常突出。比如一个简单的流程:下单成功-->积分增加。你可能会出现下单成功,积分没有增加的情况。因为订单服务和积分服务在不同的微服务上,使用的数据库也可能不一样,如何保证用户下单成功,积分要增加,成了一个大难题。分布式事务解决方案就是讲述如何解决这个问题的。

 

两阶段提交2PC(强一致性)

基于XA协议是”两阶段提交”,是基于X/OPEN(DPT)的一个分布式事务模型。 模型成员有三种:事务的发起者(如下图的APP),事务的协调管理者(TM),资源管理者(就是数据库,RM)。

1、两阶段意思是:

 a、阶段一为准备(prepare)阶段。即所有的RM参与者准备执行事务并锁住需要的资源(其实这时、RM已经在快照上执行了语句,但没有提交)。参与者,向transaction manager报告已准备就绪。 

  b、第二阶段是执行阶段,协调者TM根据所有参与者RM的反馈,通知所有参与者,步调一致地在所有分支上提交或者回滚;

2、优缺点分析

 2.1优点

2PC/XA 提供了一套完整的分布式事务的解决方案,遵循事务严格的 ACID 特性。

 2.2缺点:

a.主要业务都在协调服务上处理,比如“下单代码、积分变动代码”的实现,协调服务需要链接积分和订单两个数据库(RM)(微服务中,每个数据库种类不一定一样)。一般来说某个系统内部如果出现跨多个库的这么一个操作,是不合规的。现在微服务,一个大的系统分成几十个甚至几百个服务。一般来说,我们的规定和规范,是要求每个服务只能操作自己对应的一个数据库。

b.并且不是所有的数据库都支持XA协议。

c.性能很低。

d.如果提交的时候或rollback的时候,RM宕机了...咋办???有人又提出方案3PC....那么3pc在最后提交或回滚的时候宕机了咋办????总之有可能最终还是出现问题。

 

代码示例

.Net这方面,主要依赖于System.Transactions名称空间下的TransactionScope类或CommittableTransaction

transactionscope的介绍(MSDN) 和TransactionScope隐式事务,其他示例.Net隐式事务简单介绍

CommittableTransaction的介绍(MSDN)CommittableTransaction显示事务(可以异步提交,很厉害)

事务管理升级(MSDTC)因为2pc需要跨数据库,所以,事务会自动升级为MSDTC 事务。

 

下面贴一段.net方面的代码

using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, options)) 
{
	try 
	{
		using (SqlConnection sqlCn = new SqlConnection("Data Source=(local); User Id=scott; Password=tigger; Initial Catalog=Lab")) 
		{
			SqlCommand cmd = new SqlCommand("INSERT INTO tblAccount (Account, Password, Username) VALUES (@acct, @pwd, @name)", sqlCn);
			cmd.Parameters.Add("@acct", SqlDbType.NVarchar).Value = "EMP" + DateTime.Now.ToString("HHmmss");
			cmd.Parameters.Add("@pwd", SqlDbType.NVarchar).Value = "PWD";
			cmd.Parameters.Add("@name", SqlDbType.NVarchar).Value = "NAME" + DateTime.Now.ToString("HHmmss");
			sqlCn.Open();
			cmd.ExecuteNonQuery();
		}
		//注意要使用:.NET 2.0或以上版本的System.Data.OracleClient
		using (OracleConnection oraCn = new OracleConnection("Data Source=MYORA; User Id=scott; Password=tigger;")) 
		{
			OracleCommand cmd = new OracleCommand("INSERT INTO MYTABLE.tblAccount (Account, Password, Username) VALUES (:acct, :pwd, :name)", oraCn);
			cmd.Parameters.Add(":acct", OracleType.Varchar).Value = "EMP" + DateTime.Now.ToString("HHmmss");
			cmd.Parameters.Add(":pwd", OracleType.Varchar).Value = "PWD";
			cmd.Parameters.Add(":name", OracleType.Varchar).Value = "NAME" + DateTime.Now.ToString("HHmmss");
			oraCn.Open();
			cmd.ExecuteNonQuery();
		}
		//在scope.Complete()后才算Commit!
		scope.Complete();
	}
	catch (Exception ex) 
	{
		//只要沒有scope.Complete(),先前的动作都会Rollback
		Response.Write(ex.Message);
	}
}

 

下面贴一段java对开启xa的mysql示例代码,引用的别人代码:

package study.xa;

import com.mysql.jdbc.jdbc2.optional.MysqlXAConnection;
import com.mysql.jdbc.jdbc2.optional.MysqlXid;

import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/***
 * @Description mysql分布式事务XAConnection模拟
 * @author denny
 * @date 2019/4/3 上午9:15
 */
public class MysqlXaConnectionTest {

    public static void main(String[] args) throws SQLException {
        //true表示打印XA语句,,用于调试
        boolean logXaCommands = true;
        // 获得资源管理器操作接口实例 RM1
        Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "12345");
        XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn1, logXaCommands);
        XAResource rm1 = xaConn1.getXAResource();

        // 获得资源管理器操作接口实例 RM2
        Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test2", "root", "12345");
        XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn2, logXaCommands);
        XAResource rm2 = xaConn2.getXAResource();
        // AP请求TM执行一个分布式事务,TM生成全局事务id
        byte[] gtrid = "g12345".getBytes();
        int formatId = 1;
        try {
            // ==============分别执行RM1和RM2上的事务分支====================
            // TM生成rm1上的事务分支id
            byte[] bqual1 = "b00001".getBytes();
            Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
            // 执行rm1上的事务分支 One of TMNOFLAGS, TMJOIN, or TMRESUME.
            rm1.start(xid1, XAResource.TMNOFLAGS);
            // 业务1:插入user表
            PreparedStatement ps1 = conn1.prepareStatement("INSERT into user VALUES ('99', 'user99')");
            ps1.execute();
            rm1.end(xid1, XAResource.TMSUCCESS);

            // TM生成rm2上的事务分支id
            byte[] bqual2 = "b00002".getBytes();
            Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
            // 执行rm2上的事务分支
            rm2.start(xid2, XAResource.TMNOFLAGS);
            // 业务2:插入user_msg表
            PreparedStatement ps2 = conn2.prepareStatement("INSERT into user_msg VALUES ('88', '99', 'user99的备注')");
            ps2.execute();
            rm2.end(xid2, XAResource.TMSUCCESS);

            // ===================两阶段提交================================
            // phase1:询问所有的RM 准备提交事务分支
            int rm1Prepare = rm1.prepare(xid1);
            int rm2Prepare = rm2.prepare(xid2);
            // phase2:提交所有事务分支
            boolean onePhase = false;
            //TM判断有2个事务分支,所以不能优化为一阶段提交
            if (rm1Prepare == XAResource.XA_OK
                && rm2Prepare == XAResource.XA_OK
                ) {
                //所有事务分支都prepare成功,提交所有事务分支
                rm1.commit(xid1, onePhase);
                rm2.commit(xid2, onePhase);
            } else {
                //如果有事务分支没有成功,则回滚
                rm1.rollback(xid1);
                rm1.rollback(xid2);
            }
        } catch (XAException e) {
            // 如果出现异常,也要进行回滚
            e.printStackTrace();
        }
    }
}

 

添加评论

Loading