文章 62
浏览 15135
自研网关day07-实现过滤器链

自研网关day07-实现过滤器链

过滤器类型

按照作用区域划分,它可以划分为全局过滤以及局部过滤器
全局过滤器的作用是处理一切进入网关的请求和微服务响应,与 GatewayFilter 的作用一样。
区别在于 GatewayFilter 通过配置定义,处理逻辑是固定的;而 GlobalFilter 的逻辑需要自己写代码实现。
Gateway 内部有一个接口 名为 GlobalFilter,这个就是 Gateway 的全局过滤器接口,只要在应用中实现此接口后注册为 Spring 的 Bean,它就会就会帮我们将这个实现注册到全局过滤器链条里边去。

在执行路由过滤器的前面可以有多个过滤器,最后一个一定是路由过滤器,有且只有一个,然后开启一个异步的线程去对下游服务发起请求下游服务响应之后,直接调用 ctx.writeAndFlush 方法写回数据;如果中间出现任何异常,都进行执行异常逻辑,直接调用 ctx.writeAndFlush 方法写回数据最后最后执行一系列回调函数,这个我们也可以组成一个过滤器链条

**由此可见,当请求经过网关转发路由到下游服务器的时候,需要经过一系列网关过滤器,最终成功校验才会放行到开启异步线程访问下游服务请求,类比与 Netty 的 usafe 过滤器,**设计过滤器需要便于扩展,能够支持用户自定义过滤器

过滤器设计模型

过滤器核心模型:

  • 顶级接口:Filter,doFilter 方法
  • 切面:FilterAspect 注解标识过滤器的 order,id 等等标识
  • 工厂生产类:FilterFactory 过滤器抽象工厂
  • 过滤器链条:GatewayFilterChain
  • 过滤器链条工厂实现:GatewayFilterChainFactory

实现 GatewayFilterChainFactory

该类是一个单例对象,再第一次构造的时候,会再构造器中,通过 SPI 机制加载 ServiceLoader ,通过加载所有的实现了 Filter 的实现类,获取对应类上标注的FilterAspect 注解,将 filter 接口的元数据信息,filterId,name,order,放在 processorFilterIdMap 容器

 private GatewayFilterChainFactory() {
        ServiceLoader<Filter> serviceLoader = ServiceLoader.load(Filter.class);
        serviceLoader
                .stream()
                .map(ServiceLoader.Provider::get)
                .filter(Objects::nonNull)
                .filter(filter -> filter.getClass().isAnnotationPresent(FilterAspect.class))
                .forEach(filter -> {
                    FilterAspect annotation = filter.getClass().getAnnotation(FilterAspect.class);
                    String id = Optional.ofNullable(annotation.id())
                            .orElse(filter.getClass().getName());
                    processorFilterIdMap.putIfAbsent(id, filter);
                    log.info("handle filter success:{},{},{},{}", filter.getClass().getName(),
                            id, annotation.name(), annotation.order());
                });
    }

通过 gatewayContext 上下文对象中,获取 Rule 规则,这些规则有部分是从 nacoas 配置中心配置的,配置中心发生事件变动,网关会更新自己的配置规则 map 缓存,当组装 gatewayContext,会通过请求路径和服务 id,获取匹配的规则 Rule,填充到 gatewayContext 上下文中,当构建过滤器 chain 的时候,满足了匹配的规则,就会放在 chan 链表中,并且会根据 filter 排序,最后路由过滤器一定是放在最后,只有等请求经过所有过滤器后,才会执行转发路由到相应的下游服务器

  @Override
    public GatewayFilterChain buildFilterChain(GatewayContext ctx) {
        List<Filter> filters = new LinkedList<>();
        GatewayFilterChain chain = new GatewayFilterChain();
        Optional.ofNullable(ctx.getRule())
                .ifPresent(rule -> {
                    Set<Rule.FilterConfig> filterConfigs = rule.getFilterConfigs();
                    filterConfigs.forEach(filterConfig -> {
                        String id = filterConfig.getId();
                        Optional.ofNullable(id)
                                .map(this::getFilterInfo)
                                .ifPresent(filters::add);
                    });
                });
        chain.addFilter(new RouterFilter()).addFilter(filters);
        return chain;
    }

    @Override
    public Filter getFilterInfo(String filterId) {
        return processorFilterIdMap.get(filterId);
    }

