springcloud gateway读取请求body中的内容

request body中的内容只允许读取一次,若是多次读取会报错,本章中简单介绍了如何在springcloud gateway中读取body中的内容。

配置方法

首先先介绍java代码配置的方法

@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
    return builder.routes()
            .route("openApi", r -> r.path("/openApi")
                    .and()
                    .readBody(JSONObject.class, requestBody -> true)
                    .filters(f -> f.filter(new OpenApiFilter()))
                    .uri("lb://openApi"))
            .build();
}

其中调用了readBody方法,这个方法就是读取body的核心方法,为gateway提供的,将body参数转为JSONObject并方入缓存中。
之后调用了名为OpenApiFilter的过滤器,这个是自定义的过滤器。我回在OpenApiFilter中用到body。

使用方法

在routes中使用了readBody方法后,就可以在缓存中获取到body参数。以下为Filter中的摘要。

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    ServerHttpRequest request = exchange.getRequest();
    ServerHttpResponse response = exchange.getResponse();
    if (!HttpMethod.POST.equals(request.getMethod())) {
        // 不是POST请求直接返回
    }
    JSONObject jsonObject = exchange.getAttribute("cachedRequestBodyObject");
    // 后面可以根据body中的内容进行处理
    // 省略返回方法
    return null;
}

源码简单解析

首先看一下核心的readBody方法。

public <T> BooleanSpec readBody(Class<T> inClass, Predicate<T> predicate) {
    return asyncPredicate(getBean(ReadBodyPredicateFactory.class)
            .applyAsync(c -> c.setPredicate(inClass, predicate)));
}

可以看出其调用了ReadBodyPredicateFactory类的applyAsync方法。

@Override
@SuppressWarnings("unchecked")
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
    return new AsyncPredicate<ServerWebExchange>() {
        @Override
        public Publisher<Boolean> apply(ServerWebExchange exchange) {
            Class inClass = config.getInClass();
            // 首先从exchange中获得cachedBody缓存中的body,CACHE_REQUEST_BODY_OBJECT_KEY的值即为cachedRequestBodyObject
            // 所以在自定义Filter的中我们也同样以此方法获取body参数。
            Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
            Mono<?> modifiedBody;
            // We can only read the body from the request once, once that happens if
            // we try to read the body again an exception will be thrown. The below
            // if/else caches the body object as a request attribute in the
            // ServerWebExchange so if this filter is run more than once (due to more
            // than one route using it) we do not try to read the request body
            // multiple times
            // 注释翻译结果
            // 我们只能从请求中读取一次内容,一旦我们尝试再次读取正文,将引发异常。 下面
            // if / else将主体对象作为请求属性缓存在ServerWebExchange,因此如果此筛选器运行了多次(由于运行了多次)
            // 而不是使用它的一条路线),我们不会尝试读取请求正文多次

            if (cachedBody != null) {
                // 如果缓存中的已经存在则去执行配置中的predicate,本案例中没有配置predicate。
                try {
                    boolean test = config.predicate.test(cachedBody);
                    exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                    return Mono.just(test);
                }
                catch (ClassCastException e) {
                    if (log.isDebugEnabled()) {
                        log.debug("Predicate test failed because class in predicate "
                                + "does not match the cached body object", e);
                    }
                }
                return Mono.just(false);
            }
            else {
                // 如果不存在则读取body并方入CACHE_REQUEST_BODY_OBJECT_KEY中,然后执行配置中的predicate.
                // 读取body的操作在ServerWebExchangeUtils.cacheRequestBodyAndRequest中。
                return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange,
                        (serverHttpRequest) -> ServerRequest
                                .create(exchange.mutate().request(serverHttpRequest)
                                        .build(), messageReaders)
                                .bodyToMono(inClass)
                                .doOnNext(objectValue -> exchange.getAttributes().put(
                                        CACHE_REQUEST_BODY_OBJECT_KEY, objectValue))
                                .map(objectValue -> config.getPredicate()
                                        .test(objectValue)));
            }
        }

        @Override
        public String toString() {
            return String.format("ReadBody: %s", config.getInClass());
        }
    };
}

ServerWebExchangeUtils.cacheRequestBodyAndRequest方法

public static <T> Mono<T> cacheRequestBodyAndRequest(ServerWebExchange exchange,
        Function<ServerHttpRequest, Mono<T>> function) {
    return cacheRequestBody(exchange, true, function);
}

继续往下看cacheRequestBody

private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange,
        boolean cacheDecoratedRequest,
        Function<ServerHttpRequest, Mono<T>> function) {
    // Join all the DataBuffers so we have a single DataBuffer for the body
    return DataBufferUtils.join(exchange.getRequest().getBody())
            .flatMap(dataBuffer -> {
                if (dataBuffer.readableByteCount() > 0) {
                    if (log.isTraceEnabled()) {
                        log.trace("retaining body in exchange attribute");
                    }
                    exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR,
                            dataBuffer);
                }

                ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
                        exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return Mono.<DataBuffer>fromSupplier(() -> {
                            if (exchange.getAttributeOrDefault(
                                    CACHED_REQUEST_BODY_ATTR, null) == null) {
                                // probably == downstream closed
                                return null;
                            }
                            // TODO: deal with Netty
                            NettyDataBuffer pdb = (NettyDataBuffer) dataBuffer;
                            return pdb.factory()
                                    .wrap(pdb.getNativeBuffer().retainedSlice());
                        }).flux();
                    }
                };
                if (cacheDecoratedRequest) {
                    exchange.getAttributes().put(
                            CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, decorator);
                }
                return function.apply(decorator);
            });
}

这里运用了装饰器设计模式将body中读取到的dataBuffer数据包装了下往下传递。这里本人也只是粗略了解。先记录一下。