文章 62
浏览 15135
自研网关day04-集成netty和容器启动

自研网关day04-集成netty和容器启动

Netty

Netty 简单介绍

Netty 是一个 NIO 客户端服务器框架,它可以快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器。

“快速和简单”并不意味着最终的应用程序将遭受可维护性或性能问题。Netty 经过精心设计,从许多协议(如 FTP,SMTP,HTTP 和各种二进制和基于文本的遗留协议)的实现中获得了经验。因此,Netty 成功地找到了一种方法,可以在不妥协的情况下实现开发的轻松性、性能、稳定性和灵活性。

Netty 特征

设计

  • 用于各种传输类型的统一 API-阻塞和非阻塞套接字
  • 基于灵活和可扩展的事件模型,允许明确的关注点分离
  • 高度可定制的线程模型-单线程,一个或多个线程池,如 SEDA
  • 真正的无连接数据报套接字支持(自 3.1 起)

Netty 架构

Netty 核心概念

  1. bootstrap、serverBootstrap:bootstrap 的意思是引导,其主要作用是配置整个 Netty 程序,将各个组件整合起来。serverBootstrap 是服务器端的引导类。
  2. eventLoop:eventLoop 维护了一个线程和任务队列,支持异步提交执行任务。
  3. eventLoopGroup:eventLoopGroup 主要是管理 eventLoop 的生命周期,可以将其看作是一个线程池,其内部维护了一组 eventLoop,每个 eventLoop 对应处理多个 Channel,而一个 Channel 只能对应一个 eventLoop。
  4. Channel:Channel 代表一个实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的 IO 操作的程序组件)的开放链接,如读操作和写操作。
  5. channelPipeLine:是一个包含 channelHandler 的 list,用来设置 channelHandler 的执行顺序。
  6. ChannelInitializer:它是一个特殊的 ChannelInboundHandler,当 channel 注册到 eventLoop 上面时,对 channel 进行初始化
  7. ChannelHandler:用来处理业务逻辑的代码,ChannelHandler 是一个父接口,ChannelnboundHandler 和 ChannelOutboundHandler 都继承了该接口,它们分别用来处理入站和出站。

Netty Reactor 工作架构

实现 NettyHttpServer

  1. 封装属性
  2. 实现构造方法
  3. 实现 init 方法
  4. epoll 优化
  5. 实现 start 方法
  6. 实现 shutdown 方法

**该类主要是用来启动加载 Netty 的相关组件,比如 serverBootstrap,eventLoop,等等,为了接口抽象更方便管理 Netty 的生命周期,定义了一个 **<span class="ne-text">lifeCycle</span> 接口 ,该接口主要有 init,start,shutdown 三个方法,有个接口实现类 <span class="ne-text"></span><strong><span class="ne-text">NettyHttpServer</span></strong> 用来管理 Netty 程序的初始化和启动和优雅关闭

package com.xiaohu.core.netty;

import com.xiaohu.common.utils.RemotingUtil;
import com.xiaohu.core.Config;
import com.xiaohu.core.LifeCycle;
import com.xiaohu.core.netty.processor.NettyProcessor;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

/**
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/25 21:42:11
 */
@Slf4j
public class NettyHttpServer implements LifeCycle {

    private final Config config;
    private final NettyProcessor processor;
    private ServerBootstrap serverBootstrap;
    private EventLoopGroup eventLoopGroupBoss;
    private EventLoopGroup eventLoopGroupWorker;


    public NettyHttpServer(Config config, NettyProcessor processor) {
        this.config = config;
        this.processor = processor;
        this.init();
    }

