基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

流计算过程中需要通过 Map 算子调用第三方 HTTP API 补全维表数据,因为网络延迟导致吞吐量极低。如何在 Flink 中使用异步 I/O (Async I/O) 加并发缓存来提升吞吐?

知识点图片

在流计算(Flink)中,通过同步的 Map 算子调用外部 HTTP API 进行维表关联,是典型的反模式。因为单线程同步等待网络 IO 会导致 CPU 大量空闲,吞吐量断崖式下降。

解决这个问题的标准方案是:Flink Async I/O(异步 IO) + 异步 HTTP Client + 本地并发缓存(如 Caffeine)

下面将详细介绍如何实现这一方案,并提供完整的代码示例和生产环境调优指南。


1. 核心架构设计

  1. Async I/O (异步 IO):允许单个并发实例同时发起多个 HTTP 请求,而不需要等待上一个请求返回。Flink 会在后台管理这些飞行中(In-Flight)的请求。
  2. 异步 HTTP Client:必须使用支持异步非阻塞的 HTTP 客户端(如 Java 11+ 自带的 HttpClientApache HttpAsyncClientVert.x WebClient),否则在 AsyncFunction 中使用同步 HTTP 客户端仍然会阻塞 Flink 的 Task 线程。
  3. 并发缓存 (Concurrent Cache):使用 Caffeine 缓存最近请求过的维表数据。由于异步回调会在不同的线程中执行,缓存必须是线程安全且高并发的。

2. 添加依赖

pom.xml 中引入 Flink 依赖以及高性能缓存库 Caffeine:

xml
<!-- 引入 Caffeine 缓存 -->
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>3.1.8</version>
</dependency>

(注:HTTP Client 我们直接使用 Java 11+ 内置的 java.net.http.HttpClient,因为它原生返回 CompletableFuture,与 Flink 异步 IO 结合极其平滑。)


3. 编写带有缓存的 AsyncFunction

我们需要继承 RichAsyncFunction,在 open 方法中初始化 HttpClient 和 Cache,在 asyncInvoke 中处理逻辑。

java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class HttpAsyncEnrichmentFunction extends RichAsyncFunction<InputData, EnrichedData> {

    private transient HttpClient httpClient;
    private transient Cache<String, String> cache;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // 1. 初始化异步 HTTP Client (配置连接池和超时)
        httpClient = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .connectTimeout(Duration.ofSeconds(3)) // 连接超时
                .build();

        // 2. 初始化本地并发缓存 Caffeine
        cache = Caffeine.newBuilder()
                .maximumSize(100000) // 最大缓存数量
                .expireAfterWrite(Duration.ofMinutes(10)) // 写入后 10 分钟过期,保证维表数据更新
                .build();
    }

    @Override
    public void asyncInvoke(InputData input, ResultFuture<EnrichedData> resultFuture) {
        String key = input.getUserId(); // 假设用 UserId 关联维表

        // 3. 查缓存 (命中则直接返回,避免网络开销)
        String cachedValue = cache.getIfPresent(key);
        if (cachedValue != null) {
            EnrichedData result = new EnrichedData(input, cachedValue);
            resultFuture.complete(Collections.singletonList(result));
            return; // 结束处理
        }

        // 4. 缓存未命中,发起异步 HTTP 请求
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://api.example.com/userInfo?userId=" + key))
                .timeout(Duration.ofSeconds(2)) // 读超时
                .GET()
                .build();

        CompletableFuture<HttpResponse<String>> responseFuture = 
                httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());

        // 5. 处理异步回调
        responseFuture.whenComplete((response, exception) -> {
            if (exception != null) {
                // 网络异常处理:记录日志,可以返回带默认值的对象,或者使 future 失败
                System.err.println("HTTP 请求失败: " + exception.getMessage());
                // 根据业务需求:允许数据流失则 complete(空集合),要求强一致则 completeExceptionally
                resultFuture.completeExceptionally(exception); 
            } else {
                if (response.statusCode() == 200) {
                    String apiResult = response.body();
                    
                    // 写入缓存 (Caffeine 是线程安全的)
                    cache.put(key, apiResult);
                    
                    // 组装结果并输出
                    EnrichedData result = new EnrichedData(input, apiResult);
                    resultFuture.complete(Collections.singletonList(result));
                } else {
                    // API 返回非 200 错误码处理
                    resultFuture.complete(Collections.singletonList(new EnrichedData(input, "DEFAULT_VALUE")));
                }
            }
        });
    }

    @Override
    public void timeout(InputData input, ResultFuture<EnrichedData> resultFuture) {
        // 6. Flink 层的异步超时处理
        System.err.println("Async I/O 请求超时: " + input.getUserId());
        resultFuture.complete(Collections.singletonList(new EnrichedData(input, "TIMEOUT_DEFAULT")));
    }

    @Override
    public void close() throws Exception {
        // 资源清理
        if (cache != null) {
            cache.cleanUp();
        }
        super.close();
    }
}

