过滤器类型
按照作用区域划分,它可以划分为全局过滤以及局部过滤器
全局过滤器的作用是处理一切进入网关的请求和微服务响应,与 GatewayFilter 的作用一样。
区别在于 GatewayFilter 通过配置定义,处理逻辑是固定的;而 GlobalFilter 的逻辑需要自己写代码实现。
Gateway 内部有一个接口 名为 GlobalFilter,这个就是 Gateway 的全局过滤器接口,只要在应用中实现此接口后注册为 Spring 的 Bean,它就会就会帮我们将这个实现注册到全局过滤器链条里边去。
在执行路由过滤器的前面可以有多个过滤器,最后一个一定是路由过滤器,有且只有一个,然后开启一个异步的线程去对下游服务发起请求下游服务响应之后,直接调用 ctx.writeAndFlush 方法写回数据;如果中间出现任何异常,都进行执行异常逻辑,直接调用 ctx.writeAndFlush 方法写回数据最后最后执行一系列回调函数,这个我们也可以组成一个过滤器链条
**由此可见,当请求经过网关转发路由到下游服务器的时候,需要经过一系列网关过滤器,最终成功校验才会放行到开启异步线程访问下游服务请求,类比与 Netty 的 usafe 过滤器,**设计过滤器需要便于扩展,能够支持用户自定义过滤器
过滤器设计模型
过滤器核心模型:
- 顶级接口:Filter,doFilter 方法
- 切面:FilterAspect 注解标识过滤器的 order,id 等等标识
- 工厂生产类:FilterFactory 过滤器抽象工厂
- 过滤器链条:GatewayFilterChain
- 过滤器链条工厂实现:GatewayFilterChainFactory
实现 GatewayFilterChainFactory
该类是一个单例对象,再第一次构造的时候,会再构造器中,通过 SPI 机制加载 ServiceLoader
private GatewayFilterChainFactory() {
ServiceLoader<Filter> serviceLoader = ServiceLoader.load(Filter.class);
serviceLoader
.stream()
.map(ServiceLoader.Provider::get)
.filter(Objects::nonNull)
.filter(filter -> filter.getClass().isAnnotationPresent(FilterAspect.class))
.forEach(filter -> {
FilterAspect annotation = filter.getClass().getAnnotation(FilterAspect.class);
String id = Optional.ofNullable(annotation.id())
.orElse(filter.getClass().getName());
processorFilterIdMap.putIfAbsent(id, filter);
log.info("handle filter success:{},{},{},{}", filter.getClass().getName(),
id, annotation.name(), annotation.order());
});
}
通过 gatewayContext 上下文对象中,获取 Rule 规则,这些规则有部分是从 nacoas 配置中心配置的,配置中心发生事件变动,网关会更新自己的配置规则 map 缓存,当组装 gatewayContext,会通过请求路径和服务 id,获取匹配的规则 Rule,填充到 gatewayContext 上下文中,当构建过滤器 chain 的时候,满足了匹配的规则,就会放在 chan 链表中,并且会根据 filter 排序,最后路由过滤器一定是放在最后,只有等请求经过所有过滤器后,才会执行转发路由到相应的下游服务器
@Override
public GatewayFilterChain buildFilterChain(GatewayContext ctx) {
List<Filter> filters = new LinkedList<>();
GatewayFilterChain chain = new GatewayFilterChain();
Optional.ofNullable(ctx.getRule())
.ifPresent(rule -> {
Set<Rule.FilterConfig> filterConfigs = rule.getFilterConfigs();
filterConfigs.forEach(filterConfig -> {
String id = filterConfig.getId();
Optional.ofNullable(id)
.map(this::getFilterInfo)
.ifPresent(filters::add);
});
});
chain.addFilter(new RouterFilter()).addFilter(filters);
return chain;
}
@Override
public Filter getFilterInfo(String filterId) {
return processorFilterIdMap.get(filterId);
}
触发过滤器时机
既然已经构造好了网关过滤器链 chain,那什么时候触发比较好呢,当请求经过网关的时候,会封装一个 gateway 上下文对象,开始执行路由转发之前,就得执行前置过滤器
public GatewayContext doFilter(GatewayContext context) {
if (CollectionUtils.isEmpty(filters)) {
return context;
}
try {
for (Filter filter : filters) {
filter.doFilter(context);
}
} catch (Exception e) {
log.error("执行过滤器发生异常,异常原因 ", e);
}
return context;
}
最终循环处理所有 filter 组件的 doFilter 后(再新增 filter 已经排序了),最终会执行 RouterFilter 转发逻辑,开始异步发送请求
package com.xiaohu.core.router;
import com.xiaohu.common.constants.FilterConst;
import com.xiaohu.common.enums.ResponseCode;
import com.xiaohu.common.exception.ConnectException;
import com.xiaohu.common.exception.ResponseException;
import com.xiaohu.core.ConfigLoader;
import com.xiaohu.core.context.GatewayContext;
import com.xiaohu.core.context.enums.ContextState;
import com.xiaohu.core.filter.Filter;
import com.xiaohu.core.filter.FilterAspect;
import com.xiaohu.core.helper.AsyncHttpHelper;
import com.xiaohu.core.helper.ResponseHelper;
import com.xiaohu.core.response.GatewayResponse;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
/**
* @author xiaohugg
* @version 1.0
* @date 2024/4/28 22:00:27
*/
@Slf4j
@FilterAspect(id = FilterConst.ROUTER_FILTER_ID,
name = FilterConst.ROUTER_FILTER_NAME,
order = FilterConst.ROUTER_FILTER_ORDER)
public class RouterFilter implements Filter {
@Override
public void doFilter(GatewayContext gatewayContext) {
route(gatewayContext);
}
private void route(GatewayContext gatewayContext) {
Request request = gatewayContext.getRequest().build();
CompletableFuture<Response> future = AsyncHttpHelper.getInstance().executeRequest(request);
//单异步
if (ConfigLoader.getInstance().getConfig().isWhenComplete()) {
future.whenComplete(((response, throwable) -> complete(gatewayContext, response, request, throwable)));
} else {
//双异步
future.whenCompleteAsync((response, throwable) -> complete(gatewayContext, response, request, throwable));
}
}
private void complete(GatewayContext gatewayContext,
Response response,
Request request, Throwable throwable) {
gatewayContext.releaseRequest();
try {
Optional.ofNullable(throwable).ifPresentOrElse(e -> {
String url = request.getUrl();
if (e instanceof TimeoutException) {
log.warn("request timeout,url:{}", url);
gatewayContext.setThrowable(new ResponseException(ResponseCode.REQUEST_TIMEOUT));
} else {
gatewayContext.setThrowable(new ConnectException(e, gatewayContext.getUniqueId(), url, ResponseCode.INTERNAL_ERROR));
}
}, () -> gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(response)));
} catch (Exception e) {
gatewayContext.setThrowable(new ResponseException(ResponseCode.INTERNAL_ERROR));
log.error("complete error", e);
} finally {
gatewayContext.setState(ContextState.WRITTEN);
ResponseHelper.writeResponse(gatewayContext);
}
}
}