Self-Built Gateway 05: Nacos Service Registration and Client Integration

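Nacos core features

Service registration and metadata management

Nacos lets you manage every service in a data center, together with its metadata, from the perspective of building a microservice platform.

Service discovery and health checking

  • Nacos supports both DNS-based and RPC-based service discovery.
  • Nacos runs real-time health checks on services and stops requests from being sent to unhealthy hosts or service instances.

Dynamic configuration service

The dynamic configuration service lets you manage application and service configuration for all environments in a centralized, externalized, and dynamic way.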

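Integrating the registry into bootstrap

When the gateway's bootstrap starts, it needs to collect the client metadata held in the Nacos service registry and keep it in a local cache map for later steps such as routing. That means it has to pull from the registry, and it also has to register itself with the registry.

Implementing the Nacos service registry integration

Interface class diagram

This interface controls how a client registers with Nacos, along with the related lifecycle steps such as initialization and deregistration. Its subscribeAllServices method pulls the client metadata for every group and service name from the Nacos server and stores it in a serviceInstanceMap on the gateway. Later operations such as routing and forwarding read the metadata from serviceInstanceMap, which is what makes routing dynamic.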
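The contract described above can be sketched from the methods that NacosRegisterCenter overrides in the listing that follows. This is a simplified assumption rather than the project's actual interface: plain strings stand in for ServiceDefinition and ServiceInstance, and InMemoryRegisterCenter is a hypothetical stand-in for Nacos so the lifecycle can be followed without a server.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Callback invoked whenever the instance set of a service changes. */
interface RegisterCenterListener {
    void onChange(String serviceId, Set<String> instanceAddresses);
}

/** Lifecycle contract (simplified: strings replace ServiceDefinition/ServiceInstance). */
interface RegisterCenter {
    void init(String registerAddress, String env);
    void register(String serviceId, String instanceAddress);
    void deregister(String serviceId, String instanceAddress);
    void subscribeAllServices(RegisterCenterListener listener);
}

/** Hypothetical in-memory registry used only for illustration; no network involved. */
class InMemoryRegisterCenter implements RegisterCenter {
    private final Map<String, Set<String>> instances = new ConcurrentHashMap<>();
    private final List<RegisterCenterListener> listeners = new CopyOnWriteArrayList<>();

    @Override
    public void init(String registerAddress, String env) {
        // A real implementation would create its registry clients here.
    }

    @Override
    public void register(String serviceId, String instanceAddress) {
        instances.computeIfAbsent(serviceId, k -> ConcurrentHashMap.newKeySet()).add(instanceAddress);
        publish(serviceId);
    }

    @Override
    public void deregister(String serviceId, String instanceAddress) {
        Set<String> set = instances.get(serviceId);
        if (set != null && set.remove(instanceAddress)) {
            publish(serviceId);
        }
    }

    @Override
    public void subscribeAllServices(RegisterCenterListener listener) {
        listeners.add(listener);
        // Replay the current state so a late subscriber starts with a full view.
        instances.forEach((id, set) -> listener.onChange(id, Set.copyOf(set)));
    }

    /** Sends an immutable snapshot of one service's instances to every listener. */
    private void publish(String serviceId) {
        Set<String> snapshot = Set.copyOf(instances.getOrDefault(serviceId, Set.of()));
        listeners.forEach(l -> l.onChange(serviceId, snapshot));
    }
}
```

Passing a map's put method as the listener, for example `center.subscribeAllServices(serviceInstanceMap::put)`, yields the kind of gateway-side cache that the text calls serviceInstanceMap.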

NacosRegisterCenter

package com.xiaohu.register.nacos;

import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingMaintainFactory;
import com.alibaba.nacos.api.naming.NamingMaintainService;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.xiaohu.common.config.ServiceDefinition;
import com.xiaohu.common.config.ServiceInstance;
import com.xiaohu.common.constants.GatewayConst;
import com.xiaohu.gateway.register.api.RegisterCenter;
import com.xiaohu.gateway.register.api.RegisterCenterListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/27 15:26:45
 */
@Slf4j
public class NacosRegisterCenter implements RegisterCenter {
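    /**
     * Registry listeners
     */
    private final List<RegisterCenterListener> registerCenterListenerList = new ArrayList<>();
    /**
     * Environment (passed to Nacos as the group name)
     */
    private String env;
    /**
     * Naming service: registers, deregisters, and subscribes to instances
     */
    private NamingService namingService;
    /**
     * Naming maintain service: queries and updates service-level metadata
     */
    private NamingMaintainService namingMaintainService;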

    @Override
    public void init(String registerAddress, String env) {
        this.env = env;
        try {
            this.namingMaintainService = NamingMaintainFactory.createMaintainService(registerAddress);
            this.namingService = NamingFactory.createNamingService(registerAddress);
        } catch (NacosException e) {
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }

    @Override
    public void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {

        try {
            // Build the Nacos instance
            Instance nacosInstance = new Instance();
            nacosInstance.setInstanceId(serviceInstance.getServiceInstanceId());
            nacosInstance.setPort(serviceInstance.getPort());
            nacosInstance.setIp(serviceInstance.getIp());
            nacosInstance.setMetadata(Map.of(GatewayConst.META_DATA_KEY,
                    JSON.toJSONString(serviceInstance)));

            // Register the instance
            namingService.registerInstance(serviceDefinition.getServiceId(), env, nacosInstance);
            // Store the service definition in the service-level metadata
            namingMaintainService.updateService(serviceDefinition.getServiceId(), env, 0,
                    Map.of(GatewayConst.META_DATA_KEY, JSON.toJSONString(serviceDefinition)));

            log.info("register {} {}", serviceDefinition, serviceInstance);
        } catch (NacosException e) {
            log.error("register {} {} error", serviceDefinition, serviceInstance, e);
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }

    @Override
    public void deregister(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {

        try {
            namingService.deregisterInstance(
                    serviceDefinition.getServiceId(),
                    env, serviceInstance.getIp(), serviceInstance.getPort()
            );
        } catch (NacosException e) {
            log.error("deregisterInstance {} {} error", serviceDefinition, serviceInstance, e);
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }

    @Override
    public void subscribeAllServices(RegisterCenterListener registerCenterListener) {
        this.registerCenterListenerList.add(registerCenterListener);
        doSubscribeAllServices();

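        // New services may appear later, so re-scan the registry periodically.
        ScheduledExecutorService scheduledThreadPool = Executors
                .newScheduledThreadPool(1, new NameThreadFactory("doSubscribeAllServices"));
        // An uncaught exception would suppress all further runs of a fixed-delay task,
        // so failures are logged and the next run is still attempted.
        scheduledThreadPool.scheduleWithFixedDelay(() -> {
            try {
                doSubscribeAllServices();
            } catch (Exception e) {
                log.error("scheduled doSubscribeAllServices failed", e);
            }
        }, 10, 10, TimeUnit.SECONDS);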
    }


    private void doSubscribeAllServices() {
        try {
            // Services that are already subscribed
            Set<String> subscribeService = namingService.getSubscribeServices().stream()
                    .map(ServiceInfo::getName).collect(Collectors.toSet());

            int pageNo = 1;
            int pageSize = 100;

            // Fetch the service list from Nacos page by page
            List<String> serviceList = namingService
                    .getServicesOfServer(pageNo, pageSize, env).getData();

            while (CollectionUtils.isNotEmpty(serviceList)) {
                log.info("service list size {}", serviceList.size());

                for (String service : serviceList) {
                    if (subscribeService.contains(service)) {
                        continue;
                    }
                    // Nacos event listener: trigger it once for the initial load, then subscribe
                    EventListener eventListener = new NacosRegisterListener();
                    eventListener.onEvent(new NamingEvent(service, null));
                    namingService.subscribe(service, env, eventListener);
                    log.info("subscribe {} {}", service, env);
                }

                serviceList = namingService.getServicesOfServer(++pageNo, pageSize, env).getData();
            }

        } catch (NacosException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public class NacosRegisterListener implements EventListener {
        @Override
        public void onEvent(Event event) {
            if (event instanceof NamingEvent namingEvent) {
                String serviceName = namingEvent.getServiceName();
                try {
                    Service service = namingMaintainService.queryService(serviceName, env);
                    ServiceDefinition serviceDefinition = JSON.parseObject(service.getMetadata().get(GatewayConst.META_DATA_KEY), ServiceDefinition.class);
                    Set<ServiceInstance> instanceSet = namingService.getAllInstances(serviceName, env)
                            .stream().map(instance -> JSON.parseObject(instance.getMetadata().get(GatewayConst.META_DATA_KEY), ServiceInstance.class))
                            .collect(Collectors.toSet());
                    registerCenterListenerList.forEach(listener -> listener.onChange(serviceDefinition, instanceSet));
                } catch (NacosException e) {
                    throw new IllegalArgumentException(e.getMessage(), e);
                }
            }
        }
    }
}

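doSubscribeAllServices is the core method. It relies on the listener pattern: whenever a service in Nacos changes, a NamingEvent is fired. Handling that event yields the metadata of the service that changed, and the gateway then updates its local cache.

registerCenterListenerList.forEach(listener -> listener.onChange(serviceDefinition, instanceSet)); pushes the change to every registered listener.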
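The scan-and-subscribe loop in doSubscribeAllServices can be separated from Nacos to make its logic easier to follow. The following is a minimal sketch under assumptions: SubscriptionSync, syncOnce and guarded are hypothetical names, the page fetcher stands in for namingService.getServicesOfServer, and the subscribe callback stands in for namingService.subscribe.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Hypothetical helper, not part of the article's code base. */
class SubscriptionSync {

    /**
     * Walks a paged service listing and subscribes to every name not seen before.
     *
     * @param pageFetcher       (pageNo, pageSize) -> one page of names, empty when exhausted
     * @param alreadySubscribed names subscribed so far; newly subscribed names are added to it
     * @param subscribe         action that performs the actual subscription
     * @return the names subscribed during this pass, in discovery order
     */
    static Set<String> syncOnce(BiFunction<Integer, Integer, List<String>> pageFetcher,
                                Set<String> alreadySubscribed,
                                Consumer<String> subscribe,
                                int pageSize) {
        Set<String> added = new LinkedHashSet<>();
        int pageNo = 1;
        List<String> page = pageFetcher.apply(pageNo, pageSize);
        while (page != null && !page.isEmpty()) {
            for (String service : page) {
                // Set.add returns false for names that were already subscribed.
                if (alreadySubscribed.add(service)) {
                    subscribe.accept(service);
                    added.add(service);
                }
            }
            page = pageFetcher.apply(++pageNo, pageSize);
        }
        return added;
    }

    /**
     * Wraps a periodic task so that a runtime failure is reported instead of thrown.
     * A task scheduled with scheduleWithFixedDelay stops running after its first
     * uncaught exception, so a wrapper like this keeps the periodic scan alive.
     */
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                System.err.println("sync failed, will retry on the next run: " + e);
            }
        };
    }
}
```

Running syncOnce a second time against the same listing subscribes to nothing, which is the property that makes it safe to repeat the scan every few seconds.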

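Client implementation

The client acts as an SDK (a second-party package). Annotations mark what should be registered; the SDK reads them via reflection and registers the service with Nacos, so the gateway can see the downstream service's metadata. Through Spring Boot auto-configuration, a downstream service gets the SDK's beans wired in automatically.

Spring Boot auto-configuration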

package com.xiaohu.gateway.client.core.autoconfigure;

import com.xiaohu.gateway.client.core.ApiProperties;
import com.xiaohu.gateway.client.support.dubbo.Dubbo27ClientRegisterManager;
import com.xiaohu.gateway.client.support.springmvc.SpringMVCClientRegisterManager;
import org.apache.dubbo.config.spring.ServiceBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.DispatcherServlet;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import javax.servlet.Servlet;

/**
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/27 18:37:50
 */
@Configuration
@ConditionalOnProperty(prefix = "api", name = {"registerAddress"})
@EnableConfigurationProperties(ApiProperties.class)
public class ApiClientAutoConfiguration {

    @Autowired
    private ApiProperties apiProperties;

    @Bean
    @ConditionalOnClass({Servlet.class, DispatcherServlet.class, WebMvcConfigurer.class})
    @ConditionalOnMissingBean(SpringMVCClientRegisterManager.class)
    public SpringMVCClientRegisterManager springMVCClientRegisterManager() {
        return new SpringMVCClientRegisterManager(apiProperties);
    }

    @Bean
    @ConditionalOnClass({ServiceBean.class})
    @ConditionalOnMissingBean(Dubbo27ClientRegisterManager.class)
    public Dubbo27ClientRegisterManager dubbo27ClientRegisterManager() {
        return new Dubbo27ClientRegisterManager(apiProperties);
    }
}

ApiClientAutoConfiguration auto-configures the beans that the SDK needs.

AbstractClientRegisterManager

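This abstract class manages registration: it registers the classes marked with the annotation to the corresponding registry, and it obtains the registry implementations through the SPI mechanism.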

package com.xiaohu.gateway.client.support;

import com.xiaohu.common.config.ServiceDefinition;
import com.xiaohu.common.config.ServiceInstance;
import com.xiaohu.gateway.client.core.ApiProperties;
import com.xiaohu.gateway.register.api.RegisterCenter;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/**
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/27 17:40:06
 */
@Slf4j
public abstract class AbstractClientRegisterManager {

    @Getter
    private final ApiProperties apiProperties;

    private final List<RegisterCenter> registerCenterList = new ArrayList<>();

    protected AbstractClientRegisterManager(ApiProperties apiProperties) {
        this.apiProperties = apiProperties;

        // Load every RegisterCenter implementation via SPI and initialize it
        ServiceLoader<RegisterCenter> serviceLoader = ServiceLoader.load(RegisterCenter.class);
        serviceLoader.iterator().forEachRemaining(registerCenterList::add);
        registerCenterList.forEach(registerCenter -> registerCenter.init(apiProperties.getRegisterAddress(), apiProperties.getEnv()));
    }

    protected void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
        registerCenterList.forEach(registerCenter -> registerCenter.register(serviceDefinition, serviceInstance));
    }
}
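ServiceLoader only finds implementations that are declared in a provider-configuration file, which is not shown here. Going by the package names in the imports above, that would presumably be a file named META-INF/services/com.xiaohu.gateway.register.api.RegisterCenter containing the single line com.xiaohu.register.nacos.NacosRegisterCenter. The lookup itself can be sketched as below; RegistryProvider and SpiLookup are hypothetical names used only for illustration.

```java
import java.util.ServiceLoader;

/** Hypothetical SPI contract; it plays the role that RegisterCenter plays in the article. */
interface RegistryProvider {
    String name();
}

/** Hypothetical helper that resolves a single SPI implementation or fails fast. */
class SpiLookup {

    /**
     * Returns the first implementation of the given SPI found on the classpath.
     * Throws IllegalStateException with the SPI name when none is declared,
     * which usually means the META-INF/services file is missing or misnamed.
     */
    static <T> T loadFirst(Class<T> spi) {
        return ServiceLoader.load(spi)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "no implementation found for " + spi.getName()));
    }
}
```

Failing fast with the interface name in the message makes a missing provider file easy to diagnose, compared with silently iterating over an empty list of registries.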

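Nacos configuration center

With service registration in place, the gateway also needs a configuration center, and Nacos can serve as both.

Fetching configuration from Nacos works much like fetching service information from it. Both go through the Nacos API (here the configuration is looked up by data ID and group), and both use a custom listener to push change events and update the local cache.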

package com.xiaohu.gateway.config.center.nacos;

import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.xiaohu.common.config.Rule;
import com.xiaohu.gateway.center.api.ConfigCenter;
import com.xiaohu.gateway.center.api.RulesChangeListener;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.Executor;

/**
 * @author xiaohugg
 * @version 1.0
 * @date 2024/4/27 18:59:36
 */
@Slf4j
public class NacosConfigCenter implements ConfigCenter {

    private static final String DATA_ID = "api-gateway";

    private String env;

    private ConfigService configService;

    @Override
    public void init(String serverAddr, String env) {
        this.env = env;
        try {
            configService = NacosFactory.createConfigService(serverAddr);
        } catch (NacosException e) {
            throw new IllegalArgumentException("nacos config center init error", e);
        }
    }

    @Override
    public void subscribeRulesChange(RulesChangeListener listener) {
        try {
            // Initial notification: fetch the current config once (5000 ms timeout)
            String config = configService.getConfig(DATA_ID, env, 5000);
            // Expected format: {"rules":[{}, {}]}
            log.info("config from nacos: {}", config);
            if (config != null) {
                List<Rule> rules = JSON.parseObject(config).getJSONArray("rules").toJavaList(Rule.class);
                if (rules != null) {
                    listener.onRulesChange(rules);
                }
            }

            // Watch for changes. The listener is registered even when no config exists yet,
            // so a config that is published later is still picked up.
            configService.addListener(DATA_ID, env, new Listener() {
                @Override
                public Executor getExecutor() {
                    // null: run the callback on the Nacos client's own notification thread
                    return null;
                }

                @Override
                public void receiveConfigInfo(String configInfo) {
                    log.info("config from nacos: {}", configInfo);
                    List<Rule> rules = JSON.parseObject(configInfo).getJSONArray("rules").toJavaList(Rule.class);
                    listener.onRulesChange(rules);
                }
            });

        } catch (NacosException e) {
            log.error("nacos config center subscribe error", e);
            throw new IllegalArgumentException(e);
        }
    }
}
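The "load once, then listen" flow can be tried out without a Nacos server. The sketch below is an illustration under assumptions: FakeConfigStore is a hypothetical in-memory stand-in for the config service, keyed by data ID and group, and RulesSubscriber is a hypothetical helper. It delivers the current value when one exists and registers the listener either way, so a configuration published later is not missed.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory config server keyed by group and data ID. */
class FakeConfigStore {
    private final Map<String, String> values = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    /** Returns the stored content, or null when nothing has been published yet. */
    String get(String dataId, String group) {
        return values.get(key(dataId, group));
    }

    void addListener(String dataId, String group, Consumer<String> listener) {
        listeners.computeIfAbsent(key(dataId, group), k -> new CopyOnWriteArrayList<>())
                .add(listener);
    }

    /** Stores new content and notifies the listeners of exactly that data ID and group. */
    void publish(String dataId, String group, String content) {
        values.put(key(dataId, group), content);
        listeners.getOrDefault(key(dataId, group), List.of()).forEach(l -> l.accept(content));
    }

    private static String key(String dataId, String group) {
        return group + "/" + dataId;
    }
}

/** Hypothetical subscriber showing the order of the two steps. */
class RulesSubscriber {
    static void subscribe(FakeConfigStore store, String dataId, String group,
                          Consumer<String> onRules) {
        // Step 1: initial notification, only if a value already exists.
        String current = store.get(dataId, group);
        if (current != null) {
            onRules.accept(current);
        }
        // Step 2: always listen for later changes.
        store.addListener(dataId, group, onRules);
    }
}
```

In the gateway, the callback would parse the JSON into rules and hand them to the local cache instead of receiving the raw string.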

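Loading Nacos registration and configuration in bootstrap

With the modules and their functions defined, an entry-point class is needed to wire up the listener callbacks. As soon as the gateway starts, it registers itself, binds the configuration, and installs the listeners. Each listener watches its own change events and pushes updates into the local cache.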

   final RegisterCenter registerCenter = registerAndSubscribe(config);

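        // Graceful shutdown: deregister the gateway from the registry
        // Runs when the JVM exits or receives a termination signal such as SIGTERM (not kill -9)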
        Runtime.getRuntime().addShutdownHook(new Thread(() -> registerCenter.deregister(buildGatewayServiceDefinition(config),
                buildGatewayServiceInstance(config))));
    }

    private static RegisterCenter registerAndSubscribe(Config config) {
        ServiceLoader<RegisterCenter> serviceLoader = ServiceLoader.load(RegisterCenter.class);
        final RegisterCenter registerCenter = serviceLoader.findFirst().orElseThrow(() -> {
            log.error("not found RegisterCenter impl");
            return new RuntimeException("not found RegisterCenter impl");
        });
        registerCenter.init(config.getRegistryAddress(), config.getEnv());

        // Build the gateway's own service definition and service instance
        ServiceDefinition serviceDefinition = buildGatewayServiceDefinition(config);
        ServiceInstance serviceInstance = buildGatewayServiceInstance(config);

        // Register
        registerCenter.register(serviceDefinition, serviceInstance);

        // Subscribe
        registerCenter.subscribeAllServices((serviceDefinition1, serviceInstanceSet) -> {
            log.info("refresh service and instance: {} {}", serviceDefinition1.getUniqueId(),
                    JSON.toJSON(serviceInstanceSet));
            DynamicConfigManager manager = DynamicConfigManager.getInstance();
            manager.addServiceInstance(serviceDefinition1.getUniqueId(), serviceInstanceSet);
        });
        return registerCenter;
    }

        // Plugin initialization
        // Initialize the config center: connect to it and listen for added, modified, and deleted configuration
        ServiceLoader<ConfigCenter> serviceLoader = ServiceLoader.load(ConfigCenter.class);
        final ConfigCenter configCenter = serviceLoader.findFirst().orElseThrow(() -> {
            log.error("not found ConfigCenter impl");
            return new RuntimeException("not found ConfigCenter impl");
        });
        configCenter.init(config.getRegistryAddress(), config.getEnv());

        configCenter.subscribeRulesChange(rule -> DynamicConfigManager.getInstance().putAllRule(rule));
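Both callbacks above end up in DynamicConfigManager, whose source is not shown here. A local cache of that kind might look roughly like the sketch below. Everything in it is an assumption: LocalGatewayCache is a hypothetical stand-in, strings replace ServiceInstance and Rule, and only the method names addServiceInstance and putAllRule are borrowed from the calls above.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for DynamicConfigManager; not the article's implementation. */
final class LocalGatewayCache {
    private static final LocalGatewayCache INSTANCE = new LocalGatewayCache();

    /** Service unique ID -> current instances of that service. */
    private final Map<String, Set<String>> serviceInstanceMap = new ConcurrentHashMap<>();

    /** Rule ID -> rule; replaced as a whole whenever the config changes. */
    private volatile Map<String, String> ruleMap = Map.of();

    private LocalGatewayCache() {
    }

    static LocalGatewayCache getInstance() {
        return INSTANCE;
    }

    /** Replaces the instance set of one service with an immutable snapshot. */
    void addServiceInstance(String uniqueId, Set<String> instances) {
        serviceInstanceMap.put(uniqueId, Set.copyOf(instances));
    }

    /** Returns the cached instances, or an empty set for an unknown service. */
    Set<String> getServiceInstances(String uniqueId) {
        return serviceInstanceMap.getOrDefault(uniqueId, Set.of());
    }

    /** Swaps in a new rule table in one step so readers never see a half-updated map. */
    void putAllRule(Map<String, String> rulesById) {
        ruleMap = Map.copyOf(rulesById);
    }

    /** Returns the rule with the given ID, or null when there is none. */
    String getRule(String ruleId) {
        return ruleMap.get(ruleId);
    }
}
```

Storing immutable snapshots keeps the routing threads, which only read, from ever observing a collection while a listener thread is modifying it.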

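Verifying the result

A backend HTTP service defines a controller marked with the @ApiService annotation. Once the client SDK is pulled in as a Maven dependency, auto-configuration takes over: the controller is scanned and registered with Nacos automatically.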

package com.xiaohu.controller;

import com.xiaohu.gateway.client.core.ApiInvoker;
import com.xiaohu.gateway.client.core.ApiProperties;
import com.xiaohu.gateway.client.core.ApiProtocol;
import com.xiaohu.gateway.client.core.ApiService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@ApiService(serviceId = "backend-http-server", protocol = ApiProtocol.HTTP, patternPath = "/http-server/**")
public class PingController {

    @Autowired
    private ApiProperties apiProperties;

    @ApiInvoker(path = "/http-server/ping")
    @GetMapping("/http-server/ping")
    public String ping() {
        log.info("{}", apiProperties);
        return "pong";
    }
}
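How the SDK turns these annotations into registry metadata is not shown here. The reflection step could look roughly like the following sketch, which is an assumption and not the SDK's code: DemoApiService and DemoApiInvoker are hypothetical counterparts of @ApiService and @ApiInvoker, and AnnotationScanner is a hypothetical helper.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical class-level marker, modeled on the article's @ApiService. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoApiService {
    String serviceId();
    String patternPath();
}

/** Hypothetical method-level marker, modeled on the article's @ApiInvoker. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoApiInvoker {
    String path();
}

@DemoApiService(serviceId = "backend-http-server", patternPath = "/http-server/**")
class DemoPingController {
    @DemoApiInvoker(path = "/http-server/ping")
    public String ping() {
        return "pong";
    }

    // No annotation here, so the scanner skips this method.
    public String notExposed() {
        return "hidden";
    }
}

/** Hypothetical scanner that collects the exposed paths of one class. */
class AnnotationScanner {

    /**
     * Returns path -> method name for every method carrying DemoApiInvoker.
     * Classes without DemoApiService yield an empty map.
     */
    static Map<String, String> scanInvokers(Class<?> type) {
        Map<String, String> invokers = new TreeMap<>();
        if (!type.isAnnotationPresent(DemoApiService.class)) {
            return invokers;
        }
        for (Method method : type.getDeclaredMethods()) {
            DemoApiInvoker invoker = method.getAnnotation(DemoApiInvoker.class);
            if (invoker != null) {
                invokers.put(invoker.path(), method.getName());
            }
        }
        return invokers;
    }
}
```

Both annotations need RUNTIME retention; otherwise they are not visible to reflection when the application starts.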

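As soon as the application starts, it registers itself with Nacos.

Its metadata is bound to the Nacos entry, and the gateway picks it up and refreshes its local cache automatically.

The gateway correctly detects that a new service has been registered with Nacos; the corresponding listener fires and updates the local cache.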


Title: Self-Built Gateway 05: Nacos Service Registration and Client Integration
Author: xiaohugg
URL: https://xiaohugg.top/articles/2024/04/27/1714233490099.html