4. 在 Flink Job 中集成 AsyncFunction

在组装 DataStream 时,使用 AsyncDataStream 包装你的数据流:

java
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

// ... 初始化 env 和 inputStream

// 使用异步 IO 处理流
DataStream<EnrichedData> enrichedStream = AsyncDataStream.unorderedWait(
    inputStream, 
    new HttpAsyncEnrichmentFunction(),
    3000,                  // 超时时间:3秒 (必须大于 HTTP Client 的超时时间)
    TimeUnit.MILLISECONDS,
    100                    // 容量 (Capacity):最大同时进行中的异步请求数
);

orderedWait vs unorderedWait

  • unorderedWait(推荐):只要某个异步请求先返回,就先向下游发送。吞吐量最高,延迟最低。适用于对数据顺序没有严格要求的场景。
  • orderedWait:保证输出顺序与输入顺序完全一致。如果前面的请求未返回,后面的请求即使返回了也要在 Flink 内存中排队等待。会增加延迟和内存占用。

5. 生产环境调优指南 (极其重要)

代码写完只是第一步,要达到高吞吐,必须调整以下参数:

1. 异步容量 (Capacity) 的计算逻辑 (Little's Law)

Capacity 参数表示单 Task 并发能容纳的最多异步请求数。
如果你的目标吞吐是 10000 QPS,API 平均响应时间是 50ms (0.05s),那么你至少需要的并发请求数(Capacity)为:
Capacity = QPS * Latency = 10000 * 0.05 = 500
如果 Flink 并发度为 10,那么配置的参数应为 500 / 10 = 50。建议预留 2 倍 buffer,设置为 100

2. HTTP Client 底层连接池

极其容易被忽略的一点:就算 Flink 的 Capacity 设置为 1000,如果底层的 HTTP Client 连接池最大连接数(Max Connections)只有 50,那么剩下的 950 个请求全都在本地排队,根本没有发出去。
如果使用 Apache HttpAsyncClient,务必调整 setMaxTotal()setDefaultMaxPerRoute()。Java 11 HttpClient 默认没有硬性连接数限制,但受限于操作系统的临时端口数。

3. 缓存策略优化 (Caffeine)

  • 命中率:缓存的意义在于减少外部请求。如果 key (如 UserId) 高度分散且只出现一次,缓存不仅无效还会增加 GC 压力。此时应考虑 Redis 甚至不要缓存。
  • 缓存大小 (maximumSize):评估单个 value 的内存占用,避免 OOM。比如 100 万条字符串数据可能占用几百 MB 内存。
  • 过期策略 (expireAfterWrite):维表数据是会变的。根据业务容忍度设置过期时间。不要设置永不过期。

4. Flink Checkpoint 影响

需要注意的是,本地缓存(Caffeine)是 Transient(瞬态)的,不会被写入 Flink Checkpoint。
当 Flink 任务崩溃重启时,缓存是空的,此时会产生缓存雪崩效应(大量请求瞬间打向 HTTP API)。
解决策略:可以在 HTTP Client 中加入限流器(如 Guava RateLimiter),或者要求 HTTP API 端具备抗高并发及限流降级能力。

5. 资源隔离与线程池

在 Java 11 HttpClientsendAsync 底层使用了公用的 ForkJoinPool。如果 HTTP 请求非常密集,建议为 HttpClient 提供专属的 ExecutorService,避免干扰 Flink Task 线程:

java
ExecutorService customExecutor = Executors.newFixedThreadPool(50);
httpClient = HttpClient.newBuilder().executor(customExecutor).build();

记得在 close() 方法中关闭这个线程池。

00:00
00:00