    @Override
    public void init() {
        if (useEpoll()) {
            this.serverBootstrap = new ServerBootstrap();
            this.eventLoopGroupBoss = new EpollEventLoopGroup(config.getEventLoopGroupBossNum(), new DefaultThreadFactory("NettyHttpServerBoss-nio"));
            this.eventLoopGroupWorker = new EpollEventLoopGroup(config.getEventLoopGroupBossNum(), new DefaultThreadFactory("NettyHttpServerWorker-nio"));
        } else {
            this.serverBootstrap = new ServerBootstrap();
            this.eventLoopGroupBoss = new NioEventLoopGroup(config.getEventLoopGroupBossNum(), new DefaultThreadFactory("NettyHttpServerBoss-nio"));
            this.eventLoopGroupWorker = new NioEventLoopGroup(config.getEventLoopGroupBossNum(), new DefaultThreadFactory("NettyHttpServerWorker-nio"));
        }
    }

    public boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
    }

    @Override
    public void start() {
        serverBootstrap.group(eventLoopGroupBoss, eventLoopGroupWorker)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(config.getPort()))
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) {
                        channel.pipeline().addLast(
                                new HttpServerCodec(),
                                new HttpObjectAggregator(config.getMaxContentLength()),
                                new NettyServerConnectManagerHandler(),
                                new NettyHttpServerHandler(processor)
                        );
                    }
                });

        try {
            this.serverBootstrap.bind().sync();
            log.info("server startup on port {}", this.config.getPort());
        } catch (Exception e) {
            log.error("NettyHttpServer start error", e);
        }
    }

    @Override
    public void shutdown() {
        if (eventLoopGroupBoss != null) {
            eventLoopGroupBoss.shutdownGracefully();
        }
        if (eventLoopGroupWorker != null) {
            eventLoopGroupWorker.shutdownGracefully();
        }
    }
}

实现 NettyHttpServerHandler

  1. 继承 ChannelInboundHandlerAdapter
  2. 实现 channelRead
  3. 把逻辑委托给 NettyProcessor

该类通过继承 Netty 的 ChannelInboundHandlerAdapter 类,重写里面的 channelRead 方法,从 msg 获解码获取相应的 fullRequest 对象和 channelHandleContext 上下文对象,移交给一个 requestWrapper 包装,通过策略模式实现不同的 processor 处理器进行各自 requestWrapper 的 processor 处理

package com.xiaohu.core.netty;

import com.xiaohu.core.context.HttpRequestWrapper;
import com.xiaohu.core.netty.processor.NettyProcessor;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;

/**
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/25 22:00:10
 */
public class NettyHttpServerHandler extends ChannelInboundHandlerAdapter {

    private final NettyProcessor processor;

    public NettyHttpServerHandler(NettyProcessor processor) {
        this.processor = processor;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        FullHttpRequest request = (FullHttpRequest) msg;
        HttpRequestWrapper httpRequestWrapper = new HttpRequestWrapper(request, ctx);

        processor.process(httpRequestWrapper);
    }
}

实现 NettyProcessor

该类通过实现 processor 接口,实现 process 方法,定义自己的 processor 处理方法,解析处理 HttpRequestWrapper 对象,开始处理路由,通过配置类是否定义了双异步模式还是单异步模式,进行将 response 将数据异步返回,这里需要通过 nacos 的服务注册和服务发现动态获取服务元数据信息,得到一个 gateway 上下文对象,进行路由返回,通过 asyncHttpClient 将数据写回

package com.xiaohu.core.netty.processor;

import com.xiaohu.common.enums.ResponseCode;
import com.xiaohu.common.exception.BaseException;
import com.xiaohu.common.exception.ConnectException;
import com.xiaohu.common.exception.ResponseException;
import com.xiaohu.core.ConfigLoader;
import com.xiaohu.core.context.GatewayContext;
import com.xiaohu.core.context.HttpRequestWrapper;
import com.xiaohu.core.context.enums.ContextState;
import com.xiaohu.core.helper.AsyncHttpHelper;
import com.xiaohu.core.helper.RequestHelper;
import com.xiaohu.core.helper.ResponseHelper;
import com.xiaohu.core.response.GatewayResponse;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/25 22:10:25
 */