触发过滤器时机

既然已经构造好了网关过滤器链 chain,那什么时候触发比较好呢,当请求经过网关的时候,会封装一个 gateway 上下文对象,开始执行路由转发之前,就得执行前置过滤器

    public GatewayContext doFilter(GatewayContext context) {
        if (CollectionUtils.isEmpty(filters)) {
            return context;
        }
        try {
            for (Filter filter : filters) {
                filter.doFilter(context);
            }
        } catch (Exception e) {
            log.error("执行过滤器发生异常,异常原因 ", e);
        }
        return context;
    }

最终循环处理所有 filter 组件的 doFilter 后(再新增 filter 已经排序了),最终会执行 RouterFilter 转发逻辑,开始异步发送请求

package com.xiaohu.core.router;

import com.xiaohu.common.constants.FilterConst;
import com.xiaohu.common.enums.ResponseCode;
import com.xiaohu.common.exception.ConnectException;
import com.xiaohu.common.exception.ResponseException;
import com.xiaohu.core.ConfigLoader;
import com.xiaohu.core.context.GatewayContext;
import com.xiaohu.core.context.enums.ContextState;
import com.xiaohu.core.filter.Filter;
import com.xiaohu.core.filter.FilterAspect;
import com.xiaohu.core.helper.AsyncHttpHelper;
import com.xiaohu.core.helper.ResponseHelper;
import com.xiaohu.core.response.GatewayResponse;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/28 22:00:27
 */
@Slf4j
@FilterAspect(id = FilterConst.ROUTER_FILTER_ID,
        name = FilterConst.ROUTER_FILTER_NAME,
        order = FilterConst.ROUTER_FILTER_ORDER)
public class RouterFilter implements Filter {
    @Override
    public void doFilter(GatewayContext gatewayContext) {
        route(gatewayContext);
    }

    private void route(GatewayContext gatewayContext) {
        Request request = gatewayContext.getRequest().build();
        CompletableFuture<Response> future = AsyncHttpHelper.getInstance().executeRequest(request);

        //单异步
        if (ConfigLoader.getInstance().getConfig().isWhenComplete()) {
            future.whenComplete(((response, throwable) -> complete(gatewayContext, response, request, throwable)));
        } else {
            //双异步
            future.whenCompleteAsync((response, throwable) -> complete(gatewayContext, response, request, throwable));
        }
    }

    private void complete(GatewayContext gatewayContext,
                          Response response,
                          Request request, Throwable throwable) {
        gatewayContext.releaseRequest();

        try {
            Optional.ofNullable(throwable).ifPresentOrElse(e -> {
                String url = request.getUrl();
                if (e instanceof TimeoutException) {
                    log.warn("request timeout,url:{}", url);
                    gatewayContext.setThrowable(new ResponseException(ResponseCode.REQUEST_TIMEOUT));
                } else {
                    gatewayContext.setThrowable(new ConnectException(e, gatewayContext.getUniqueId(), url, ResponseCode.INTERNAL_ERROR));
                }
            }, () -> gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(response)));
        } catch (Exception e) {
            gatewayContext.setThrowable(new ResponseException(ResponseCode.INTERNAL_ERROR));
            log.error("complete error", e);
        } finally {
            gatewayContext.setState(ContextState.WRITTEN);
            ResponseHelper.writeResponse(gatewayContext);
        }
    }
}

标题:自研网关day07-实现过滤器链
作者:xiaohugg
地址:https://xiaohugg.top/articles/2024/04/29/1714360078024.html

人民有信仰 民族有希望 国家有力量