基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Dubbo 同步异步调用的实现

知识点图片

Dubbo 通过 Request ID 和 Future 实现同步与异步。同步调用阻塞等待结果,异步调用立即返回 Future,通过回调处理,释放调用方线程。

这是一个非常核心且重要的问题。下面我将从原理、实现细节和使用场景等多个角度,详细解释 Dubbo 的同步调用和异步调用是如何实现的。

核心思想:一次请求,一次响应

无论是同步还是异步,Dubbo RPC 调用的本质都是一样的:

  1. 客户端(Consumer) 发起一个请求。
  2. 服务端(Provider) 处理这个请求。
  3. 服务端 返回一个响应。

同步和异步的区别在于 客户端发起请求后,它的行为模式是怎样的

为了理清这个过程,我们先引入几个关键角色:

  • Request ID: 每个 RPC 请求都有一个全局唯一的 ID。客户端发送请求时会带上这个 ID,服务端响应时会原封不动地将这个 ID 返回。这是将请求和响应关联起来的唯一凭证。
  • Future/Callback 机制: 这是实现同步/异步的核心。客户端在发送请求后,会创建一个“期货”(Future)对象,用来在未来某个时间点接收结果。
  • 一个共享的 Map: 在客户端内部,有一个全局的 Map(例如 ConcurrentHashMap),用于存放所有已发送但尚未收到响应的请求。它的结构是 Map<long, DefaultFuture>,其中 key是 Request ID,value是对应的 Future 对象。我们称之为 FUTURES
  • I/O 线程: 负责网络数据读写的线程(通常由 Netty 等网络框架管理)。
  • 业务线程: 真正执行你的业务逻辑的线程。

一、同步调用(Synchronous Invocation)的实现

同步调用是最常见的方式,调用方线程会一直阻塞,直到收到服务端的返回结果或者超时。

实现流程(图解思路):

plaintext
sequenceDiagram
    participant C as 业务线程 (Client)
    participant IO_C as I/O线程 (Client)
    participant P as 服务端 (Provider)

    C->>C: 1. userService.getUser(1)
    C->>IO_C: 2. 封装Request(ID=123), 创建DefaultFuture(ID=123)
    Note over C: 3. 将 (123, future) 存入FUTURES Map
    IO_C->>P: 4. 发送网络请求 Request(ID=123)
    C->>C: 5. 调用 future.get(timeout), 线程阻塞等待
    
    P->>P: 6. 处理业务逻辑
    P->>IO_C: 7. 返回Response(ID=123, result)
    
    IO_C->>IO_C: 8. 收到响应, 从Response中取出ID=123
    IO_C->>IO_C: 9. 从FUTURES Map中找到ID=123对应的future
    IO_C->>C: 10. 调用 future.complete(result), 唤醒阻塞的业务线程
    
    C->>C: 11. 业务线程被唤醒, 从future中拿到结果
    Note over C: 12. 从FUTURES Map中移除 (123, future)
    C->>C: 13. 方法返回, 继续执行

步骤详解:

  1. 发起调用:业务线程调用接口方法,例如 userService.getUser(1)
  2. 代理拦截:Dubbo 的代理(Proxy)拦截到这次调用。
  3. 创建 Future
    • 生成一个唯一的 Request ID(例如 123)。
    • 创建一个 DefaultFuture 对象,这个对象内部有一个 LockCondition,用于线程的等待和唤醒。
    • 将这个 (Request ID, DefaultFuture) 键值对存入全局的 FUTURES Map 中。
  4. 发送请求:将请求(包含 Request ID 和调用参数)序列化后,通过客户端的 I/O 线程(Netty)发送给服务端。
  5. 阻塞等待:业务线程在发送完请求后,并不会立即返回,而是调用 defaultFuture.get(timeout) 方法。这个方法内部会使用 lock.lock()condition.await()阻塞当前业务线程,等待服务端的响应。
  6. 服务端处理:服务端接收到请求,处理业务逻辑,然后将结果和原始的 Request ID 一起封装成 Response 对象,通过网络发回给客户端。
  7. 客户端接收响应:客户端的 I/O 线程接收到服务端的 Response。
  8. 匹配 Future 并唤醒
    • I/O 线程从 Response 中解析出 Request ID(123)。
    • 根据这个 ID 去 FUTURES Map 中找到对应的 DefaultFuture 对象。
    • 将 Response 中的结果设置到 DefaultFuture 对象中。
    • 调用 condition.signal()唤醒之前被阻塞的那个业务线程。
    • FUTURES Map 中移除这个已经完成的请求。
  9. 返回结果:业务线程被唤醒后,从 DefaultFuture 中获取到最终结果,getUser(1) 方法调用结束,返回结果。

