nacos 核心特性
服务注册及其元数据管理
Nacos 能让您从微服务平台建设的视角管理数据中心的所有服务及元数据
服务发现和服务健康监测
- Nacos 支持基于 DNS 和基于 RPC 的服务发现
- Nacos 提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送请求
动态配置服务
动态配置服务可以让您以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置
bootStrap 集成注册中心
当网关 bootstrap 启动的时候,需要收集 nacos 的服务注册中心相关客户端元数据配置,放在本地缓存 map,进行后续的网关路由等等操作,所以需要从注册中心 poll 操作,并且也需要将自己注册到服务中心
实现集成 nacos 服务注册中心
接口类图
通过该接口主要是控制客户端注册到 nacos 服务,以及相关初始化工作,注销等相关的生命周期活动,其中
subscribeAllServices 方法就是从 nacos 服务端拉取所有组、服务名对应的客户端元数据信息,存到网关的一个
serviceInstanceMap 中,后续网关路由转发等操作就是从 serviceInstanceMap 中获取对应的元数据信息,实现动态路由转发
NacosRegisterCenter
package com.xiaohu.register.nacos;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingMaintainFactory;
import com.alibaba.nacos.api.naming.NamingMaintainService;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.xiaohu.common.config.ServiceDefinition;
import com.xiaohu.common.config.ServiceInstance;
import com.xiaohu.common.constants.GatewayConst;
import com.xiaohu.gateway.register.api.RegisterCenter;
import com.xiaohu.gateway.register.api.RegisterCenterListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author xiaohugg
* @version 1.0
* @date 2024/4/27 15:26:45
*/
@Slf4j
public class NacosRegisterCenter implements RegisterCenter {
/**
* 注册中心侦听器列表
*/
private final List<RegisterCenterListener> registerCenterListenerList = new ArrayList<>();
/**
* 环境
*/
private String env;
/**
* 命名服务
*/
private NamingService namingService;
/**
* 命名维护服务
*/
private NamingMaintainService namingMaintainService;
@Override
public void init(String registerAddress, String env) {
this.env = env;
try {
this.namingMaintainService = NamingMaintainFactory.createMaintainService(registerAddress);
this.namingService = NamingFactory.createNamingService(registerAddress);
} catch (NacosException e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
@Override
public void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
try {
//构造nacos实例信息
Instance nacosInstance = new Instance();
nacosInstance.setInstanceId(serviceInstance.getServiceInstanceId());
nacosInstance.setPort(serviceInstance.getPort());
nacosInstance.setIp(serviceInstance.getIp());
nacosInstance.setMetadata(Map.of(GatewayConst.META_DATA_KEY,
JSON.toJSONString(serviceInstance)));
//注册
namingService.registerInstance(serviceDefinition.getServiceId(), env, nacosInstance);
//更新服务定义
namingMaintainService.updateService(serviceDefinition.getServiceId(), env, 0,
Map.of(GatewayConst.META_DATA_KEY, JSON.toJSONString(serviceDefinition)));
log.info("register {} {}", serviceDefinition, serviceInstance);
} catch (NacosException e) {
log.error("register {} {} error", serviceDefinition, serviceInstance, e);
throw new IllegalArgumentException(e.getMessage(), e);
}
}
@Override
public void deregister(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
try {
namingService.deregisterInstance(
serviceDefinition.getServiceId(),
env, serviceInstance.getIp(), serviceInstance.getPort()
);
} catch (NacosException e) {
log.error("deregisterInstance {} {} error", serviceDefinition, serviceInstance, e);
throw new IllegalArgumentException(e.getMessage(), e);
}
}
@Override
public void subscribeAllServices(RegisterCenterListener registerCenterListener) {
this.registerCenterListenerList.add(registerCenterListener);
doSubscribeAllServices();
//可能有新服务加入,所以需要有一个定时任务来检查
ScheduledExecutorService scheduledThreadPool = Executors
.newScheduledThreadPool(1, new NameThreadFactory("doSubscribeAllServices"));
scheduledThreadPool.scheduleWithFixedDelay(this::doSubscribeAllServices,
10, 10, TimeUnit.SECONDS);
}
private void doSubscribeAllServices() {
try {
//已经订阅的服务
Set<String> subscribeService = namingService.getSubscribeServices().stream()
.map(ServiceInfo::getName).collect(Collectors.toSet());
int pageNo = 1;
int pageSize = 100;
//分页从nacos拿到服务列表
List<String> serviseList = namingService
.getServicesOfServer(pageNo, pageSize, env).getData();
while (CollectionUtils.isNotEmpty(serviseList)) {
log.info("service list size {}", serviseList.size());
for (String service : serviseList) {
if (subscribeService.contains(service)) {
continue;
}
//nacos事件监听器
EventListener eventListener = new NacosRegisterListener();
eventListener.onEvent(new NamingEvent(service, null));
namingService.subscribe(service, env, eventListener);
log.info("subscribe {} {}", service, env);
}
serviseList = namingService.getServicesOfServer(++pageNo, pageSize, env).getData();
}
} catch (NacosException e) {
throw new IllegalArgumentException(e);
}
}
public class NacosRegisterListener implements EventListener {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent namingEvent) {
String serviceName = namingEvent.getServiceName();
try {
Service service = namingMaintainService.queryService(serviceName, env);
ServiceDefinition serviceDefinition = JSON.parseObject(service.getMetadata().get(GatewayConst.META_DATA_KEY), ServiceDefinition.class);
Set<ServiceInstance> instanceSet = namingService.getAllInstances(serviceName, env)
.stream().map(instance -> JSON.parseObject(instance.getMetadata().get(GatewayConst.META_DATA_KEY), ServiceInstance.class))
.collect(Collectors.toSet());
registerCenterListenerList.forEach(listener -> listener.onChange(serviceDefinition, instanceSet));
} catch (NacosException e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
}
}
}
doSubscribeAllServices 核心方法,通过监听器模式,当有 nacos 服务发生变动事件,会自动触发 NamingEvent 事件,解析该事件能获取到相应改动的服务元数据信息,然后网关进行本地缓存更新
registerCenterListenerList.forEach(listener -> listener.onChange(serviceDefinition, instanceSet)); 进行推送绑定的监听器告知
客户端实现
客户端主要是充当 SDK 二方包,通过控制注解能够反射注册到 nacos,然后网关可以感知到下游配置元数据信息,通过 SpringBoot 自动装配机制,下游服务系统自动装配 sdk 的 bean
SpringBoot 自动装配
package com.xiaohu.gateway.client.core.autoconfigure;
import com.xiaohu.gateway.client.core.ApiProperties;
import com.xiaohu.gateway.client.support.dubbo.Dubbo27ClientRegisterManager;
import com.xiaohu.gateway.client.support.springmvc.SpringMVCClientRegisterManager;
import org.apache.dubbo.config.spring.ServiceBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.DispatcherServlet;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.servlet.Servlet;
/**
 * Auto-configuration for the gateway client SDK.
 *
 * <p>Active only when the {@code api.registerAddress} property is present. Binds
 * {@link ApiProperties} and contributes the register managers for Spring MVC and
 * Dubbo 2.7 when the corresponding classes are on the classpath and the application
 * has not defined its own bean of the same type.
 *
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/27 18:37:50
 */
// @Configuration (not @Configurable, which is meant for AspectJ-woven objects that are
// not managed by the container) marks this class as a source of @Bean definitions.
@Configuration
@ConditionalOnProperty(prefix = "api", name = {"registerAddress"})
@EnableConfigurationProperties(ApiProperties.class)
public class ApiClientAutoConfiguration {