@Slf4j
public class NettyCoreProcessor implements NettyProcessor {
    @Override
    public void process(HttpRequestWrapper requestWrapper) {

        ChannelHandlerContext ctx = requestWrapper.getChannelHandlerContext();
        FullHttpRequest request = requestWrapper.getRequest();

        GatewayContext gatewayContext = RequestHelper.doContext(request, ctx);
        try {
            route(gatewayContext);
        } catch (BaseException e) {
            log.error("process error {} {}", e.getCode().getCode(), e.getCode().getMessage());
            FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(e.getCode());
            doWriteAndRelease(ctx, request, httpResponse);
        } catch (Throwable t) {
            log.error("process unknown error", t);
            FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(ResponseCode.INTERNAL_ERROR);
            doWriteAndRelease(ctx, request, httpResponse);
        }
    }

    private void doWriteAndRelease(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse httpResponse) {
        ctx.writeAndFlush(httpResponse)
                .addListener(ChannelFutureListener.CLOSE);
        ReferenceCountUtil.release(request);
    }

    private void route(GatewayContext gatewayContext) {
        Request request = gatewayContext.getRequest().build();
        CompletableFuture<Response> future = AsyncHttpHelper.getInstance().executeRequest(request);

        //单异步
        if (ConfigLoader.getInstance().getConfig().isWhenComplete()) {
            future.whenComplete(((response, throwable) -> {
                complete(gatewayContext, response, request, throwable);
            }));
        } else {
            //双异步
            future.whenCompleteAsync((response, throwable) -> {
                complete(gatewayContext, response, request, throwable);
            });
        }
    }

    private void complete(GatewayContext gatewayContext,
                          Response response,
                          Request request, Throwable throwable) {
        gatewayContext.releaseRequest();

        try {
            Optional.ofNullable(throwable).ifPresentOrElse(e -> {
                String url = request.getUrl();
                if (e instanceof TimeoutException) {
                    log.warn("request timeout,url:{}", url);
                    gatewayContext.setThrowable(new ResponseException(ResponseCode.REQUEST_TIMEOUT));
                } else {
                    gatewayContext.setThrowable(new ConnectException(e, gatewayContext.getUniqueId(), url, ResponseCode.INTERNAL_ERROR));
                }
            }, () -> gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(response)));
        } catch (Exception e) {
            gatewayContext.setThrowable(new ResponseException(ResponseCode.INTERNAL_ERROR));
            log.error("complete error", e);
        } finally {
            gatewayContext.setState(ContextState.WRITTEN);
            ResponseHelper.writeResponse(gatewayContext);
        }
    }
}

实现生命周期 lifeCycle

策略管理每个组件的生命周期

核心容器实现

**当基本的核心组件已经开发完毕,需要有一个容器来维护这些组件统一管理,**也就是上图圈出来的那部分,需要有个容器来管理和创建这些组件,所以需要引入 <span class="ne-text">Container</span> 类,这个类也实现了 <span class="ne-text"> lifeCycle</span> 生命周期类,

  • init 方法

在 init 方法中,主要初始化 nettyCoreProcessor 和 neetyHttpServer 和 nettyHttpClien

  • start 方法

启动 Netty 容器,启动 nettyHttpServer 和 nettyHttpClient

  • shutdown

关闭 nettyHttpServer,nettyHttpClient

bootstrap 网关启动类

public class BootStrap {
    public static void main(String[] args) {
        ConfigLoader instance = ConfigLoader.getInstance();
        Config config = instance.load(args);

        new Container(config).start();
    }
}

验证网关路由

**启动了一个 SpringBoot 服务,有个接口访问返回 pong **

当正常访问 localhost:8080/http-demo/ping 是能够接受到接口返回信息

这时候启动自定义的网关服务 bootStrap 启动类

网关正常启动,默认网关端口是 8888,这时候我们访问通过网关看能否正确路由

可以看到通过访问 8888 网关,是能够正确路由到相应的服务

处理转发流程


标题:自研网关day04-集成netty和容器启动
作者:xiaohugg
地址:https://xiaohugg.top/articles/2024/04/25/1714058023915.html

人民有信仰 民族有希望 国家有力量