Netty
Netty 简单介绍
Netty 是一个 NIO 客户端服务器框架,它可以快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器。
“快速和简单”并不意味着最终的应用程序将遭受可维护性或性能问题。Netty 经过精心设计,从许多协议(如 FTP,SMTP,HTTP 和各种二进制和基于文本的遗留协议)的实现中获得了经验。因此,Netty 成功地找到了一种方法,可以在不妥协的情况下实现开发的轻松性、性能、稳定性和灵活性。
Netty 特征
设计
- 用于各种传输类型的统一 API-阻塞和非阻塞套接字
- 基于灵活和可扩展的事件模型,允许明确的关注点分离
- 高度可定制的线程模型-单线程,一个或多个线程池,如 SEDA
- 真正的无连接数据报套接字支持(自 3.1 起)
Netty 架构
Netty 核心概念
- bootstrap、serverBootstrap:bootstrap 的意思是引导,其主要作用是配置整个 Netty 程序,将各个组件整合起来。serverBootstrap 是服务器端的引导类。
- eventLoop:eventLoop 维护了一个线程和任务队列,支持异步提交执行任务。
- eventLoopGroup:eventLoopGroup 主要是管理 eventLoop 的生命周期,可以将其看作是一个线程池,其内部维护了一组 eventLoop,每个 eventLoop 对应处理多个 Channel,而一个 Channel 只能对应一个 eventLoop。
- Channel:Channel 代表一个实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的 IO 操作的程序组件)的开放链接,如读操作和写操作。
- channelPipeLine:是一个包含 channelHandler 的 list,用来设置 channelHandler 的执行顺序。
- ChannelInitializer:它是一个特殊的 ChannelInboundHandler,当 channel 注册到 eventLoop 上面时,对 channel 进行初始化
- ChannelHandler:用来处理业务逻辑的代码,ChannelHandler 是一个父接口,ChannelnboundHandler 和 ChannelOutboundHandler 都继承了该接口,它们分别用来处理入站和出站。
Netty Reactor 工作架构
实现 NettyHttpServer
- 封装属性
- 实现构造方法
- 实现 init 方法
- epoll 优化
- 实现 start 方法
- 实现 shutdown 方法
**该类主要是用来启动加载 Netty 的相关组件,比如 serverBootstrap,eventLoop,等等,为了接口抽象更方便管理 Netty 的生命周期,定义了一个 **<span class="ne-text">lifeCycle</span>
接口 ,该接口主要有 init,start,shutdown 三个方法,有个接口实现类 <span class="ne-text"></span><strong><span class="ne-text">NettyHttpServer</span></strong>
用来管理 Netty 程序的初始化和启动和优雅关闭
package com.xiaohu.core.netty;
import com.xiaohu.common.utils.RemotingUtil;
import com.xiaohu.core.Config;
import com.xiaohu.core.LifeCycle;
import com.xiaohu.core.netty.processor.NettyProcessor;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @author xiaohugg
* @version 1.0
* @date 2024/4/25 21:42:11
*/
@Slf4j
public class NettyHttpServer implements LifeCycle {
private final Config config;
private final NettyProcessor processor;
private ServerBootstrap serverBootstrap;
private EventLoopGroup eventLoopGroupBoss;
private EventLoopGroup eventLoopGroupWorker;
public NettyHttpServer(Config config, NettyProcessor processor) {
this.config = config;
this.processor = processor;
this.init();
}
@Override
public void init() {
if (useEpoll()) {
this.serverBootstrap = new ServerBootstrap();
this.eventLoopGroupBoss = new EpollEventLoopGroup(config.getEventLoopGroupBossNum(), new DefaultThreadFactory("NettyHttpServerBoss-nio"));
this.eventLoopGroupWorker = new EpollEventLoopGroup(config.getEventLoopGroupBossNum(), new DefaultThreadFactory("NettyHttpServerWorker-nio"));
} else {
this.serverBootstrap = new ServerBootstrap();
this.eventLoopGroupBoss = new NioEventLoopGroup(config.getEventLoopGroupBossNum(), new DefaultThreadFactory("NettyHttpServerBoss-nio"));
this.eventLoopGroupWorker = new NioEventLoopGroup(config.getEventLoopGroupBossNum(), new DefaultThreadFactory("NettyHttpServerWorker-nio"));
}
}
public boolean useEpoll() {
return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
}
@Override
public void start() {
serverBootstrap.group(eventLoopGroupBoss, eventLoopGroupWorker)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(config.getPort()))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
channel.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(config.getMaxContentLength()),
new NettyServerConnectManagerHandler(),
new NettyHttpServerHandler(processor)
);
}
});
try {
this.serverBootstrap.bind().sync();
log.info("server startup on port {}", this.config.getPort());
} catch (Exception e) {
log.error("NettyHttpServer start error", e);
}
}
@Override
public void shutdown() {
if (eventLoopGroupBoss != null) {
eventLoopGroupBoss.shutdownGracefully();
}
if (eventLoopGroupWorker != null) {
eventLoopGroupWorker.shutdownGracefully();
}
}
}
实现 NettyHttpServerHandler
- 继承 ChannelInboundHandlerAdapter
- 实现 channelRead
- 把逻辑委托给 NettyProcessor
该类通过继承 Netty 的 ChannelInboundHandlerAdapter 类,重写里面的 channelRead 方法,从 msg 获解码获取相应的 fullRequest 对象和 channelHandleContext 上下文对象,移交给一个 requestWrapper 包装,通过策略模式实现不同的 processor 处理器进行各自 requestWrapper 的 processor 处理
package com.xiaohu.core.netty;
import com.xiaohu.core.context.HttpRequestWrapper;
import com.xiaohu.core.netty.processor.NettyProcessor;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
/**
* @author xiaohugg
* @version 1.0
* @date 2024/4/25 22:00:10
*/
public class NettyHttpServerHandler extends ChannelInboundHandlerAdapter {
private final NettyProcessor processor;
public NettyHttpServerHandler(NettyProcessor processor) {
this.processor = processor;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
FullHttpRequest request = (FullHttpRequest) msg;
HttpRequestWrapper httpRequestWrapper = new HttpRequestWrapper(request, ctx);
processor.process(httpRequestWrapper);
}
}
实现 NettyProcessor
该类通过实现 processor 接口,实现 process 方法,定义自己的 processor 处理方法,解析处理 HttpRequestWrapper 对象,开始处理路由,通过配置类是否定义了双异步模式还是单异步模式,进行将 response 将数据异步返回,这里需要通过 nacos 的服务注册和服务发现动态获取服务元数据信息,得到一个 gateway 上下文对象,进行路由返回,通过 asyncHttpClient 将数据写回
package com.xiaohu.core.netty.processor;
import com.xiaohu.common.enums.ResponseCode;
import com.xiaohu.common.exception.BaseException;
import com.xiaohu.common.exception.ConnectException;
import com.xiaohu.common.exception.ResponseException;
import com.xiaohu.core.ConfigLoader;
import com.xiaohu.core.context.GatewayContext;
import com.xiaohu.core.context.HttpRequestWrapper;
import com.xiaohu.core.context.enums.ContextState;
import com.xiaohu.core.helper.AsyncHttpHelper;
import com.xiaohu.core.helper.RequestHelper;
import com.xiaohu.core.helper.ResponseHelper;
import com.xiaohu.core.response.GatewayResponse;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
/**
* @author xiaohugg
* @version 1.0
* @date 2024/4/25 22:10:25
*/
@Slf4j
public class NettyCoreProcessor implements NettyProcessor {
@Override
public void process(HttpRequestWrapper requestWrapper) {
ChannelHandlerContext ctx = requestWrapper.getChannelHandlerContext();
FullHttpRequest request = requestWrapper.getRequest();
GatewayContext gatewayContext = RequestHelper.doContext(request, ctx);
try {
route(gatewayContext);
} catch (BaseException e) {
log.error("process error {} {}", e.getCode().getCode(), e.getCode().getMessage());
FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(e.getCode());
doWriteAndRelease(ctx, request, httpResponse);
} catch (Throwable t) {
log.error("process unknown error", t);
FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(ResponseCode.INTERNAL_ERROR);
doWriteAndRelease(ctx, request, httpResponse);
}
}
private void doWriteAndRelease(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse httpResponse) {
ctx.writeAndFlush(httpResponse)
.addListener(ChannelFutureListener.CLOSE);
ReferenceCountUtil.release(request);
}
private void route(GatewayContext gatewayContext) {
Request request = gatewayContext.getRequest().build();
CompletableFuture<Response> future = AsyncHttpHelper.getInstance().executeRequest(request);
//单异步
if (ConfigLoader.getInstance().getConfig().isWhenComplete()) {
future.whenComplete(((response, throwable) -> {
complete(gatewayContext, response, request, throwable);
}));
} else {
//双异步
future.whenCompleteAsync((response, throwable) -> {
complete(gatewayContext, response, request, throwable);
});
}
}
private void complete(GatewayContext gatewayContext,
Response response,
Request request, Throwable throwable) {
gatewayContext.releaseRequest();
try {
Optional.ofNullable(throwable).ifPresentOrElse(e -> {
String url = request.getUrl();
if (e instanceof TimeoutException) {
log.warn("request timeout,url:{}", url);
gatewayContext.setThrowable(new ResponseException(ResponseCode.REQUEST_TIMEOUT));
} else {
gatewayContext.setThrowable(new ConnectException(e, gatewayContext.getUniqueId(), url, ResponseCode.INTERNAL_ERROR));
}
}, () -> gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(response)));
} catch (Exception e) {
gatewayContext.setThrowable(new ResponseException(ResponseCode.INTERNAL_ERROR));
log.error("complete error", e);
} finally {
gatewayContext.setState(ContextState.WRITTEN);
ResponseHelper.writeResponse(gatewayContext);
}
}
}
实现生命周期 lifeCycle
策略管理每个组件的生命周期
核心容器实现
**当基本的核心组件已经开发完毕,需要有一个容器来维护这些组件统一管理,**也就是上图圈出来的那部分,需要有个容器来管理和创建这些组件,所以需要引入 <span class="ne-text">Container</span>
类,这个类也实现了 <span class="ne-text"> lifeCycle</span>
生命周期类,
- init 方法
在 init 方法中,主要初始化 nettyCoreProcessor 和 neetyHttpServer 和 nettyHttpClien
- start 方法
启动 Netty 容器,启动 nettyHttpServer 和 nettyHttpClient
- shutdown
关闭 nettyHttpServer,nettyHttpClient
bootstrap 网关启动类
public class BootStrap {
public static void main(String[] args) {
ConfigLoader instance = ConfigLoader.getInstance();
Config config = instance.load(args);
new Container(config).start();
}
}
验证网关路由
**启动了一个 SpringBoot 服务,有个接口访问返回 pong **
当正常访问 localhost:8080/http-demo/ping 是能够接受到接口返回信息
这时候启动自定义的网关服务 bootStrap 启动类
网关正常启动,默认网关端口是 8888,这时候我们访问通过网关看能否正确路由
可以看到通过访问 8888 网关,是能够正确路由到相应的服务