    /**
     * SDK settings bound through {@link EnableConfigurationProperties}.
     */
    @Autowired
    private ApiProperties apiProperties;

    /**
     * Register manager for Spring MVC applications.
     *
     * @return a manager built from the bound {@link ApiProperties}
     */
    @Bean
    @ConditionalOnClass({Servlet.class, DispatcherServlet.class, WebMvcConfigurer.class})
    @ConditionalOnMissingBean(SpringMVCClientRegisterManager.class)
    public SpringMVCClientRegisterManager springMVCClientRegisterManager() {
        return new SpringMVCClientRegisterManager(apiProperties);
    }

    /**
     * Register manager for Dubbo 2.7 applications.
     *
     * @return a manager built from the bound {@link ApiProperties}
     */
    @Bean
    @ConditionalOnClass({ServiceBean.class})
    @ConditionalOnMissingBean(Dubbo27ClientRegisterManager.class)
    public Dubbo27ClientRegisterManager dubbo27ClientRegisterManager() {
        return new Dubbo27ClientRegisterManager(apiProperties);
    }
}
ApiClientAutoConfiguration 就是用来自动装配 SDK 需要的 bean 对象
AbstractClientRegisterManager
该抽象类管理注册管理类,将注解标记的类注册到相应的服务中心,通过 SPI 机制,获取对应的注册中心实例
package com.xiaohu.gateway.client.support;
import com.xiaohu.common.config.ServiceDefinition;
import com.xiaohu.common.config.ServiceInstance;
import com.xiaohu.gateway.client.core.ApiProperties;
import com.xiaohu.gateway.register.api.RegisterCenter;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
/**
 * Base class for client-side register managers.
 *
 * <p>On construction it discovers every {@link RegisterCenter} implementation through
 * {@link ServiceLoader} and initializes each one with the address and environment taken
 * from {@link ApiProperties}. Subclasses call {@link #register} to publish a service to
 * all discovered register centers.
 *
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/27 17:40:06
 */
@Slf4j
public class AbstractClientRegisterManager {