同步调用的关键:业务线程通过 Future.get() 方法,主动阻塞自己,直到 I/O 线程收到响应后将其唤醒。


二、异步调用(Asynchronous Invocation)的实现

异步调用下,调用方线程不会阻塞,它会立即返回一个 CompletableFuture (或 Dubbo 自己的 Future)。你可以通过回调函数或者链式调用来处理后续的结果。

如何开启异步?

在 Consumer 端配置即可,Provider 端是无感知的。

XML 配置:

xml
<dubbo:reference id="userService" interface="com.example.UserService">
    <dubbo:method name="getUser" async="true" />
</dubbo:reference>

注解配置:

java
@Reference(async = true)
private UserService userService;

实现流程(图解思路):

plaintext
sequenceDiagram
    participant C as 业务线程 (Client)
    participant IO_C as I/O线程 (Client)
    participant P as 服务端 (Provider)

    C->>C: 1. userService.getUser(1)
    C->>IO_C: 2. 封装Request(ID=456), 创建DefaultFuture(ID=456)
    Note over C: 3. 将 (456, future) 存入FUTURES Map
    IO_C->>P: 4. 发送网络请求 Request(ID=456)
    C->>C: 5. 立即返回一个 CompletableFuture
    C->>C: 6. 业务线程继续执行其他任务 (非阻塞)
    Note over C: 7. 用户代码: future.thenAccept(user -> { ... })
    
    P->>P: 8. 处理业务逻辑
    P->>IO_C: 9. 返回Response(ID=456, result)
    
    IO_C->>IO_C: 10. 收到响应, 从Response中取出ID=456
    IO_C->>IO_C: 11. 从FUTURES Map中找到ID=456对应的future
    IO_C->>IO_C: 12. 调用 future.complete(result), 触发回调
    Note over IO_C: 13. 从FUTURES Map中移除 (456, future)
    Note over C: 14. 回调函数在特定线程池中被执行

步骤详解:

  1. 前 4 步与同步调用完全相同:发起调用 -> 代理拦截 -> 创建 DefaultFuture 并存入 FUTURES Map -> I/O 线程发送请求。
  2. 关键区别 - 立即返回:业务线程在发送请求后,不会调用 future.get() 去阻塞自己。相反,Dubbo 框架会立即返回一个 CompletableFuture 对象给调用方。业务线程拿到这个 Future 后可以继续执行其他任务,完全不会被阻塞。
  3. 注册回调:调用方可以通过 future.thenAccept(...)future.whenComplete(...) 等方法给这个 CompletableFuture 注册回调函数,定义当结果返回时应该做什么。
  4. 服务端处理与响应:这部分与同步调用完全一样,服务端并不知道客户端是同步还是异步等待。
  5. 客户端接收响应并触发回调
    • 客户端 I/O 线程接收到 Response。
    • 根据 Request ID 在 FUTURES Map 中找到对应的 DefaultFuture
    • 将结果设置到 DefaultFuture 中。
    • 这一步会触发之前与 CompletableFuture 关联的回调函数。
    • 这些回调函数会在 Dubbo 的 Callback 线程池中执行,而不是在 I/O 线程或业务线程中执行。

异步调用的关键:业务线程不等待,立即返回一个 Future 对象。真正的结果处理逻辑被封装在回调函数中,由 I/O 线程收到响应后触发,在专门的线程池中执行。


三、总结与对比

特性 同步调用 异步调用
线程模型 业务线程阻塞,等待结果返回。 业务线程不阻塞,立即返回 Future。
返回值 直接返回目标类型的结果(如 User 对象)。 返回 CompletableFuture<User>
实现核心 业务线程调用 Future.get() 等待,I/O 线程收到响应后 signal() 唤醒。 业务线程不等待,I/O 线程收到响应后触发注册的回调函数
性能 简单直接,但如果 RPC 耗时较长,会长时间占用业务线程,降低吞吐量。 释放了调用方线程,使其可以处理更多任务,显著提高系统吞ötuliang。特别适合扇出(Fan-out)调用场景。
编程复杂度 简单,符合传统的编程习惯。 略高,需要处理回调逻辑,代码可读性可能稍差。
适用场景 大部分简单、快速的调用。 1. 调用耗时较长的服务。
2. 需要同时调用多个互相不依赖的服务(并行化)。
3. 对系统吞吐量要求极高的场景。

总而言之,Dubbo 通过 Request ID + 全局Map + Future 这套组合拳,巧妙地在同一个底层通信模型上实现了同步和异步两种调用方式。同步调用是让业务线程“等” Future 的结果,而异步调用是给 Future 注册一个“后续动作”(回调),然后就去做别的事情了。

00:00
00:00