,今天这篇,就给大家分析一下Seata的源码是如何一步一步实现的。读源码的时候我们需要俯瞰起全貌,不要去扣一个一个的细节,这样我们学习起来会快捷而且有效率,我们学习源码需要掌握的是整体思路和核心点。,首先 Seata 客户端启动一般分为以下几个流程:,在这篇源码的讲解中,我们主要以AT模式为主导,官网也是主推AT模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看分布式事务(Seata) 四大模式详解,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的介绍,AT模式主要分为两个阶段:,如果TC收到的是回滚请求:,如果没问题,执行提交操作:,![]()
,接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择 source 即可,下载完成之后,通过IDEA打开项目。,![]()
,源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的 Seata 包 spring-alibaba-seata,我们在回想一下,我们开启事务的时候,是不是添加过一个@GlobalTransactional的注解,这个注解就是我们入手的一个点,我们在 spring.factories 中看到有一个 GlobalTransactionAutoConfiguration,这个就是我们需要关注的点,也就是我们源码的入口。,![]()
,在 GlobalTransactionAutoConfiguration 中我们找到一个用Bean注入的方法 globalTransactionScanner ,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的Bean。,![]()
,这里给大家展示了当前GlobalTransactionScanner的类关系图,其中我们现在继承了Aop的AbstractAutoProxyCreator类型,在这其中有一个重点方法,这个方法就是判断Bean对象是否需要代理,是否需要增强。,在这其中我们要关心的是 GlobalTransactionScanner 这个类型,这个类型扫描 @GlobalTransactional 注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个GlobalTransactionScanner类,看看里面具体是做了什么。,InitializingBean:中实现了一个 afterPropertiesSet()方法,在这个方法中,调用了initClient()。,AbstractAutoProxyCreator:APO动态代理,在之前的的Nacos和Sentiel中都有这个代理类,AOP在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过AOP进行增强,在这个类中,我们需要关注有一个wrapIfNecessary()方法, 这个方法主要是判断被代理的bean或者类是否需要代理增强,在这个方法中会调用GlobalTransactionalInterceptor.invoke()进行带来增强。,具体代码如下:,具体流程图如下所示:,![]()
,在上面我们讲解到 GlobalTransactionalInterceptor 作为全局事务拦截器,一旦执行拦截,就会进入invoke方法,其中,我们会做 @GlobalTransactional 注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用 handleGlobalTransaction 全局事务处理器,获取事务信息,那我们接下来就来看一下 GlobalTransactionalInterceptor.handleGlobalTransaction 到底是如何执行全局事务的。,在这里我们,主要关注一个重点方法 execute() ,这个方法主要用来执行事务的具体流程:,这个位置也是一个非常核心的一个位置,因为我们所有的业务进来以后都会去走这个位置,具体源码如下所示:,这其中的第三步和第四步其实在向 TC(Seata-Server)发起全局事务的提交或者回滚,在这里我们首先关注执行全局事务的 beginTransaction() 方法。,在来关注其中,向TC发起请求的 tx.begin() 方法,而调用begin()方法的类为:DefaultGlobalTransaction。,再来看一下 transactionManager.begin() 方法,这个时候使用的是 DefaultTransactionManager.begin 默认的事务管理者,来获取XID,传入事务相关的信息 ,最好TC返回对应的全局事务XID,它调用的是DefaultTransactionManager.begin()方法。,在这里我们需要关注一个syncCall,在这里采用的是Netty通讯方式。,具体图解如下:,![]()
,在这里我们需要重点了解 GlobalTransactionScanner 这个类型,在这个类型中继承了一些接口和抽象类,这个类主要作用就是扫描有注解的Bean,并做AOP增强。,在上面的环节中,我们讲解了Seata AT模式2PC的执行流程,那么现在我们就来带大家了解一下关于AT数据源代理的信息,这也是AT模式中非常关键的一个重要知识点,大家可以拿起小本子,记下来。,首先AT模式的核心主要分为一下两个:,关于第一点我们已经分析清楚了,第二点就是关于AT模式如何解析SQL并写入undoLog中,但是在这之前,我们需要知道Seata是如何选择数据源,并进行数据源代理的。虽然全局事务拦截成功后最终还是执行了业务方法进行SQL提交和操作,但是由于Seata对数据源进行了代理,所以SQL的解析和undoLog的操作,是在数据源代理中进行完成的。,数据源代理是Seata中一个非常重要的知识点,在分布式事务运行过程中,undoLog的记录、资源的锁定,用户都是无感知的,因为这些操作都是数据源的代理中完成了,恰恰是这样,我们才要去了解,这样不仅有利于我们了解Seata的核心操作,还能对以后源码阅读有所帮助,因为其实很多底层代码都会去使用这样用户无感知的方式(代理)去实现。,同样,我们在之前的寻找源码入口的时候,通过我们项目中引入的jar找到一个 SeataAutoConfiguration 类,我们在里面找到一个SeataDataSourceBeanPostProcessor(),这个就是我们数据源代理的入口方法。,![]()
,我们进入SeataDataSourceBeanPostProcessor类里面,发现继承了一个 BeanPostProcessor ,这个接口我们应该很熟悉,这个是Sprng的拓展接口,所有的Bean对象,都有进入两个方法 postProcessAfterInitialization() 和 postProcessBeforeInitialization() 这两个方法都是由 BeanPostProcessor提供的,这两个方法,一个是初始化之前执行Before。一个是在初始化之后执行After,主要用来对比我们的的Bean是否为数据源代理对象。,在这里我们需要关注到一个postProcessAfterInitialization.proxyDataSource() 方法,这个里面。,这里有一个DataSourceProxy代理对象,我们需要看的就是这个类,这个就是我们数据库代理的对象,我们从我们下载的源码项目中,搜索这个代理对象,当我们打开这个类的目录时发现,除了这个,还有ConnectionProxy 连接对象、StatementProxy、PreparedStatementProxy SQL执行对象,这些都被Seata进行了代理,为什么要对这些都进行代理,代理的目的其实为了执行Seata的业务逻辑,生成undoLog,全局事务的开启,事务的提交回滚等操作。,![]()
,DataSourceProxy 具体做了什么,主要功能有哪些,我们来看一下。他在源码中是如何体现的,我们需要关注的是init()。,从上面我们可以看出,他主要做了以下几点的增强:,在这三个增强功能里面,第三个是最重要的,在AT模式里面,会自动记录undoLog,资源锁定,都是通过ConnectionProxy完成的,除此之外 DataSrouceProxy重写了一个方法 getConnection,因为这里返回的是一个 ConnectionProxy,而不是原生的Connection。,ConnectionProxy 继承 AbstractConnectionProxy ,在这个父类中有很多公用的方法,在这个父类有 PreparedStatementProxy 、StatementProxy 、DataSourceProxy。,![]()
,所以我们需要先来看一下AbstractConnectionProxy,因为这里封装了需要我们用到的通用方法和逻辑,在其中我们需要关注的主要在于 PreparedStatementProxy 和 StatementProxy ,在这里的逻辑主要是数据源连接的步骤,连接获取,创建执行对象等等。,在这两个代理对象中,都用到了以下几个方法:,在这些方法中都调用了 ExecuteTemplate.execute(),所以我们就看一下在 ExecuteTemplate类中具体是做了什么操作:,在 ExecuteTemplate就一个 execute(),Seata将SQL执行委托给不同的执行器(模板),Seata提供了6种执行器也就是我们代码 case 中(INSERT,UPDATE,DELETE,SELECT_FOR_UPDATE,INSERT_ON_DUPLICATE_UPDATE),这些执行器的父类都是AbstractDMLBaseExecutor。,关系图如下:,
,然后我们找到 rs = executor.execute(args); 最终执行的方法,找到最顶级的父类BaseTransactionalExecutor.execute()。,在根据doExecute(args);找到其中的重写方法 AbstractDMLBaseExecutor.doExecute()。,对于数据库而言,本身都是自动提交的,所以我们进入executeAutoCommitTrue()。,connectionProxy.changeAutoCommit()方法,修改为手动提交后,我们看来最关键的代码executeAutoCommitFalse()。,我们再回到executeAutoCommitTrue中,去看看提交做了哪些操作connectionProxy.commit()。,进入到doCommit()中。,作为分布式事务,一定是存在全局事务的,所以我们进入 processGlobalTransactionCommit()。,其中register()方法就是注册分支事务的方法,同时还会将undoLog写入数据库和执行提交等操作。,然后我们在回到processGlobalTransactionCommit中,看看写入数据库中的flushUndoLogs()。,具体写入方法,此时我们使用的是MySql,所以执行的是MySql实现类MySQLUndoLogManager.insertUndoLogWithNormal()。,具体流程如下所示:,
,我们找到Server.java 这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务。,在DefaultCoordinator类中我们找到 一个doGlobalBegin 这个就是处理全局事务开始的方法,以及全局提交 doGlobalCommit 和全局回滚 doGlobalRollback。,在这里我们首先关注 doGlobalBegin 中 core.begin()。,然后我们在来看一下SessionHolder.getRootSessionManager()。,在这里他其实读取的是DB模式下 io.seata.server.session.SessionManager文件的内容。,
,我们在回到begin方法中,去查看session.begin()。,这里我们来看一下 onBegin() 方法,调用的是父级的方法,在这其中我们要关注 addGlobalSession() 方法,但是要注意,这里我们用的是db模式所以调用的是db模式的 DateBaseSessionManager。,然后在看查询其中关键的方法DataBaseTransactionStoreManager.writeSession()。,我们就看第一次进去的方法logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session))。,在这里有一个 GlobalTransactionDO 对象,里面有xid、transactionId 等等,到这里是不是就很熟悉了。,![]()
,还记得我们第一次使用Seata的时候会创建三张表:,而这里就是对应我们的global_table表,其他两个也是差不多,都是一样的操作。,
,
流程图如下:
,
,完整流程图:,
,对于Seata源码来说主要是了解从哪里入口以及核心点在哪里,遇到有疑问的,可以Debug,对于Seata AT模式,我们主要掌握的核心点是,围绕这两点去看的话,会有针对性一点,到这里我们的Seata源码就讲解完了。
© 版权声明
文章版权归作者所有,未经允许请勿转载。