    /** SDK settings supplied by the caller; exposed through a Lombok getter. */
    @Getter
    private final ApiProperties apiProperties;

    /** Register centers found via SPI, in discovery order. */
    private final List<RegisterCenter> registerCenterList = new ArrayList<>();

    /**
     * Loads all register centers first, then initializes them one by one.
     *
     * @param apiProperties settings providing the register address and environment
     */
    protected AbstractClientRegisterManager(ApiProperties apiProperties) {
        this.apiProperties = apiProperties;
        // Discover every implementation before initializing any of them.
        for (RegisterCenter discovered : ServiceLoader.load(RegisterCenter.class)) {
            registerCenterList.add(discovered);
        }
        for (RegisterCenter center : registerCenterList) {
            center.init(apiProperties.getRegisterAddress(), apiProperties.getEnv());
        }
    }

    /**
     * Registers the given service and instance with every discovered register center.
     *
     * @param serviceDefinition definition of the service to publish
     * @param serviceInstance   instance of the service to publish
     */
    protected void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
        for (RegisterCenter center : registerCenterList) {
            center.register(serviceDefinition, serviceInstance);
        }
    }
}
nacos 配置中心
引入了服务注册,那也需要再接入配置中心,刚好 nacos 既可以充当服务注册中心,也可以充当配置中心
从 nacos 获取配置跟从 nacos 获取服务信息,流程大同小异,都是通过 nacos 的 API 根据 group 和 namespace 获取对应的 service,通过自定义监听器模式,进行事件变更推送,修改本地缓存
package com.xiaohu.gateway.config.center.nacos;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.xiaohu.common.config.Rule;
import com.xiaohu.gateway.center.api.ConfigCenter;
import com.xiaohu.gateway.center.api.RulesChangeListener;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.Executor;
/**
* @author xiaohugg
* @version 1.0
* @date 2024/4/27 18:59:36
*/
@Slf4j
public class NacosConfigCenter implements ConfigCenter {
private static final String DATA_ID = "api-gateway";
private String env;
private ConfigService configService;
@Override
public void init(String serverAddr, String env) {
this.env = env;
try {
configService = NacosFactory.createConfigService(serverAddr);
} catch (NacosException e) {
throw new IllegalArgumentException("nacos config center init error", e);
}
}
@Override
public void subscribeRulesChange(RulesChangeListener listener) {
try {
//初始化通知
String config = configService.getConfig(DATA_ID, env, 5000);
if (config == null) {
return;
}
//{"rules":[{}, {}]}
log.info("config from nacos: {}", config);
List<Rule> rules = JSON.parseObject(config).getJSONArray("rules").toJavaList(Rule.class);
if (rules == null) {
return;
}
listener.onRulesChange(rules);
//监听变化
configService.addListener(DATA_ID, env, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
log.info("config from nacos: {}", configInfo);
List<Rule> rules = JSON.parseObject(configInfo).getJSONArray("rules").toJavaList(Rule.class);
listener.onRulesChange(rules);
}
});
} catch (NacosException e) {
log.error("nacos config center subscribe error", e);
throw new IllegalArgumentException(e);
}
}
}
bootStrap 加载 nacos 注册和配置
定义好了相关模块和功能,那肯定需要一个入口启动类,来定义好加载这些监听器回调函数,当网关程序一启动,就会自动注册和绑定配置,设置相应的监听器,关注各自的事件变更,推送更新本地缓存
final RegisterCenter registerCenter = registerAndSubscribe(config);
//服务优雅关机
//收到kill信号时调用
Runtime.getRuntime().addShutdownHook(new Thread(() -> registerCenter.deregister(buildGatewayServiceDefinition(config),
buildGatewayServiceInstance(config))));
}
/**
 * Locates the first {@link RegisterCenter} implementation via SPI, registers the gateway
 * itself and subscribes to all services so that instance changes are written into
 * {@link DynamicConfigManager}.
 *
 * @param config gateway configuration providing the registry address and environment
 * @return the initialized register center
 * @throws RuntimeException if no {@link RegisterCenter} implementation is found
 */
