Reactor响应式编程实战:从Flux|Mono到Spring Cloud Gateway应用
在微服务架构里,高并发场景就像“早高峰的地铁站”——传统同步编程的“排队检票”模式很容易堵死,而响应式编程的“异步分流”模式能让系统更高效地处理请求。作为Java响应式编程的“基石”,Reactor框架基于Reactive Streams规范和Java 8特性(Lambda、CompletableFuture)实现,底层还整合了Netty的非阻塞IO能力,专门解决“高并发下的资源浪费”问题;而Spring Cloud Gateway则基于Reactor,把这种能力落地到网关层,帮我们处理路由、过滤等核心需求。今天就从基础到实战,把Reactor和Gateway的核心逻辑讲明白,重点解答“请求体只能读一次”“重新创建的Flux为何能重复订阅”这些高频坑。
一、Reactor框架底层:三大技术栈与设计模式解析
要真正理解Flux和Mono,必须先搞懂Reactor的“底层骨架”——它基于Reactive Streams规范、Java 8特性、Netty非阻塞IO构建,还用到了生产者-订阅者、装饰器等经典设计模式,每一部分都有明确的作用。
1. 核心支撑1:Reactive Streams规范(异步流的“交通规则”)
Reactive Streams是Java响应式编程的“通用规则”,定义了异步流的接口规范和行为约束,目的是解决“生产者速度远超消费者导致的内存溢出”(背压问题)和“多线程下的线程安全”。
(1)核心设计模式:生产者-订阅者模式(Pub-Sub)的扩展
Reactive Streams没有用简单的“生产者→消费者”直连,而是加入了“中间处理器”,形成**“生产者→处理器→订阅者”的三层结构**,对应规范中的4个核心接口:
| 接口 | 角色 | 作用 |
|---|---|---|
| Publisher | 生产者 | 产生数据,提供subscribe(Subscriber)方法,接收订阅者并绑定关系 |
| Subscriber | 订阅者 | 接收数据和信号(完成/错误),有4个核心方法:onSubscribe(绑定后回调)、onNext(接收数据)、onError(错误回调)、onComplete(完成回调) |
| Subscription | 订阅关系 | 连接生产者和订阅者,控制数据流速(核心是request(long n):告诉生产者“我能处理n个数据”) |
| Processor | 中间处理器 | 既是生产者也是订阅者,用于数据转换(如Flux的map操作符底层就是Processor) |
举个通俗例子:
生产者像“快递站”,订阅者像“你”,Subscription像“快递员”——你(订阅者)下单(subscribe)后,快递员(Subscription)会问你“一次能收多少个快递”(request(n)),快递站(生产者)只给你发n个,不会一次性把所有快递堆到你家门口(避免内存溢出)。
代码层面的接口关系(简化版):
// 生产者接口(如Flux实现了Publisher)
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
// 订阅者接口
public interface Subscriber<T> {
void onSubscribe(Subscription s); // 绑定后第一个回调,拿到Subscription
void onNext(T t); // 接收数据
void onError(Throwable t); // 错误回调
void onComplete(); // 完成回调
}
// 订阅关系(控制流速)
public interface Subscription {
void request(long n); // 告诉生产者“要n个数据”
void cancel(); // 取消订阅
}(2)背压(Backpressure)的实现机制:“按需生产”
背压是Reactive Streams的核心,本质是**“订阅者反向控制生产者的数据流速”**,避免生产者“狂发数据”导致订阅者处理不过来、内存溢出。实现分三步:
信号传递:从订阅者到生产者
当订阅者调用Subscription.request(n)时,这个“要n个数据”的信号会层层传递到生产者——比如Flux调用map转换时,map对应的Processor会把request(n)转发给上游生产者,确保上游只生产n个数据。背压策略:处理“流速不匹配”
如果生产者还是比订阅者快(比如生产者是高速日志流,订阅者是慢数据库),Reactor提供了5种常用背压策略,通过onBackpressureXXX操作符设置:
onBackpressureBuffer():缓冲多余数据(默认缓冲256个,可指定大小);onBackpressureDrop():丢弃多余数据;onBackpressureError():超出流速时抛OverflowException;onBackpressureLatest():只保留最新的一个数据,丢弃旧数据;onBackpressureBuffer(Duration timeout):缓冲+超时,超时后丢弃。
代码示例:设置背压策略
// 生产者:每秒产生10个数据(速度快)
Flux.interval(Duration.ofMillis(100)) // 100ms一个,每秒10个
.onBackpressureDrop(num -> System.out.println("丢弃数据:" + num)) // 处理不过来就丢弃
.subscribe(
num -> {
// 订阅者:每秒处理1个数据(速度慢)
try { Thread.sleep(1000); } catch (InterruptedException e) {}
System.out.println("处理数据:" + num);
}
);
// 输出:处理数据:0 → 丢弃数据:1-9 → 处理数据:10 → 丢弃数据:11-19...- 线程安全:通过“串行化信号”保证
Reactive Streams规定:onNext/onError/onComplete等回调必须在同一个线程串行执行,且onError/onComplete只能调用一次——这避免了多线程下的数据竞争,比如不会出现“一个线程调用onNext,另一个线程同时调用onError”的情况。
2. 核心支撑2:Java 8特性(简化代码的“工具”)
Java 8的Lambda、CompletableFuture等特性,让Reactor的代码从“冗长的匿名内部类”变成“简洁的一行代码”,主要体现在两个方面:
(1)Lambda:简化订阅逻辑
没有Lambda时,订阅Subscriber需要写大量匿名内部类代码;有了Lambda,可直接用“函数式接口”简化——Reactor的subscribe方法重载了多个版本,支持传入Lambda表达式作为onNext/onError/onComplete的回调。
对比示例:
// 没有Lambda:冗长的匿名内部类
Flux.range(1, 2)
.subscribe(new Subscriber<Integer>() {
private Subscription s;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2); // 要2个数据
}
@Override
public void onNext(Integer num) {
System.out.println("处理:" + num);
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {
System.out.println("处理完");
}
});
// 有Lambda:一行搞定
Flux.range(1, 2)
.subscribe(
num -> System.out.println("处理:" + num), // onNext
error -> {}, // onError(空实现)
() -> System.out.println("处理完") // onComplete
);
// 两种写法输出相同:处理:1 → 处理:2 → 处理完(2)CompletableFuture:适配现有异步代码
很多老系统用CompletableFuture实现异步(比如异步查库),Reactor提供了Mono.fromFuture()方法,能把CompletableFuture转换成Mono,无缝融入响应式流。
代码示例:
// 老系统的异步方法:返回CompletableFuture
CompletableFuture<String> getUserNameAsync(Long userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟异步查库
try { Thread.sleep(500); } catch (InterruptedException e) {}
return "用户" + userId;
});
}
// 转换成Mono,融入响应式流
Mono<String> userNameMono = Mono.fromFuture(() -> getUserNameAsync(101L));
userNameMono.subscribe(name -> System.out.println("用户名:" + name));
// 输出:(延迟500ms后)用户名:用户1013. 核心支撑3:Netty非阻塞IO(高并发的“引擎”)
Reactor的reactor-netty模块封装了Netty,而Netty是Java领域“高性能网络编程”的代名词——Spring Cloud Gateway、Dubbo等框架的高并发能力,都源于Netty的非阻塞IO。要理解它,必须先搞懂“什么是Netty”“非阻塞为何能快”。
(1)什么是Netty?
Netty是一个异步事件驱动的网络应用框架,基于Java NIO(New IO)开发,主要用于开发高性能的TCP/UDP服务器和客户端。它解决了Java原生NIO的“API复杂、容易出bug、性能优化难”等问题,提供了简洁的API和开箱即用的性能优化(如零拷贝、内存池)。
简单说:Netty是“优化版的Java NIO”,让开发者不用关注底层IO细节,就能写出高并发的网络代码。
(2)Netty非阻塞IO的实现原理:靠“IO多路复用”和“事件驱动”
要理解“非阻塞”,先对比传统的BIO(阻塞IO) 和Netty的NIO(非阻塞IO):
| 模型 | 处理方式 | 问题(高并发下) |
|---|---|---|
| BIO | 一个连接对应一个线程,线程阻塞等待数据(比如Socket.read()会一直等数据) | 1万连接需要1万线程,线程切换开销大,内存占用高 |
| Netty NIO | 一个线程处理多个连接,线程不阻塞,靠“事件通知”处理数据 | 1万连接只需几个线程,开销极小 |
Netty的非阻塞核心靠IO多路复用(IO Multiplexing) 和事件驱动模型(Reactor模式) 实现,分三步:
第一步:用Selector(多路复用器)管理所有连接
Selector就像“餐厅的迎宾员”,负责管理所有“餐桌”(Socket连接)——它会监听多个连接的“IO事件”(比如“有数据可读”“连接可写”),不用每个连接都派一个“服务员”(线程)盯着。具体流程:
- 所有Socket连接都注册到Selector上,并告诉Selector“我关注什么事件”(比如“关注有数据可读”);
- 线程调用
Selector.select(),非阻塞等待“有事件发生的连接”(比如A连接有数据了,B连接能写了); - 一旦有事件发生,Selector会返回“有事件的连接列表”,线程再逐个处理这些连接的事件(读/写数据)。
- 第二步:事件驱动模型(Reactor模式)
Netty用“Reactor模式”处理事件,核心是“线程只处理事件,不阻塞等数据”:
- Reactor线程:负责监听事件(如Selector.select()),把“有数据的连接”交给“工作线程”处理;
- 工作线程:负责具体的业务逻辑(如解析HTTP请求、处理数据),不参与IO等待;
- 事件类型:主要有“OP_READ”(有数据可读)、“OP_WRITE”(连接可写)、“OP_ACCEPT”(新连接到来)。
通俗例子:
Reactor线程像“前台”,只负责“接收客户需求(事件)”;工作线程像“后厨”,只负责“处理需求(业务逻辑)”——前台不用等后厨做完,继续接下一个需求,效率极高。
第三步:零拷贝(Zero Copy):减少数据拷贝次数
传统IO读取数据时,数据会从“网卡→内核缓冲区→用户缓冲区”拷贝两次;而Netty用“零拷贝”技术,让数据直接从“内核缓冲区”到“目标缓冲区”,只拷贝一次(甚至不拷贝),大幅提升IO效率。比如Netty的
ByteBuf(字节缓冲区)有两种类型:
HeapByteBuf:数据存在JVM堆内存,适合小数据;DirectByteBuf:数据存在“内核直接内存”,避免“内核→用户空间”的拷贝,适合大数据(如文件传输)。
(3)Netty高并发的核心原因:“少线程+低开销”
基于以上设计,Netty在高并发下的优势很明显:
- 线程少:一个Reactor线程+几个工作线程,就能处理上万甚至几十万连接,线程切换开销几乎可以忽略;
- 无阻塞:线程不阻塞等待数据,只在“有事件时”才工作,CPU利用率高;
- 零拷贝:减少数据拷贝次数,IO速度快;
- 内存池:Netty用“内存池”管理
ByteBuf,避免频繁创建/销毁缓冲区导致的GC(垃圾回收)开销。
这也是Spring Cloud Gateway基于Netty能实现“高并发、低延迟”的根本原因——Gateway的“请求接收、路由转发”等操作,都是通过Netty的非阻塞IO完成的,底层由Reactor封装成Flux/Mono流。
1. Flux:处理“0~N个元素”的流
Flux就像“一串糖葫芦”,能装多个数据块,适合“分块传输”或“多元素场景”,比如:
- HTTP请求体(可能拆成多个DataBuffer);
- 数据库查询的多条结果;
- 消息队列的批量消息。
举个简单例子,从数组生成Flux并订阅:
// 从数组生成Flux,订阅后打印每个元素
Integer[] nums = {1, 2, 3, 4};
Flux.fromArray(nums)
.subscribe(
num -> System.out.println("拿到数字:" + num), // 接收元素
error -> System.err.println("出错了:" + error), // 处理错误
() -> System.out.println("所有元素处理完了") // 处理完成
);
// 输出:拿到数字:1 → 拿到数字:2 → 拿到数字:3 → 拿到数字:4 → 所有元素处理完了2. Mono:处理“0~1个元素”的流
Mono就像“一个苹果”,最多装1个元素,适合“单个结果场景”,比如:
- 数据库单条查询结果;
- 接口单次响应;
- 聚合后的完整请求体。
比如从“获取用户名”的逻辑生成Mono:
// 模拟异步获取用户名(比如查数据库)
Mono<String> usernameMono = Mono.fromSupplier(() -> {
// 模拟异步延迟
try { Thread.sleep(500); } catch (InterruptedException e) {}
return "zhangsan";
});
// 订阅Mono,拿到单个结果
usernameMono.subscribe(name -> System.out.println("用户名:" + name));
// 输出:(延迟500ms后)用户名:zhangsan二、Reactor核心操作符:数据“加工工具”
操作符就像“数据流的加工厂”,能实现过滤、转换、合并等功能,常用的几个操作符足以应对80%场景。
1. 转换操作符:map vs flatMap
- map:同步转换,“输入一个元素,输出一个元素”,比如把数字转成字符串;
- flatMap:异步转换,“输入一个元素,输出一个新的Flux/Mono”,比如根据用户ID查用户信息(查库是异步的)。
// 1. map:同步转换(数字→字符串)
Flux.range(1, 3) // 生成1、2、3
.map(num -> "编号:" + num)
.subscribe(System.out::println);
// 输出:编号:1 → 编号:2 → 编号:3
// 2. flatMap:异步转换(用户ID→用户信息)
Flux.just(101, 102) // 两个用户ID
.flatMap(userId -> {
// 模拟异步查库(返回Mono)
return Mono.just("用户" + userId + ":状态正常");
})
.subscribe(info -> System.out.println("用户信息:" + info));
// 输出:用户信息:用户101:状态正常 → 用户信息:用户102:状态正常2. 聚合操作符:join(关键!)
join是解决“请求体只能读一次”的核心操作符——它能把Flux的“多个分块”聚合成“一个完整元素”,比如把Flux<DataBuffer>的请求体分块,合并成一个字节数组(用Mono包裹,因为只有一个完整结果)。
// 模拟请求体的Flux<DataBuffer>(3个分块)
DataBufferFactory factory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
Flux<DataBuffer> bodyFlux = Flux.just(
factory.wrap("{\n \"name\": ".getBytes()),
factory.wrap("\"zhangsan\",\n \"age\": ".getBytes()),
factory.wrap("18\n}".getBytes())
);
// 用join聚合所有分块,得到完整字节数组(Mono<byte[]>)
Mono<byte[]> fullBodyMono = DataBufferUtils.join(bodyFlux);
// 订阅Mono,拿到完整请求体
fullBodyMono.subscribe(fullBody -> {
String json = new String(fullBody, StandardCharsets.UTF_8);
System.out.println("完整请求体:" + json);
});
// 输出:完整请求体:{ "name": "zhangsan", "age": 18 }3. 错误处理操作符:onErrorReturn
异步流里报错很常见(比如除零异常、网络超时),onErrorReturn能在报错时返回“兜底值”,避免流程中断。
Flux.just(1, 0) // 第二个元素会触发除零异常
.map(num -> 10 / num)
.onErrorReturn(-1) // 报错时返回-1
.subscribe(result -> System.out.println("计算结果:" + result));
// 输出:计算结果:10 → 计算结果:-1三、Spring Cloud Gateway实战:响应式的“网关大门”
Spring Cloud Gateway是微服务的“入口”,所有请求都要经过它,它的核心是“过滤器链”——而过滤器的操作,全靠ServerWebExchange(请求响应的“工具箱”)和Reactor流。
1. ServerWebExchange:请求响应的“随身包”
ServerWebExchange就像你出门带的包,里面装着:
ServerHttpRequest:请求信息(路径、参数、请求体);ServerHttpResponse:响应信息(状态码、响应体);- 自定义属性(比如路由信息、用户身份)。
比如在过滤器中获取“路由目标地址”:
@Component
public class RouteLogFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 从“包”里拿Gateway的内置属性:路由目标地址
URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
System.out.println("当前请求要转发到:" + targetUri);
// 继续走过滤器链
return chain.filter(exchange);
}
}2. 高频坑解答:请求体为什么“只能读一次”?
很多人第一次处理请求体时会懵:“明明是Flux,为什么订阅一次后就空了?” 核心原因是**“数据来源不同”+“Flux的冷流特性”**,用两个比喻讲透:
(1)HTTP请求中的Flux:数据来源是“一次性的TCP流”
HTTP基于TCP协议,而TCP是“单向字节流”——就像“接水管的水”:
- 客户端把请求体数据通过TCP“流”给服务器,服务器的TCP缓冲区接收后,会交给应用层(Gateway)处理;
- 数据“流”过缓冲区后就会被清空,不会留存(就像水流过水管,流走就没了);
- 所以HTTP请求的
Flux<DataBuffer>,数据源是“正在传输的TCP流”,流完就没了,第一次订阅读完,第二次自然没数据。
(2)Flux的“冷流特性”:订阅一次,生成一次数据
Flux默认是“冷流”——就像“餐厅点餐”:
- 只有订阅(点餐),才会生成数据(做餐);
- 每个订阅都是“独立订单”,但如果“食材没了”(TCP流的数据用完了),再点餐也做不出餐;
- HTTP请求的Flux,“食材”就是TCP流的一次性数据,第一次订阅用完,第二次订阅自然没数据。
(3)DataBuffer的“不可重复读”:读一次就“空了”
DataBuffer是Netty的字节缓冲区,就像“一次性饭盒”:
- 数据读取后,“读指针”会移动到末尾,已读的数据无法再读(就像饭盒里的饭吃完了,不能再吃一遍);
- 即使强行订阅两次,第二次拿到的DataBuffer也是“空的”。
3. 解决方案:让请求体“能重复订阅”
核心思路是**“把一次性的TCP流,换成可重复用的内存数据”**——步骤很简单,本质是“重新创建Flux”:
关键原理:重新创建的Flux,数据来源变了!
重新创建的Flux,数据源不再是“TCP流”,而是“内存中的固定字节数组”——就像“把水管的水装进桶里”:
- 桶里的水(内存数组)是固定的,随时能舀(订阅);
- 每次订阅时,Flux.just()会基于这个数组“重新生成DataBuffer”,所以每次都能拿到完整数据。
完整代码:请求体重读过滤器
@Component
public class CachedRequestBodyFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 1. 聚合原始请求体:把Flux<DataBuffer>变成完整字节数组(Mono<byte[]>)
return DataBufferUtils.join(request.getBody())
.flatMap(fullBuffer -> {
// 2. 把完整Buffer转成字节数组(存到内存,可重复用)
byte[] fullBody = new byte[fullBuffer.readableByteCount()];
fullBuffer.read(fullBody);
DataBufferUtils.release(fullBuffer); // 释放原始Buffer,避免内存泄漏
// 3. 重新创建Flux<DataBuffer>:数据源是内存数组
DataBufferFactory bufferFactory = request.bufferFactory();
Flux<DataBuffer> cachedBodyFlux = Flux.just(
bufferFactory.wrap(fullBody) // 每次订阅都会生成新的DataBuffer
);
// 4. 用新Flux替换原始请求体,放进exchange
ServerHttpRequest cachedRequest = request.mutate()
.body(cachedBodyFlux)
.build();
ServerWebExchange cachedExchange = exchange.mutate()
.request(cachedRequest)
.build();
// 5. 继续过滤器链,后续过滤器能重复读请求体
return chain.filter(cachedExchange);
});
}
}重点说明:重新创建的Flux还是“冷流”!
很多人以为重新创建的是“热流”,其实不是——它依然是冷流,但因为数据源是“固定内存数组”,所以每次订阅都能“重新生成数据”:
- 热流:数据主动推送,订阅晚了会错过(比如实时日志流);
- 重新创建的冷流:订阅才生成数据,且每次生成的都是“内存数组的完整副本”,所以两个过滤器订阅,都能拿到完整请求体,不会互相影响。
4. 实战场景:修改请求体与响应体
有了“可重复读”的基础,就能轻松修改请求体或响应体,比如给请求体加用户ID、给响应体加密。
场景1:请求体添加用户ID
从请求头的accessToken提取用户ID,写入请求体JSON:
@Component
public class AddUserIdFilter implements GlobalFilter {
@Autowired
private ObjectMapper objectMapper; // 解析JSON用
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 1. 从请求头拿用户ID(实际项目可能解析JWT)
String userId = request.getHeaders().getFirst("accessToken");
if (userId == null) {
throw new IllegalArgumentException("缺少accessToken!");
}
// 2. 聚合请求体,拿到完整JSON
return DataBufferUtils.join(request.getBody())
.flatMap(fullBuffer -> {
// 转成字节数组
byte[] fullBody = new byte[fullBuffer.readableByteCount()];
fullBuffer.read(fullBody);
DataBufferUtils.release(fullBuffer);
// 3. 修改JSON:添加userId字段
JsonNode jsonNode = objectMapper.readTree(fullBody);
ObjectNode objectNode = (ObjectNode) jsonNode;
objectNode.put("userId", userId); // 加用户ID
byte[] newBody = objectMapper.writeValueAsBytes(objectNode);
// 4. 重新创建Flux,替换请求体
DataBufferFactory factory = request.bufferFactory();
Flux<DataBuffer> newBodyFlux = Flux.just(factory.wrap(newBody));
ServerHttpRequest newRequest = request.mutate().body(newBodyFlux).build();
// 5. 继续过滤器链
return chain.filter(exchange.mutate().request(newRequest).build());
});
}
}场景2:响应体加密
把下游服务的响应体用AES加密,再返回给客户端:
@Component
public class EncryptResponseFilter implements GlobalFilter, Ordered {
@Autowired
private ObjectMapper objectMapper;
private final AesUtils aesUtils = new AesUtils("your-key"); // 自定义AES工具类
// 确保比Gateway默认的写响应过滤器先执行
@Override
public int getOrder() {
return -2;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
// 装饰响应体:加密处理
ServerHttpResponseDecorator decorator = new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
// 处理每个响应分块
return super.writeWith(fluxBody.map(buffer -> {
try {
// 读原始响应体
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
DataBufferUtils.release(buffer); // 释放原始Buffer
JsonNode jsonNode = objectMapper.readTree(charBuffer.toString());
// 加密payload字段
String payload = jsonNode.get("payload").asText();
String encryptedPayload = aesUtils.encrypt(payload);
((ObjectNode) jsonNode).put("payload", encryptedPayload);
// 生成加密后的响应体
return bufferFactory.wrap(objectMapper.writeValueAsBytes(jsonNode));
} catch (Exception e) {
throw new IllegalStateException("响应加密失败", e);
}
}));
}
return super.writeWith(body);
}
};
// 传递装饰后的响应
return chain.filter(exchange.mutate().response(decorator).build());
}
}四、响应式编程:调试技巧与最佳实践
响应式代码不像传统代码能“一步步debug”,掌握这些技巧能少踩坑:
1. 调试技巧:看日志、查堆栈
- 加log()操作符:打印数据流的“全生命周期”(订阅、元素发射、错误、完成):
Flux.range(1, 2) .log("请求体数据流") // 打印详细日志 .subscribe(); - 开启操作符堆栈跟踪:遇到错误时,能看到具体是哪个操作符出了问题(启动类加一行):
Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());
2. 最佳实践:避坑指南
- 别写阻塞代码:响应式流里禁止
Thread.sleep()、同步IO(比如new FileReader()),要用异步替代(Mono.delay()、响应式数据库驱动R2DBC); - 释放DataBuffer:每次操作完DataBuffer都要调用
DataBufferUtils.release(buffer),避免内存泄漏; - 选对调度器:不同场景用不同线程池:
- IO密集型(查库、发HTTP):
Schedulers.elastic(); - 计算密集型(复杂运算):
Schedulers.parallel()。
- IO密集型(查库、发HTTP):
总结
Reactor的底层是“Reactive Streams规范(交通规则)+ Java 8(简化工具)+ Netty(性能引擎)”,靠生产者-订阅者模式和背压实现“异步安全”,靠IO多路复用实现“非阻塞高并发”;Flux和Mono是这个底层的“上层接口”,分别对应“多元素”和“单元素”场景。
Spring Cloud Gateway则是Reactor的“实战载体”——它的请求体处理、过滤器链,本质都是Reactor流的操作。理解“请求体只能读一次”的原因(数据源是TCP流)和解决方案(换内存数据源),就能轻松应对网关的核心需求。
响应式编程的核心不是“学API”,而是“换思维”——从“同步等待结果”变成“异步响应事件”,从“关注线程”变成“关注流”。掌握这些,你就能在高并发场景中写出更高效、更稳定的代码。
掌握这些,你就能在微服务高并发场景中,用响应式编程写出高效、稳定的网关代码。