注意Spring事务这一点,避免出现大事务
2021-03-22 04:27
标签:方式 通过 order 帮助 debug crud manager 好的 epo 本篇文章主要分享压测的(高并发)时候发现的一些问题。之前的两篇文章已经讲述了在高并发的情况下,消息队列和数据库连接池的一些总结和优化,有兴趣的可以在我的公众号中去翻阅。废话不多说,进入正题。 事务,想必各位CRUD之王对其并不陌生,基本上有多个写请求的都需要使用事务,而Spring对于事务的使用又特别的简单,只需要一个@Transactional注解即可,如下面的例子: 在我们创建订单的时候, 通常需要将订单和订单项放在同一个事务里面保证其满足ACID,这里我们只需要在我们创建订单的方法上面写上事务注解即可。 对于上面的创建订单的代码,如果现在需要新增一个需求,再创建订单之后发送一个消息到消息队列或者调用一个RPC,你会怎么做呢?很多同学首先会想到,直接在事务方法里面进行调用: 这种代码在很多人写的业务中都会出现,事务中嵌套rpc,嵌套一些非DB的操作,一般情况下这么写的确也没什么问题,一旦非DB写操作出现比较慢,或者流量比较大,就会出现大事务的问题。由于事务的一直不提交,就会导致数据库连接被占用。这个时候你可能会问,我扩大点数据库连接不就行了吗,100个不行就上1000个,再上篇文章已经讲过数据库连接池大小依然会影响我们数据库的性能,所以,数据库连接并不是想扩多少扩多少。 那我们应该怎么对其进行优化呢?在这里可以仔细想想,我们的非db操作,其实是不满足我们事务的ACID的,那么干嘛要写在事务里面,所以这里我们可以将其提取出来。 在这个方法里面先去调用事务的创建订单,然后再去调用其他非DB操作。如果我们现在想要更复杂一点的逻辑,比如创建订单成功就发送成功的RPC请求,失败就发送失败的RPC请求,由上面的代码我们可以做如下转化: 通常我们会捕获异常,或者根据返回值来进行一些特殊处理,这里的实现需要显示的捕获异常,并且再次抛出,这种方式不是很优雅,那么怎么才能更好的写这种话逻辑呢? TransactionSynchronizationManager 在Spring的事务中刚好提供了一些工具方法,来帮助我们完成这种需求。在TransactionSynchronizationManager中提供了让我们对事务注册callBack的方法: TransactionSynchronization也就是我们事务的callBack,提供了一些扩展点给我们: 我们可以利用afterComplettion方法实现我们上面的业务逻辑: 这里我们直接实现了afterCompletion,通过事务的status进行判断,我们应该具体发送哪个RPC。当然我们可以进一步封装TransactionSynchronizationManager.registerSynchronization将其封装成一个事务的Util,可以使我们的代码更加简洁。 通过这种方式我们不必把所有非DB操作都写在方法之外,这样代码更具有逻辑连贯性,更加易读,并且优雅。 这个注册事务的回调代码在我们在我们的业务逻辑中经常会出现,比如某个事务做完之后的刷新缓存,发送消息队列,发送通知消息等等,在日常的使用中,大家用这个基本也没出什么问题,但是在打压的过程中,发现了这一块出现了瓶颈,耗时特别久,通过一系列的监测,发现是从数据库连接池获取连接等待的时间较长,最终我们定位到了afterCompeltion这个动作,居然没有归还数据库连接。 在Spring的AbstractPlatformTransactionManager中,对commit处理的代码如下: 这里我们只需要关注 倒数几行代码即可,可以发现我们的triggerAfterCompletion,是倒数第二个执行逻辑,当执行完所有的代码之后就会执行我们的cleanupAfterCompletion,而我们的归还数据库连接也在这段代码之中,这样就导致了我们获取数据库连接变慢。 对于上面的问题如何优化呢?这里有三种方案可以进行优化: 通过第三种方法基本只需要把我们注册事务回调的地方都进行替换就可以正常使用了。 说了这么久大事务,到底什么才是大事务呢?简单点就是事务时间运行得长,那么就是大事务。一般来说导致事务时间运行时间长的因素不外乎下面几种: 上面的三种情况,前面两种可能来说不是特别常见,但是第三种事务中有很多非DB操作,这个是我们非常常见,通常出现这个情况的原因很多时候是我们自己习惯规范,初学者或者一些经验不丰富的人写代码,往往会先写一个大方法,直接在这个方法加上事务注解,然后再往里面补充,哪管他是什么逻辑,一把梭,就像下面这张图一样: 当然还有些人是想搞什么分布式事务,可惜用错了方法,对于分布式事务可以关注Seata,同样可以用一个注解就能帮助你做到分布式事务。 其实最后想想,为什么会出现这种问题呢?一般大家的理解都是会认为都是在完成之后做的了,数据库连接肯定早都释放了,但是事实并非如此。所以,我们使用很多API的时候不能望文生义,如果其没有详细的doc,那么你应该更加深入了解其实现细节。 当然最后希望大家写代码之前尽量还是不要一把梭,认真对待每一句代码。 如果大家觉得这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O: 注意Spring事务这一点,避免出现大事务 标签:方式 通过 order 帮助 debug crud manager 好的 epo 原文地址:https://blog.51cto.com/14980978/2544649@Transactional
public int createOrder(Order order){
orderDbStorage.save(order);
orderItemDbStorage.save(order.getItems());
return order.getId();
}
事务的合理使用
@Transactional
public int createOrder(Order order){
orderDbStorage.save(order);
orderItemDbStorage.save(order.getItems());
sendRpc();
sendMessage();
return order.getId();
}
public int createOrder(Order order){
createOrderService.createOrder(order);
sendRpc();
sendMessage();
}
public int createOrder(Order order){
try {
createOrderService.createOrder(order);
sendSuccessedRpc();
}catch (Exception e){
sendFailedRpc();
throw e;
}
}
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchronizations.get().add(synchronization);
}
public interface TransactionSynchronization extends Flushable {
int STATUS_COMMITTED = 0;
int STATUS_ROLLED_BACK = 1;
int STATUS_UNKNOWN = 2;
/**
* 挂起时触发
*/
void suspend();
/**
* 挂起事务抛出异常的时候 会触发
*/
void resume();
@Override
void flush();
/**
* 在事务提交之前触发
*/
void beforeCommit(boolean readOnly);
/**
* 在事务完成之前触发
*/
void beforeCompletion();
/**
* 在事务提交之后触发
*/
void afterCommit();
/**
* 在事务完成之后触发
*/
void afterCompletion(int status);
}
@Transactional
public int createOrder(Order order){
orderDbStorage.save(order);
orderItemDbStorage.save(order.getItems());
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCompletion(int status) {
if (status == STATUS_COMMITTED){
sendSuccessedRpc();
}else {
sendFailedRpc();
}
}
});
return order.getId();
}
afterCompletion的坑
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn‘t get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
如何优化
@Transactional
@MethodCallBack
public int createOrder(Order order){
orderDbStorage.save(order);
orderItemDbStorage.save(order.getItems());
MethodCallbackHelper.registerOnSuccess(() -> sendSuccessedRpc());
MethodCallbackHelper.registerOnThrowable(throwable -> sendFailedRpc());
return order.getId();
}
再谈大事务
最后