Skip to content

Commit 422e9d1

Browse files
committed
提交笔记
1 parent 2d57e71 commit 422e9d1

File tree

67 files changed

+24511
-3
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+24511
-3
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
---
2+
title: Rocketmq实战
3+
date: 2024-12-30 19:07:05
4+
updated: 2024-12-30 19:07:05
5+
tags:
6+
- 分布式
7+
- Rocketmq
8+
comments: true
9+
categories:
10+
- 分布式
11+
- Rocketmq
12+
thumbnail: https://cdn.jsdelivr.net/gh/hackerHiJu/note-picture@main/note-picture/%25E5%25A4%25A9%25E7%25A9%25BA.png
13+
---
14+
15+
# RocketMq
16+
17+
![image-20210331151901841](https://cdn.jsdelivr.net/gh/hackerHiJu/note-picture@main/note-picture/image-20210331151901841.png)
18+
19+
## 一、基本概念
20+
21+
### 1. 消息模型
22+
23+
Producer、Broker、Consumer三部分组成
24+
25+
### 2. Producer(消息生产者)
26+
27+
RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
28+
29+
### 3. Consumer(消息消费者)
30+
31+
一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:**拉取式消费 (pull consumer)****推动式消费(push consumer)**
32+
33+
### 4. Topic(主题)
34+
35+
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。每个Topic默认都会创建4个队列
36+
37+
### 5. Broker Server(代理服务器)
38+
39+
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
40+
41+
### 6. Name Server(名字服务)
42+
43+
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
44+
45+
### 7. 拉取式消费(Pull Consumer)
46+
47+
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
48+
49+
### 8. 推动式消费(Push Consumer)
50+
51+
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。基于拉取式消费深度封装,进行长轮询的拉取消息。
52+
53+
### 9. 生产者组(Producer Group)
54+
55+
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事物消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
56+
57+
### 10. 消费者组(Consumer Group)
58+
59+
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费 (Clustering)和广播消费(Broadcasting)
60+
61+
### 11. 集群消费(Clustering)
62+
63+
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
64+
65+
### 12. 广播消费(Broadcasting)
66+
67+
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
68+
69+
### 13. 普通顺序消息(Normal Ordered Message)
70+
71+
普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
72+
73+
### 14. 严格顺序消息(Strictly Ordered Message)
74+
75+
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
76+
77+
### 15. 消息(Message)
78+
79+
RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
80+
81+
### 16. 标签(Tag)
82+
83+
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
84+
85+
## 二、RocketMq模块划分
86+
87+
88+
89+
| remoting | 远程通讯模块:netty+fastjson |
90+
| ------------- | -------------------------------------------- |
91+
| 名称 | 作用 |
92+
| broker | broker模块:c端和p端消息存储逻辑 |
93+
| client | 客户端api:producer、consumer端接受与发送api |
94+
| common | 公共组件:常量、基类、数据结构 |
95+
| tools | 运维tools:命令行工具 |
96+
| store | 存储模块:消息、索引、commitlog存储 |
97+
| namesrv | 服务管理模块:服务注册topic等信息存储 |
98+
| logappender | 日志适配模块 |
99+
| example | Demo例子 |
100+
| filtersrv | 消息过滤模块 |
101+
| srvutil | 辅助模块 |
102+
| filter | 过滤模块:消息过滤模块 |
103+
| distribution | 部署、运维相关zip包中的代码 |
104+
| openmessaging | 兼容分布式消息模块 |
105+
106+
## 三、RocketMq特性
107+
108+
### 1. producer端
109+
110+
#### 1.1 发送方式
111+
112+
- Sync:同步方式发送,等待结果后才返回
113+
- Async:异步的方式发送,发送完后,立刻返回
114+
- Oneway:发出去之后什么都不用管直接返回
115+
116+
#### 1.2 发送结果
117+
118+
- SEND_OK:消息发送成功,但是不一定就是可靠,要确保消息不会丢失,还用启用同步Master服务或者同步刷盘,即SYNC_MASTER或SYNC_FLUSH
119+
- FLUSH_DISK_TIMEOUT:消息发送成功但是服务器刷盘超时。消息进入队列了,只有服务宕机才会丢失,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认同步刷盘),超过了默认刷盘时间5s,就返回该状态
120+
- FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步salve时超时
121+
- SLAVE_NOT_AVAILABLE:消息发送成功,但是此时salve不可用
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
---
2+
title: Seata实战入门与实战
3+
date: 2024-12-30 19:07:05
4+
updated: 2024-12-30 19:07:05
5+
tags:
6+
- 分布式
7+
- Seata
8+
comments: true
9+
categories:
10+
- 分布式
11+
- Seata
12+
thumbnail: https://cdn.jsdelivr.net/gh/hackerHiJu/note-picture@main/note-picture/%25E5%25A4%25A9%25E7%25A9%25BA.png
13+
---
14+
15+
# 分布式事务
16+
17+
## 一、分布式事务的组成部分
18+
19+
- 事务参与者:对应的一个一个的微服务
20+
- 资源服务器:对应一个个微服务的数据库
21+
- 事务管理器:决策各个事务参与者的提交和回滚
22+
23+
### 两阶段提交:
24+
25+
1. 准备阶段:向事务管理器向事务参与者发送预备请求,事务参与者在写本地的redo和undo日志,但是不提交,并且返回准备就绪的信息,最后提交的动作交给第二阶段来进行
26+
2. 提交阶段:如果事务协调者收到失败或者超时的信息,直接给每个参与者发送回滚消息;否则提交消息,最后根据协调者的指令释放所有事务处理过程中使用的资源锁
27+
28+
29+
30+
## 二、项目例子
31+
32+
当前依赖,全局事务XID,不需要手动进行绑定,自动进行传递
33+
34+
```java
35+
<dependency>
36+
<groupId>com.alibaba.cloud</groupId>
37+
&lt;!&ndash;加入spring-cloud-alibaba-seata,解决xid不传递问题&ndash;&gt;
38+
<artifactId>spring-cloud-alibaba-seata</artifactId>
39+
<version>2.2.0.RELEASE</version>
40+
<exclusions>
41+
<exclusion>
42+
<groupId>io.seata</groupId>
43+
<artifactId>seata-spring-boot-starter</artifactId>
44+
</exclusion>
45+
<exclusion>
46+
<groupId>io.seata</groupId>
47+
<artifactId>seata-all</artifactId>
48+
</exclusion>
49+
</exclusions>
50+
</dependency>
51+
52+
```
53+
54+
全局事务XID需要通过过滤器或拦截器进行手动绑定,否则下游服务获取不到全局XID回滚不了
55+
56+
```java
57+
<dependency>
58+
<groupId>io.seata</groupId>
59+
<artifactId>seata-spring-boot-starter</artifactId>
60+
<version>1.3.0</version>
61+
<!-- 这里需要排除自身的seata-all -->
62+
<exclusions>
63+
<exclusion>
64+
<artifactId>seata-all</artifactId>
65+
<groupId>io.seata</groupId>
66+
</exclusion>
67+
</exclusions>
68+
</dependency>
69+
<!-- 导入与之前下载的seata版本一致的包 -->
70+
<dependency>
71+
<groupId>io.seata</groupId>
72+
<artifactId>seata-all</artifactId>
73+
<version>1.3.0</version>
74+
</dependency>
75+
```
76+
77+
OpenFeign进行手动传递XID
78+
79+
```java
80+
@Component
81+
public class FeignConfiguration implements RequestInterceptor {
82+
83+
@Override
84+
public void apply(RequestTemplate requestTemplate) {
85+
requestTemplate.header("XID", RootContext.getXID());
86+
}
87+
}
88+
```
89+
90+
提供者手动绑定XID
91+
92+
```java
93+
@Component
94+
public class SeataFilter implements Filter {
95+
@Override
96+
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
97+
HttpServletRequest request = (HttpServletRequest) servletRequest;
98+
//手动绑定XID
99+
String xid = request.getHeader("XID");
100+
if(StringUtils.isNotBlank(xid)){
101+
RootContext.bind(xid);
102+
}
103+
filterChain.doFilter(servletRequest,servletResponse);
104+
}
105+
}
106+
```
107+
108+
**seata在1.0.0版本之后就不需要手动进行数据源代理,已经被自动代理**
109+
110+
客户端的配置文件
111+
112+
```yml
113+
seata:
114+
enabled: true
115+
tx-service-group: my_first_seata #配置文件中的事务服务组一样
116+
config:
117+
type: nacos # nacos中拉去对应的配置文件
118+
nacos:
119+
server-addr: 192.168.60.46:8849
120+
group: SEATA_GROUP
121+
registry: # 会去nacos中拉去seata-server服务
122+
type: nacos
123+
nacos:
124+
application: seata-server
125+
server-addr: 192.168.60.46:8849
126+
group: SEATA_GROUP
127+
```
128+
129+
**seata1.0.0之后config文件下就移除了nacos-config.txt等文件,改为了config.txt需要手动下载,并且config.txt需要在nacos-config.sh的上一级目录下才能推送到nacos中**
130+
131+
```application
132+
# 只需要修改下面几种配置即可,这里是配置客户端需要拉取的配置文件
133+
service.vgroupMapping.自定义的名称=default
134+
store.mode=db #修改为db
135+
store.db.dbType=mysql #修改msql的连接方式账号和密码
136+
```
137+
138+
139+
140+
## 三、seata原理
141+
142+
### 1、角色划分
143+
144+
- RM:资源管理者/事务参与者,也可以是一个TM
145+
146+
- TM:事务管理者,也是一个微服务,充当分布式事务的发起者
147+
148+
- TC:全局事务协调者seata-server,一个包需要搭建,TC来决定事务的回滚和提交
149+
150+
### 2、AT模式
151+
152+
#### (1)核心概念
153+
154+
- 两阶段提交:只执行,不提交
155+
156+
- seata 核心概念:边执行,边提交(两阶段的变种)
157+
- 一阶段:查询前置快照---------->执行业务语句-------------->查询出后置快照,保存只undo_log日志表中
158+
- 二阶段提交:分支插入待删除队列--------->异步删除undo_log表中数据
159+
- 二阶段回滚:根据配置选项选择是否检验dirty data------------>构造方向SQL----------->删除undo_log
160+
161+
#### (2)执行流程
162+
163+
阶段一:业务SQL:update product set name = 'GTS' where name = 'TXC'
164+
165+
- 解析SQL,根据update product解析出update语句,表product,条件where等相关信息
166+
- 查询前置镜像:根据解析sql生成查询语句:**select id**, **name**, since **from** product **where name** = 'TXC'
167+
- 执行业务SQL:update product set name = 'GTS' where name = 'TXC' 更新数据
168+
- 查询后置镜像:通过主键定位数据
169+
- 插入回滚日志:把前后镜像数据以及业务SQL相关的信息组成一条回滚日志记录,插入到undo_log表中
170+
- 提交前,向TC注册分支,申请product表中,主键值记录的全局锁
171+
- 本地事务提交:业务数据的更新和前面步骤中生成的undo_log一并提交
172+
- 将本地事务的提交结果上报给TC
173+
174+
**业务数据和回滚日志记录会在同一个本地事务中保存,会释放本地锁和连接资源**
175+
176+
阶段二(回滚):
177+
178+
- 收到TC的分支回滚请求,开启一个本地事务,把请求放入一个异步任务的队列里面
179+
- 根据XID和Branch ID查找到相应的undo_log记录
180+
- 数据校验:拿undo_log中的后镜与当前数据进行比较,如果有不同,说明当前数据被其它事务所更改,需要通过配置的策略进行处理
181+
- 根据undo_log的前置镜像和业务sql的相关信息组成回滚语句
182+
- 将分支回滚的结果提交给TC
183+
184+
通过一阶段的回滚日志进行反向补偿
185+
186+
阶段二(提交):
187+
188+
- 收到TC的分支提交请求,把请求放入异步队列中,马上返回提交成功的结果给TC
189+
- 异步批量的删除undo_log记录
190+
191+
#### (3)写隔离
192+
193+
**一阶段提交本地事务,必须需要拿到更改数据的全局锁,拿不到全局锁,不能提交本地事务,超出等待时间,会回滚本地事务,释放本地锁**
194+
195+
例:tx1和tx2两个全局事务同时修改 a表的m字段,m初始为1000;
196+
197+
tx1先开始,拿到本地锁,将m 1000-100 = 900。本地事务提交前,先拿到该记录的全局锁,本地提交释放本地锁。tx2开始,拿到本地锁,将m 900-100=800,提交本地事务前,先获取该记录的全局锁,tx1全局事务提交前,全局锁会被tx1所持有,tx2就会重试等待全局锁。
198+
199+
**tx1二阶段全局提交,释放全局锁。tx2拿到全局锁提交本地事务。如果tx1二阶段为全局回滚,那么会重新重试获取本地锁,此时tx2如果还在等待全局锁,同时持有本地锁,tx1分支事务就会等待tx2超时释放本地锁之后,再次获取本地锁;整个过程 全局锁都是被 tx1锁持有,不会存在脏数据的问题**
200+
201+
#### (4)读隔离
202+
203+
Seata AT模式的默认全局隔离级别是读未提交,如果在特定场景下,必需要求全局的读已提交,Seata采用通过select for update 语句来进行代理的;select for update语句的执行会申请*全局锁* ,如果全局锁被其它事务锁持有,就会回滚select for update的本地执行并且重试,因为这时候查询是被锁住,直到全局锁拿到,即读取相关的数据是已提交的
204+
205+
### 3、TCC模式
206+
207+
AT模式是基于本地支持ACID事务的关系型数据库:
208+
209+
- 一阶段prepare行为:在本地事务中,一并提交数据更新和相应的回滚记录
210+
- 二阶段commit行为:马上成功,自动异步删除回滚记录
211+
- 二阶段rollback行为:通过回滚日志,自动生成补偿操作,完成数据回滚
212+
213+
相应的TCC模式,不依赖本地底层数据的事务支持:
214+
215+
- 一阶段prepare行为:调用自定义的prepare逻辑
216+
- 二阶段commit行为:调用自定义的commit逻辑
217+
- 二阶段rollback行为:调用自定义的rollback逻辑
218+
219+
### 4、Saga模式
220+
221+
- 特点:业务流程中每个参与者都提交本地事务,当某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发者实现
222+
- sage实现:基于状态机引擎来实现
223+
- 通过状态图来定义服务调用的流程并生成json状态语言定义文件
224+
- 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点
225+
- 状态图json由状态机引擎驱动执行,当出现异常时状态引擎反向执行已经成功节点对应的补偿节点将事务回滚(用户可以自定义是否进行补偿)
226+
- 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能
227+
228+
### 5、XA模式
229+
230+
- 特点:利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式
231+

0 commit comments

Comments
 (0)