分布式事务(Seata)原理详解篇

网站建设4年前发布
48 0 0

2023030610412575a3f3f888fc69f90f5154bf33e1f353048592147,今天这篇,就给大家分析一下Seata的源码是如何一步一步实现的。读源码的时候我们需要俯瞰起全貌,不要去扣一个一个的细节,这样我们学习起来会快捷而且有效率,我们学习源码需要掌握的是整体思路和核心点。,首先 Seata 客户端启动一般分为以下几个流程:,在这篇源码的讲解中,我们主要以AT模式为主导,官网也是主推AT模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看​​分布式事务(Seata) 四大模式详解​​,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的介绍,AT模式主要分为两个阶段:,如果TC收到的是回滚请求:,如果没问题,执行提交操作:,2023030610412681a0af0825c70dc59856449e9db8111696ee4d73720230306104127d7e6dc874e0dcc4b407469944febf107832b86760,接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择 source 即可,下载完成之后,通过IDEA打开项目。,2023030610412789d68f44017483376fa8458bf8cdbd7496db3a7612023030612064078153ac71155309fee545913c5e8a7bfc02b0f255,源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的 Seata 包 spring-alibaba-seata,我们在回想一下,我们开启事务的时候,是不是添加过一个@GlobalTransactional的注解,这个注解就是我们入手的一个点,我们在 spring.factories 中看到有一个 GlobalTransactionAutoConfiguration,这个就是我们需要关注的点,也就是我们源码的入口。,2023030610412839192d1382c8e2fc1ed9416fa0f943d90f7f8f807202303061206426597dfd17dd87755d30060b6d0d434799af7ff370,在 GlobalTransactionAutoConfiguration 中我们找到一个用Bean注入的方法 globalTransactionScanner ,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的Bean。,20230306104129f671e36301c838dd61262100c820c33fb372829762023030612064348c886e74fa8b6fec48092c0cd781d5b4848bc209,这里给大家展示了当前GlobalTransactionScanner的类关系图,其中我们现在继承了Aop的AbstractAutoProxyCreator类型,在这其中有一个重点方法,这个方法就是判断Bean对象是否需要代理,是否需要增强。,在这其中我们要关心的是 GlobalTransactionScanner 这个类型,这个类型扫描 @GlobalTransactional 注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个GlobalTransactionScanner类,看看里面具体是做了什么。,InitializingBean:中实现了一个 afterPropertiesSet()方法,在这个方法中,调用了initClient()。,AbstractAutoProxyCreator:APO动态代理,在之前的的Nacos和Sentiel中都有这个代理类,AOP在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过AOP进行增强,在这个类中,我们需要关注有一个wrapIfNecessary()方法, 这个方法主要是判断被代理的bean或者类是否需要代理增强,在这个方法中会调用GlobalTransactionalInterceptor.invoke()进行带来增强。,具体代码如下:,具体流程图如下所示:,2023030610413018d5f50070bafc18f791001f0916e74c5afb2f60020230306104131b8d4d1b972bb9611e03701c8d7297c3a864d32548,在上面我们讲解到 GlobalTransactionalInterceptor 作为全局事务拦截器,一旦执行拦截,就会进入invoke方法,其中,我们会做 @GlobalTransactional 注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用 handleGlobalTransaction 全局事务处理器,获取事务信息,那我们接下来就来看一下 GlobalTransactionalInterceptor.handleGlobalTransaction 到底是如何执行全局事务的。,在这里我们,主要关注一个重点方法 execute() ,这个方法主要用来执行事务的具体流程:,这个位置也是一个非常核心的一个位置,因为我们所有的业务进来以后都会去走这个位置,具体源码如下所示:,这其中的第三步和第四步其实在向 TC(Seata-Server)发起全局事务的提交或者回滚,在这里我们首先关注执行全局事务的 ​​beginTransaction()​​ 方法。,在来关注其中,向TC发起请求的 tx.begin() 方法,而调用begin()方法的类为:DefaultGlobalTransaction。,再来看一下 transactionManager.begin() 方法,这个时候使用的是 DefaultTransactionManager.begin 默认的事务管理者,来获取XID,传入事务相关的信息 ,最好TC返回对应的全局事务XID,它调用的是DefaultTransactionManager.begin()方法。,在这里我们需要关注一个syncCall,在这里采用的是Netty通讯方式。,具体图解如下:,20230306104131a5fdb0f38378cd8d8f2938e8399802d63a539a6712023030610413367ba60665554832ca411040f9d6c0040669e1f145,在这里我们需要重点了解 GlobalTransactionScanner 这个类型,在这个类型中继承了一些接口和抽象类,这个类主要作用就是扫描有注解的Bean,并做AOP增强。,在上面的环节中,我们讲解了Seata AT模式2PC的执行流程,那么现在我们就来带大家了解一下关于AT数据源代理的信息,这也是AT模式中非常关键的一个重要知识点,大家可以拿起小本子,记下来。,首先AT模式的核心主要分为一下两个:,关于第一点我们已经分析清楚了,第二点就是关于AT模式如何解析SQL并写入undoLog中,但是在这之前,我们需要知道Seata是如何选择数据源,并进行数据源代理的。虽然全局事务拦截成功后最终还是执行了业务方法进行SQL提交和操作,但是由于Seata对数据源进行了代理,所以SQL的解析和undoLog的操作,是在数据源代理中进行完成的。,数据源代理是Seata中一个非常重要的知识点,在分布式事务运行过程中,undoLog的记录、资源的锁定,用户都是无感知的,因为这些操作都是数据源的代理中完成了,恰恰是这样,我们才要去了解,这样不仅有利于我们了解Seata的核心操作,还能对以后源码阅读有所帮助,因为其实很多底层代码都会去使用这样用户无感知的方式(代理)去实现。,同样,我们在之前的寻找源码入口的时候,通过我们项目中引入的jar找到一个 SeataAutoConfiguration 类,我们在里面找到一个SeataDataSourceBeanPostProcessor(),这个就是我们数据源代理的入口方法。,20230306104133459123033f235b956a5625eaee5786feb765e642120230306120643e7cf2af85b31233ba3d822056bdd7bbcb810c3963,我们进入SeataDataSourceBeanPostProcessor类里面,发现继承了一个 BeanPostProcessor ,这个接口我们应该很熟悉,这个是Sprng的拓展接口,所有的Bean对象,都有进入两个方法 postProcessAfterInitialization() 和 postProcessBeforeInitialization() 这两个方法都是由 BeanPostProcessor提供的,这两个方法,一个是初始化之前执行Before。一个是在初始化之后执行After,主要用来对比我们的的Bean是否为数据源代理对象。,在这里我们需要关注到一个postProcessAfterInitialization.proxyDataSource() 方法,这个里面。,这里有一个DataSourceProxy代理对象,我们需要看的就是这个类,这个就是我们数据库代理的对象,我们从我们下载的源码项目中,搜索这个代理对象,当我们打开这个类的目录时发现,除了这个,还有ConnectionProxy 连接对象、StatementProxy、PreparedStatementProxy SQL执行对象,这些都被Seata进行了代理,为什么要对这些都进行代理,代理的目的其实为了执行Seata的业务逻辑,生成undoLog,全局事务的开启,事务的提交回滚等操作。,2023030610413591747a0027141a4ee9e165c0cf9599182d7edc97420230306104136c23496854103d75edf0578d428ecf6e22a7cc4663,DataSourceProxy 具体做了什么,主要功能有哪些,我们来看一下。他在源码中是如何体现的,我们需要关注的是init()。,从上面我们可以看出,他主要做了以下几点的增强:,在这三个增强功能里面,第三个是最重要的,在AT模式里面,会自动记录undoLog,资源锁定,都是通过ConnectionProxy完成的,除此之外 DataSrouceProxy重写了一个方法 getConnection,因为这里返回的是一个 ConnectionProxy,而不是原生的Connection。,ConnectionProxy 继承 AbstractConnectionProxy ,在这个父类中有很多公用的方法,在这个父类有 PreparedStatementProxy 、StatementProxy 、DataSourceProxy。,20230306104136646772735b9f64bfbee4097e374a97a5249aa23002023030610433424bd215594a4288a0da101c42739b83cce920a403,所以我们需要先来看一下AbstractConnectionProxy,因为这里封装了需要我们用到的通用方法和逻辑,在其中我们需要关注的主要在于 PreparedStatementProxy 和 StatementProxy ,在这里的逻辑主要是数据源连接的步骤,连接获取,创建执行对象等等。,在这两个代理对象中,都用到了以下几个方法:,在这些方法中都调用了 ExecuteTemplate.execute(),所以我们就看一下在 ExecuteTemplate类中具体是做了什么操作:,在 ExecuteTemplate就一个 execute(),Seata将SQL执行委托给不同的执行器(模板),Seata提供了6种执行器也就是我们代码 case 中(INSERT,UPDATE,DELETE,SELECT_FOR_UPDATE,INSERT_ON_DUPLICATE_UPDATE),这些执行器的父类都是AbstractDMLBaseExecutor。,关系图如下:,20230306104335b71bb1f94c6241024c7261129fbf50f63a5da5914,然后我们找到 rs = executor.execute(args); 最终执行的方法,找到最顶级的父类BaseTransactionalExecutor.execute()。,在根据doExecute(args);找到其中的重写方法 AbstractDMLBaseExecutor.doExecute()。,对于数据库而言,本身都是自动提交的,所以我们进入executeAutoCommitTrue()。,connectionProxy.changeAutoCommit()方法,修改为手动提交后,我们看来最关键的代码executeAutoCommitFalse()。,我们再回到executeAutoCommitTrue中,去看看提交做了哪些操作connectionProxy.commit()。,进入到doCommit()中。,作为分布式事务,一定是存在全局事务的,所以我们进入 processGlobalTransactionCommit()。,其中register()方法就是注册分支事务的方法,同时还会将undoLog写入数据库和执行提交等操作。,然后我们在回到processGlobalTransactionCommit中,看看写入数据库中的flushUndoLogs()。,具体写入方法,此时我们使用的是MySql,所以执行的是MySql实现类MySQLUndoLogManager.insertUndoLogWithNormal()。,具体流程如下所示:,20230306120644d18b545449927cf1f9a4797f20190ecc1018ac935,我们找到Server.java 这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务。,在DefaultCoordinator类中我们找到 一个doGlobalBegin 这个就是处理全局事务开始的方法,以及全局提交 doGlobalCommit 和全局回滚 doGlobalRollback。,在这里我们首先关注 doGlobalBegin 中 core.begin()。,然后我们在来看一下SessionHolder.getRootSessionManager()。,在这里他其实读取的是DB模式下 io.seata.server.session.SessionManager文件的内容。,20230306104137432e633658d0d5a000d74649808894102d488d689,我们在回到begin方法中,去查看session.begin()。,这里我们来看一下 onBegin() 方法,调用的是父级的方法,在这其中我们要关注 addGlobalSession() 方法,但是要注意,这里我们用的是db模式所以调用的是db模式的 DateBaseSessionManager。,然后在看查询其中关键的方法DataBaseTransactionStoreManager.writeSession()。,我们就看第一次进去的方法logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session))。,在这里有一个 GlobalTransactionDO 对象,里面有xid、transactionId 等等,到这里是不是就很熟悉了。,202303061041387537ad6745fe9259e8e4925e749f8e985f2e7b44020230306104138f70a70d751830a3eaa5253cff35a6c8a825929691,还记得我们第一次使用Seata的时候会创建三张表:,而这里就是对应我们的global_table表,其他两个也是差不多,都是一样的操作。,20230306120645c17f5f016a2da019994186a40822fd94bbd854231,20230306104139e89d2471292fb3323c5421eb9f2b4df964fd58867流程图如下:2023030612064603edea09991c1e313b2619823720a9dff80f4b482,2023030610414031a588358de676b80b52422bb816193c2fd4ff163,完整流程图:,20230306104141b195c3b14925d1a6fc42290961dea16718209a108,对于Seata源码来说主要是了解从哪里入口以及核心点在哪里,遇到有疑问的,可以Debug,对于Seata AT模式,我们主要掌握的核心点是,围绕这两点去看的话,会有针对性一点,到这里我们的Seata源码就讲解完了。

© 版权声明

相关文章