理解分布式事务JTA原理参见:

  JTA实现产品介绍:

  Atomikos官网无法访问,不过Maven中央库中具atomikos包。Atomikos集成Spring,Hibernate,Mybatis网上文章比较多,本文是通过JavaSE的方式借用Spring配置来测试Atomikos对JTA的实现。

 下面做一件事,就是两(+)个数据库,在一个事务里对其分别对数据库操作验证操作的原子性,即要么两个数据库的操作都成功,要么都失败。

 1.准备工作

  1.1 Maven pom.xml中添加依赖包

  atomikos:(目前最新版)

  

com.atomikos
transactions-jdbc
3.9.3

 jar依赖图:

 

 

 Postgresql数据库驱动:

org.postgresql
postgresql
9.2-1004-jdbc4

 注:文中使用的是Postgresql V9.2

 

 javax.transaction.transaction-api.1.1:

 

 

 

javax
javaee-api
7.0

 注:根据需要选择其一即可。

 Spring,Junit依赖这里省略。

 1.2 创建数据库以及表

   数据库分别是:javaee,tomdb

   

 2.在项目中添加配置文件

  spring-jta.xml 和transaction.properties(模版见文中附件)文件,spring-jta.xml在src/main/resources/integration下,transaction.properties在src/main/resources/下。

 2.1在spring-jta.xml中配置两个XADataSource:

 

postgres
postgres
localhost
5432
tomdb
postgres
postgres
localhost
5432
javaee

 说明:

 Postgresql的max_prepared_transactions参数值默认是0,要开启XA需要设置该值至少和max_connections参数值一样大,该参数在PostgreSQL\9.3\data\postgresql.conf文件中。

 PGXADataSource的父类BaseDataSource没有url属性,可需要分别设置serverName,portNumber,databaseName等属性。不同的数据库驱动有不同的实现方法。

 2.2 配置事务管理对象和UserTransaction接口实现

 

UserTransactionManager

 

 上面三个Bean可以独立使用来进行事务控制,具体看下面3。

3. 编写测试

 3.1 使用atomikosUserTransactionManager对象测试(TestAtomikos1.java)

 

package secondriver.springsubway.example.jta;import java.sql.Connection;import java.sql.SQLException;import javax.transaction.HeuristicMixedException;import javax.transaction.HeuristicRollbackException;import javax.transaction.NotSupportedException;import javax.transaction.RollbackException;import javax.transaction.SystemException;import org.junit.BeforeClass;import org.junit.Test;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import com.atomikos.icatch.jta.UserTransactionManager;import com.atomikos.jdbc.AtomikosDataSourceBean;public class TestAtomikos1 {	public static ApplicationContext ctx;	@BeforeClass	public static void beforeClass() {		ctx = new ClassPathXmlApplicationContext(				"classpath:integration/spring-jta.xml");	}	public static void afterClass() {		ctx = null;	}	@Test	public void test1() {		exe("abc", "abc");	}	@Test	public void test2() {		exe("123=", "123");	}	public void exe(String av, String bv) {		AtomikosDataSourceBean adsA = (AtomikosDataSourceBean) ctx.getBean("a");		AtomikosDataSourceBean adsB = (AtomikosDataSourceBean) ctx.getBean("b");		Connection connA;		Connection connB;		UserTransactionManager utm = (UserTransactionManager) ctx				.getBean("atomikosUserTransactionManager");		try {			utm.begin();			connA = adsA.getConnection();			connB = adsB.getConnection();			connA.prepareStatement(					"insert into jta_temp (value) values('" + av + "')")					.execute();			connB.prepareStatement(					"insert into jta_temp (value) values('" + bv + "')")					.execute();			utm.commit();		} catch (SQLException | NotSupportedException | SystemException				| SecurityException | IllegalStateException | RollbackException				| HeuristicMixedException | HeuristicRollbackException e) {			e.printStackTrace();			try {				utm.rollback();			} catch (IllegalStateException | SecurityException					| SystemException e1) {				e1.printStackTrace();			}		}	}}

3.2使用Spring的JtaUserTransactionManager对象测试(TestAtomikos2.java 修改TestAtomikos1.java中的exe方法即可)

 

@Test	public void test1() {		exe("abc", "abc");	}	@Test	public void test2() {		exe("123=", "123");	}
public void exe(String av, String bv) {		TransactionFactory txm = (TransactionFactory) ctx				.getBean("transactionManager");		JdbcTemplate a = (JdbcTemplate) ctx.getBean("jdbcTemplateA");		JdbcTemplate b = (JdbcTemplate) ctx.getBean("jdbcTemplateB");		Transaction tx =null;		try {		 tx = txm.createTransaction("tx-name-define", 10000);			a.update("insert into jta_temp (value) values('" + av + "')");			b.update("insert into jta_temp (value) values('" + bv + "')");			tx.commit();		} catch (NotSupportedException | SystemException | SecurityException				| RollbackException | HeuristicMixedException				| HeuristicRollbackException e) {			e.printStackTrace();			if(tx!=null){			    try {					tx.rollback();				} catch (IllegalStateException | SystemException e1) {					e1.printStackTrace();				}			}		}	}

 

3.3使用atomikosUserTransaction Bean对象进行测试(TestAtomikos3.java 修改TestAtomikos1.java中的exe方法即可)

@Test	public void test1() {		exe("abc", "abc");	}	@Test	public void test2() {		exe("123", "123=");	}

public void exe(String av, String bv) {		AtomikosDataSourceBean adsA = (AtomikosDataSourceBean) ctx.getBean("a");		AtomikosDataSourceBean adsB = (AtomikosDataSourceBean) ctx.getBean("b");		Connection connA;		Connection connB;		UserTransaction utx = (UserTransaction) ctx				.getBean("atomikosUserTransaction");		try {			utx.begin();			connA = adsA.getConnection();			connB = adsB.getConnection();			connA.prepareStatement(					"insert into jta_temp (value) values('" + av + "')")					.execute();			connB.prepareStatement(					"insert into jta_temp (value) values('" + bv + "')")					.execute();			utx.commit();		} catch (SQLException | NotSupportedException | SystemException				| SecurityException | IllegalStateException | RollbackException				| HeuristicMixedException | HeuristicRollbackException e) {			e.printStackTrace();			try {				utx.rollback();			} catch (IllegalStateException | SecurityException					| SystemException e1) {				e1.printStackTrace();			}		}	}

 使用上述三种UserTransaction进行测试,其中test1方法是成功执行,test2方法是执行失败的(因为插入的值长度超过的字段长度限制)。通过分析之后,如果分布式事物控制正确,那么数据库中写入的值对于两张不同的表而言,是没有数字值被写入的。如图所示:

 

 在测试过程中,经过对比确实达到了分布式事务控制的效果。

 整个操作的架构图如下(摘自Atomikos Blog)

 

 关于JTA原理文章开始提到的那篇文章,写的很详细和清晰,可以细细阅读和理解。