private static RegisterCenter registerAndSubscribe(Config config) {
    final RegisterCenter center = ServiceLoader.load(RegisterCenter.class)
            .findFirst()
            .orElseThrow(() -> {
                log.error("not found RegisterCenter impl");
                return new RuntimeException("not found RegisterCenter impl");
            });
    center.init(config.getRegistryAddress(), config.getEnv());

    // Describe the gateway itself as a service definition plus one instance.
    ServiceDefinition gatewayDefinition = buildGatewayServiceDefinition(config);
    ServiceInstance gatewayInstance = buildGatewayServiceInstance(config);

    // Register the gateway.
    center.register(gatewayDefinition, gatewayInstance);

    // Subscribe: every change refreshes the locally cached instances of that service.
    center.subscribeAllServices((changedDefinition, changedInstances) -> {
        log.info("refresh service and instance: {} {}", changedDefinition.getUniqueId(),
                JSON.toJSON(changedInstances));
        DynamicConfigManager.getInstance()
                .addServiceInstance(changedDefinition.getUniqueId(), changedInstances);
    });
    return center;
}
// Plugin initialization
// Initialize the config center: connect to it and listen for config additions,
// modifications and deletions.
// The first ConfigCenter implementation found through ServiceLoader (SPI) is used.
ServiceLoader<ConfigCenter> serviceLoader = ServiceLoader.load(ConfigCenter.class);
final ConfigCenter configCenter = serviceLoader.findFirst().orElseThrow(() -> {
log.error("not found ConfigCenter impl");
return new RuntimeException("not found ConfigCenter impl");
});
// The config center is initialized with the same address and env as the registry.
configCenter.init(config.getRegistryAddress(), config.getEnv());
// Every rule change is written into DynamicConfigManager.
configCenter.subscribeRulesChange(rule -> DynamicConfigManager.getInstance().putAllRule(rule));
验证效果
后台有个 HTTP 服务,定义了一个 controller 标记了 @ApiService 注解,通过 maven 引入 client 的 sdk,会自动装配,就会自动扫描绑定注册到 nacos 中
package com.xiaohu.controller;
import com.xiaohu.gateway.client.core.ApiInvoker;
import com.xiaohu.gateway.client.core.ApiProperties;
import com.xiaohu.gateway.client.core.ApiProtocol;
import com.xiaohu.gateway.client.core.ApiService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Sample backend HTTP controller exposed through the gateway client SDK.
 * The class is marked with {@link ApiService} and its single endpoint with
 * {@link ApiInvoker}.
 */
@Slf4j
@RestController
@ApiService(serviceId = "backend-http-server", protocol = ApiProtocol.HTTP, patternPath = "/http-server/**")
public class PingController {

    /** Path shared by the gateway invoker mapping and the Spring MVC mapping. */
    private static final String PING_PATH = "/http-server/ping";

    /** Fixed response body of the ping endpoint. */
    private static final String PONG = "pong";

    /** SDK settings, injected by Spring; logged on every ping. */
    @Autowired
    private ApiProperties apiProperties;

    /**
     * Health-style endpoint: logs the injected SDK settings and answers with a constant.
     *
     * @return the literal {@code "pong"}
     */
    @ApiInvoker(path = PING_PATH)
    @GetMapping(PING_PATH)
    public String ping() {
        log.info("{}", apiProperties);
        return PONG;
    }
}
当程序一启动,就会自动注册到 nacos 中
就会绑定自己的元数据信息到 nacos,网关那边就能自动更新到本地缓存中
可以看到网关已经可以正常感知到有新服务注册到 nacos 了,相应的监听器就会自动触发,更新本地缓存