request body中的内容只允许读取一次,若是多次读取会报错,本章中简单介绍了如何在springcloud gateway中读取body中的内容。
配置方法
首先先介绍java代码配置的方法
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
return builder.routes()
.route("openApi", r -> r.path("/openApi")
.and()
.readBody(JSONObject.class, requestBody -> true)
.filters(f -> f.filter(new OpenApiFilter()))
.uri("lb://openApi"))
.build();
}
其中调用了readBody方法,这个方法就是读取body的核心方法,为gateway提供的,将body参数转为JSONObject并方入缓存中。
之后调用了名为OpenApiFilter的过滤器,这个是自定义的过滤器。我回在OpenApiFilter中用到body。
使用方法
在routes中使用了readBody方法后,就可以在缓存中获取到body参数。以下为Filter中的摘要。
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
if (!HttpMethod.POST.equals(request.getMethod())) {
// 不是POST请求直接返回
}
JSONObject jsonObject = exchange.getAttribute("cachedRequestBodyObject");
// 后面可以根据body中的内容进行处理
// 省略返回方法
return null;
}
源码简单解析
首先看一下核心的readBody方法。
public <T> BooleanSpec readBody(Class<T> inClass, Predicate<T> predicate) {
return asyncPredicate(getBean(ReadBodyPredicateFactory.class)
.applyAsync(c -> c.setPredicate(inClass, predicate)));
}
可以看出其调用了ReadBodyPredicateFactory类的applyAsync方法。
@Override
@SuppressWarnings("unchecked")
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
return new AsyncPredicate<ServerWebExchange>() {
@Override
public Publisher<Boolean> apply(ServerWebExchange exchange) {
Class inClass = config.getInClass();
// 首先从exchange中获得cachedBody缓存中的body,CACHE_REQUEST_BODY_OBJECT_KEY的值即为cachedRequestBodyObject
// 所以在自定义Filter的中我们也同样以此方法获取body参数。
Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
Mono<?> modifiedBody;
// We can only read the body from the request once, once that happens if
// we try to read the body again an exception will be thrown. The below
// if/else caches the body object as a request attribute in the
// ServerWebExchange so if this filter is run more than once (due to more
// than one route using it) we do not try to read the request body
// multiple times
// 注释翻译结果
// 我们只能从请求中读取一次内容,一旦我们尝试再次读取正文,将引发异常。 下面
// if / else将主体对象作为请求属性缓存在ServerWebExchange,因此如果此筛选器运行了多次(由于运行了多次)
// 而不是使用它的一条路线),我们不会尝试读取请求正文多次
if (cachedBody != null) {
// 如果缓存中的已经存在则去执行配置中的predicate,本案例中没有配置predicate。
try {
boolean test = config.predicate.test(cachedBody);
exchange.getAttributes().put(TEST_ATTRIBUTE, test);
return Mono.just(test);
}
catch (ClassCastException e) {
if (log.isDebugEnabled()) {
log.debug("Predicate test failed because class in predicate "
+ "does not match the cached body object", e);
}
}
return Mono.just(false);
}
else {
// 如果不存在则读取body并方入CACHE_REQUEST_BODY_OBJECT_KEY中,然后执行配置中的predicate.
// 读取body的操作在ServerWebExchangeUtils.cacheRequestBodyAndRequest中。
return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange,
(serverHttpRequest) -> ServerRequest
.create(exchange.mutate().request(serverHttpRequest)
.build(), messageReaders)
.bodyToMono(inClass)
.doOnNext(objectValue -> exchange.getAttributes().put(
CACHE_REQUEST_BODY_OBJECT_KEY, objectValue))
.map(objectValue -> config.getPredicate()
.test(objectValue)));
}
}
@Override
public String toString() {
return String.format("ReadBody: %s", config.getInClass());
}
};
}
ServerWebExchangeUtils.cacheRequestBodyAndRequest方法
public static <T> Mono<T> cacheRequestBodyAndRequest(ServerWebExchange exchange,
Function<ServerHttpRequest, Mono<T>> function) {
return cacheRequestBody(exchange, true, function);
}
继续往下看cacheRequestBody
private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange,
boolean cacheDecoratedRequest,
Function<ServerHttpRequest, Mono<T>> function) {
// Join all the DataBuffers so we have a single DataBuffer for the body
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
if (dataBuffer.readableByteCount() > 0) {
if (log.isTraceEnabled()) {
log.trace("retaining body in exchange attribute");
}
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR,
dataBuffer);
}
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return Mono.<DataBuffer>fromSupplier(() -> {
if (exchange.getAttributeOrDefault(
CACHED_REQUEST_BODY_ATTR, null) == null) {
// probably == downstream closed
return null;
}
// TODO: deal with Netty
NettyDataBuffer pdb = (NettyDataBuffer) dataBuffer;
return pdb.factory()
.wrap(pdb.getNativeBuffer().retainedSlice());
}).flux();
}
};
if (cacheDecoratedRequest) {
exchange.getAttributes().put(
CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, decorator);
}
return function.apply(decorator);
});
}
这里运用了装饰器设计模式将body中读取到的dataBuffer数据包装了下往下传递。这里本人也只是粗略了解。先记录一下。