Dubbo 同步异步调用的实现
Dubbo 通过 Request ID 和 Future 实现同步与异步。同步调用阻塞等待结果,异步调用立即返回 Future,通过回调处理,释放调用方线程。
这是一个非常核心且重要的问题。下面我将从原理、实现细节和使用场景等多个角度,详细解释 Dubbo 的同步调用和异步调用是如何实现的。
核心思想:一次请求,一次响应
无论是同步还是异步,Dubbo RPC 调用的本质都是一样的:
- 客户端(Consumer) 发起一个请求。
- 服务端(Provider) 处理这个请求。
- 服务端 返回一个响应。
同步和异步的区别在于 客户端发起请求后,它的行为模式是怎样的。
为了理清这个过程,我们先引入几个关键角色:
- Request ID: 每个 RPC 请求都有一个全局唯一的 ID。客户端发送请求时会带上这个 ID,服务端响应时会原封不动地将这个 ID 返回。这是将请求和响应关联起来的唯一凭证。
- Future/Callback 机制: 这是实现同步/异步的核心。客户端在发送请求后,会创建一个“期货”(Future)对象,用来在未来某个时间点接收结果。
- 一个共享的 Map: 在客户端内部,有一个全局的 Map(例如
ConcurrentHashMap),用于存放所有已发送但尚未收到响应的请求。它的结构是Map<long, DefaultFuture>,其中key是 Request ID,value是对应的 Future 对象。我们称之为FUTURES。 - I/O 线程: 负责网络数据读写的线程(通常由 Netty 等网络框架管理)。
- 业务线程: 真正执行你的业务逻辑的线程。
一、同步调用(Synchronous Invocation)的实现
同步调用是最常见的方式,调用方线程会一直阻塞,直到收到服务端的返回结果或者超时。
实现流程(图解思路):
plaintext
sequenceDiagram
participant C as 业务线程 (Client)
participant IO_C as I/O线程 (Client)
participant P as 服务端 (Provider)
C->>C: 1. userService.getUser(1)
C->>IO_C: 2. 封装Request(ID=123), 创建DefaultFuture(ID=123)
Note over C: 3. 将 (123, future) 存入FUTURES Map
IO_C->>P: 4. 发送网络请求 Request(ID=123)
C->>C: 5. 调用 future.get(timeout), 线程阻塞等待
P->>P: 6. 处理业务逻辑
P->>IO_C: 7. 返回Response(ID=123, result)
IO_C->>IO_C: 8. 收到响应, 从Response中取出ID=123
IO_C->>IO_C: 9. 从FUTURES Map中找到ID=123对应的future
IO_C->>C: 10. 调用 future.complete(result), 唤醒阻塞的业务线程
C->>C: 11. 业务线程被唤醒, 从future中拿到结果
Note over C: 12. 从FUTURES Map中移除 (123, future)
C->>C: 13. 方法返回, 继续执行
步骤详解:
- 发起调用:业务线程调用接口方法,例如
userService.getUser(1)。 - 代理拦截:Dubbo 的代理(Proxy)拦截到这次调用。
- 创建 Future:
- 生成一个唯一的 Request ID(例如 123)。
- 创建一个
DefaultFuture对象,这个对象内部有一个Lock和Condition,用于线程的等待和唤醒。 - 将这个
(Request ID, DefaultFuture)键值对存入全局的FUTURESMap 中。
- 发送请求:将请求(包含 Request ID 和调用参数)序列化后,通过客户端的 I/O 线程(Netty)发送给服务端。
- 阻塞等待:业务线程在发送完请求后,并不会立即返回,而是调用
defaultFuture.get(timeout)方法。这个方法内部会使用lock.lock()和condition.await()来阻塞当前业务线程,等待服务端的响应。 - 服务端处理:服务端接收到请求,处理业务逻辑,然后将结果和原始的 Request ID 一起封装成 Response 对象,通过网络发回给客户端。
- 客户端接收响应:客户端的 I/O 线程接收到服务端的 Response。
- 匹配 Future 并唤醒:
- I/O 线程从 Response 中解析出 Request ID(123)。
- 根据这个 ID 去
FUTURESMap 中找到对应的DefaultFuture对象。 - 将 Response 中的结果设置到
DefaultFuture对象中。 - 调用
condition.signal()来唤醒之前被阻塞的那个业务线程。 - 从
FUTURESMap 中移除这个已经完成的请求。
- 返回结果:业务线程被唤醒后,从
DefaultFuture中获取到最终结果,getUser(1)方法调用结束,返回结果。
同步调用的关键:业务线程通过 Future.get() 方法,主动阻塞自己,直到 I/O 线程收到响应后将其唤醒。
二、异步调用(Asynchronous Invocation)的实现
异步调用下,调用方线程不会阻塞,它会立即返回一个 CompletableFuture (或 Dubbo 自己的 Future)。你可以通过回调函数或者链式调用来处理后续的结果。
如何开启异步?
在 Consumer 端配置即可,Provider 端是无感知的。
XML 配置:
xml
<dubbo:reference id="userService" interface="com.example.UserService">
<dubbo:method name="getUser" async="true" />
</dubbo:reference>
注解配置:
java
@Reference(async = true)
private UserService userService;
实现流程(图解思路):
plaintext
sequenceDiagram
participant C as 业务线程 (Client)
participant IO_C as I/O线程 (Client)
participant P as 服务端 (Provider)
C->>C: 1. userService.getUser(1)
C->>IO_C: 2. 封装Request(ID=456), 创建DefaultFuture(ID=456)
Note over C: 3. 将 (456, future) 存入FUTURES Map
IO_C->>P: 4. 发送网络请求 Request(ID=456)
C->>C: 5. 立即返回一个 CompletableFuture
C->>C: 6. 业务线程继续执行其他任务 (非阻塞)
Note over C: 7. 用户代码: future.thenAccept(user -> { ... })
P->>P: 8. 处理业务逻辑
P->>IO_C: 9. 返回Response(ID=456, result)
IO_C->>IO_C: 10. 收到响应, 从Response中取出ID=456
IO_C->>IO_C: 11. 从FUTURES Map中找到ID=456对应的future
IO_C->>IO_C: 12. 调用 future.complete(result), 触发回调
Note over IO_C: 13. 从FUTURES Map中移除 (456, future)
Note over C: 14. 回调函数在特定线程池中被执行
步骤详解:
- 前 4 步与同步调用完全相同:发起调用 -> 代理拦截 -> 创建
DefaultFuture并存入FUTURESMap -> I/O 线程发送请求。 - 关键区别 - 立即返回:业务线程在发送请求后,不会调用
future.get()去阻塞自己。相反,Dubbo 框架会立即返回一个CompletableFuture对象给调用方。业务线程拿到这个 Future 后可以继续执行其他任务,完全不会被阻塞。 - 注册回调:调用方可以通过
future.thenAccept(...)或future.whenComplete(...)等方法给这个CompletableFuture注册回调函数,定义当结果返回时应该做什么。 - 服务端处理与响应:这部分与同步调用完全一样,服务端并不知道客户端是同步还是异步等待。
- 客户端接收响应并触发回调:
- 客户端 I/O 线程接收到 Response。
- 根据 Request ID 在
FUTURESMap 中找到对应的DefaultFuture。 - 将结果设置到
DefaultFuture中。 - 这一步会触发之前与
CompletableFuture关联的回调函数。 - 这些回调函数会在 Dubbo 的 Callback 线程池中执行,而不是在 I/O 线程或业务线程中执行。
异步调用的关键:业务线程不等待,立即返回一个 Future 对象。真正的结果处理逻辑被封装在回调函数中,由 I/O 线程收到响应后触发,在专门的线程池中执行。
三、总结与对比
| 特性 | 同步调用 | 异步调用 |
|---|---|---|
| 线程模型 | 业务线程阻塞,等待结果返回。 | 业务线程不阻塞,立即返回 Future。 |
| 返回值 | 直接返回目标类型的结果(如 User 对象)。 |
返回 CompletableFuture<User>。 |
| 实现核心 | 业务线程调用 Future.get() 等待,I/O 线程收到响应后 signal() 唤醒。 |
业务线程不等待,I/O 线程收到响应后触发注册的回调函数。 |
| 性能 | 简单直接,但如果 RPC 耗时较长,会长时间占用业务线程,降低吞吐量。 | 释放了调用方线程,使其可以处理更多任务,显著提高系统吞ötuliang。特别适合扇出(Fan-out)调用场景。 |
| 编程复杂度 | 简单,符合传统的编程习惯。 | 略高,需要处理回调逻辑,代码可读性可能稍差。 |
| 适用场景 | 大部分简单、快速的调用。 | 1. 调用耗时较长的服务。 2. 需要同时调用多个互相不依赖的服务(并行化)。 3. 对系统吞吐量要求极高的场景。 |
总而言之,Dubbo 通过 Request ID + 全局Map + Future 这套组合拳,巧妙地在同一个底层通信模型上实现了同步和异步两种调用方式。同步调用是让业务线程“等” Future 的结果,而异步调用是给 Future 注册一个“后续动作”(回调),然后就去做别的事情了。