文章 62
浏览 15135
spring的STOMP落地实践

spring的STOMP落地实践

背景

公司有个主系统 A,这个系统将来用途作为 SaaS 服务系统,设计尽可能保持纯粹,不要引入过多的中间件技术

其下有 B,C 甚至未来还有多个子系统,现在的需求是需要当子系统 B,C 开启了某个配置,需要对接 A 系统,A 系统能够提供 B,C 系统的能力赋能,当 BC 断开配置,B,C 又是个纯粹可以单独运行的系统,某个系统不共用同一套数据,拥有自己的环境和数据,当子系统开启了配置,相关接口功能需要限制,由主系统提供能力,断开配置,A.B,C 三个系统能够独立运行

功能拆分

主系统:

对主系统相关的增量和全量接口数据进行同步传输:这里有两个关键点

  • 有许多增量接口需要进行同步传输&&传输数据的逻辑不能影响原先的业务逻辑(通过 AOP 注解)
  • 需要找到一个类型消息队列 mq 那种形势进行数据同步,A 系统数据发生改动子系统能够感知

但是这个系统不能引入过多的中间件,最终经过调研,选择使用了 Spring 的 STOMP 和 Spring event 事件实现一种伪消息队列

子系统:

  • 子系统相关接口限制:通过 AOP 接口限制
  • 子系统需要接收主系统的全量和增量数据 (STOMP+AOP+SDK)

最终将功能细节拆分后,对于这个需求设计选型最终采用:

Spring STOMP+ Spring EVENT+AOP+SDK+ 线程池进行业务线程隔离

时序图

  • 全量数据

  • 增量数据同步

设计伪消息队列

客户端

TenantClient

用来与 A 系统进行对接的客户端

send 方法

通过这方法客户端也就是目前的 B,C 系统可以主动向 A 系统发送消息

  • path: 发送的接口路径
  • payload: 发送的消息体
  • stompSession:Spring stomp 的与服务端的连接对象
public void send(String path, Object payload) throws DisconnectException {
        if (stompSession == null) {
            throw new DisconnectException();
        } else {
            stompSession.send(APPLICATION_DESTINATION_PREFIX + path, payload);
        }
    }

subscribe 方法

进行客户端订阅事件,与服务端进行绑定

 /**
     * 订阅指定路径的事件,可以多次订阅
     *
     * @param path     订阅的路径
     * @param observer 消费者
     * @param <T>      消息类型
     */
    @SuppressWarnings("unchecked")
    public <T> void subscribe(String path, Observer<T> observer) {
        subscribe(path, new MessageHandler() {
            @Override
            public Type getType() {
                return observer.getType();
            }

            @Override
            public void handle(Object payload) {
                observer.handle((T) payload);
            }
        });
    }
    /**
     * 订阅指定路径的事件,可以多次订阅
     *
     * @param path    订阅的路径
     * @param handler 消费者
     */
    public void subscribe(String path, MessageHandler handler) {
        subscriptions.compute(path, (key, oldValue) -> {
            List<TenantSubscription> value = oldValue == null ? new CopyOnWriteArrayList<>() : oldValue;
            value.add(new TenantSubscription(handler));
            return value;
        });
    }
  private final ConcurrentMap<String, List<TenantSubscription>> subscriptions = new ConcurrentHashMap<>();

通过一个 ConcurrentMap 容器存储 key:客户端接口路径 value:消费者列表 支持一个路径对应多个客户端,防止读写并发 使用 CopyOnWriteArrayList

start 方法:

容器启动使用使用 Spring 事件 调用 start 方法

 @EventListener
    public void onAppStarted(ApplicationStartedEvent ignored) {
        tenantClient().start();
    }

destory 方法:

容器销毁执行

 @PreDestroy
    public void destroy() {
        tenantClient().stop();
    }
public class TenantClient {
    private static final Logger logger = LoggerFactory.getLogger(TenantClient.class);
    public static final String APPLICATION_DESTINATION_PREFIX = "/app";
    public static final String TOPIC_PREFIX = "/topic";
    public static final String DATE_TIME_FORMATTER_PATTERN = "yyyy-MM-dd HH🇲🇲ss";

    /** 配置信息 */
    private final TenantConfig config;

    /** STOMP客户端 */
    private final StompClient client;

    /** 订阅映射: 订阅路径 -> 消费者列表 */
    private final ConcurrentMap<String, List<TenantSubscription>> subscriptions = new ConcurrentHashMap<>();

    private Runnable onConnectedListener;

    /** 是否已开启 */
    private final AtomicBoolean started = new AtomicBoolean();

    /** 当前与服务端的连接 */
    private StompSession stompSession;

    /** LDAP数据源 */
    private BaseLdapPathContextSource contextSource;

    private ObjectMapper objectMapper = createObjectMapper();

    public TenantClient(TenantConfig config) {
        this.config = Objects.requireNonNull(config);
        String url = config.getBaseUrl() + config.getPath();
        this.client = new StompClient(url, new TenantStompSessionHandler(), config)
                .setReconnectInterval(config.getReconnectInterval());
        setObjectMapper(objectMapper);
    }

    /**
     * 配置用于传输数据的JSON序列化对象
     *
     * @param objectMapper objectMapper
     */
    public void setObjectMapper(ObjectMapper objectMapper) {
        this.objectMapper = Objects.requireNonNull(objectMapper);
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(objectMapper);
        this.client.setMessageConverter(converter);
    }

    /**
     * 配置LDAP数据源
     *
     * @param contextSource LDAP数据源
     */
    public void setContextSource(BaseLdapPathContextSource contextSource) {
        this.contextSource = contextSource;
    }

    /**
     * 配置任务调度器,用于心跳检测
     *
     * @param taskScheduler 任务调度器
     */
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.client.setTaskScheduler(taskScheduler);
    }

    /**
     * 开始连接
     *
     * @return 连接结果Future
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public ListenableFuture<Object> start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("不能重复启动");
        }
        return (ListenableFuture) client.connect();
    }

    /**
     * 关闭客户端,断开连接
     */
    public void stop() {
        if (started.compareAndSet(true, false)) {
            if (stompSession != null) {
                stompSession.disconnect();
            }
            stompSession = null;
            client.stop();
        }
    }

    /**
     * 订阅指定路径的事件,可以多次订阅
     *
     * @param path     订阅的路径
     * @param observer 消费者
     * @param <T>      消息类型
     */
    @SuppressWarnings("unchecked")
    public <T> void subscribe(String path, Observer<T> observer) {
        subscribe(path, new MessageHandler() {
            @Override
            public Type getType() {
                return observer.getType();
            }

            @Override
            public void handle(Object payload) {
                observer.handle((T) payload);
            }
        });
    }

    /**
     * 订阅指定路径的事件,可以多次订阅
     *
     * @param path    订阅的路径
     * @param handler 消费者
     */
    public void subscribe(String path, MessageHandler handler) {
        subscriptions.compute(path, (key, oldValue) -> {
            List<TenantSubscription> value = oldValue == null ? new CopyOnWriteArrayList<>() : oldValue;
            value.add(new TenantSubscription(handler));
            return value;
        });
    }

    /**
     * 取消指定路径的订阅
     *
     * @param path 取消订阅的路径
     * @return 是否有消费者被取消
     */
    public boolean unsubscribe(String path) {
        List<TenantSubscription> list = subscriptions.remove(path);
        if (list == null) return false;

        for (TenantSubscription it : list) {
            if (it.getSubscription() != null) {
                it.getSubscription().unsubscribe();
            }
        }
        return true;
    }

    /**
     * 发送消息到租户融合中心
     *
     * @param path    消息路径
     * @param payload 消息内容
     * @throws DisconnectException 如果未连接到客户端
     */
    public void send(String path, Object payload) throws DisconnectException {
        if (stompSession == null) {
            throw new DisconnectException();
        } else {
            stompSession.send(APPLICATION_DESTINATION_PREFIX + path, payload);
        }
    }

    /**
     * 用户身份认证
     *
     * @param username 用户名
     * @param password 密码
     * @return true为认证通过,false为用户名或密码不正确
     */
    public boolean auth(String username, String password) {
        try {
            Name dn = contextSource.getBaseLdapName()
                    .add(config.getUserBase())
                    .add(MessageFormat.format(config.getUserRdn(), username));
            contextSource.getContext(dn.toString(), password);
            return true;
        } catch (NamingException ex) {
            if ((ex instanceof org.springframework.ldap.AuthenticationException)
                    || (ex instanceof org.springframework.ldap.OperationNotSupportedException)) {
                return false;
            } else {
                throw ex;
            }
        } catch (InvalidNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public void onConnected(Runnable onConnectedListener) {
        this.onConnectedListener = onConnectedListener;
    }

    public boolean isConnected() {
        return stompSession != null;
    }

    private static ObjectMapper createObjectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        mapper.disable(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS);

        // 设置默认日期格式
        mapper.setDateFormat(new SimpleDateFormat(DATE_TIME_FORMATTER_PATTERN));
        mapper.registerModule(new JavaTimeModule());
        // 修改LocalDateTime的默认格式
        mapper.registerModule(createTimeModule());
        return mapper;
    }

    private static SimpleModule createTimeModule() {
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(DATE_TIME_FORMATTER_PATTERN);
        SimpleModule timeModule = new SimpleModule();
        timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(dateTimeFormatter));
        timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(dateTimeFormatter));
        return timeModule;
    }

    private class TenantStompSessionHandler extends StompSessionHandlerAdapter {
        @Override
        public void afterConnected(@NonNull StompSession session, @NonNull StompHeaders connectedHeaders) {
            logger.info("与租户融合中心建立连接: {}", config.getBaseUrl());
            stompSession = session;
            subscriptions.forEach((path, list) -> {
                for (TenantSubscription subscription : list) {
                    registerSubscription(session, path, subscription);
                }
            });

            if (onConnectedListener != null) {
                try {
                    onConnectedListener.run();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        private void registerSubscription(StompSession session, String path, TenantSubscription subscription) {
            StompSession.Subscription sub = session.subscribe(TOPIC_PREFIX + path, new StompFrameHandler() {
                @Override
                public @NonNull Type getPayloadType(@NonNull StompHeaders headers) {
                    Type type = subscription.getHandler().getType();
                    return type instanceof ParameterizedType ? byte[].class : type;
                }

                @Override
                public void handleFrame(@NonNull StompHeaders headers, Object payload) {
                    String path = getPath(headers);
                    String errorMessage = null;
                    try {
                        Type type = subscription.getHandler().getType();
                        if (type instanceof ParameterizedType) {
                            // 处理泛型参数
                            payload = objectMapper.readValue((
                                    byte[]) payload, TypeFactory.defaultInstance().constructType(type));
                        }

                        subscription.getHandler().handle(payload);
                    } catch (Exception e) {
                        logger.error("处理消息[{}]出错. payload: {}, error: {}",
                                path, payload, e.getMessage(), e);
                        errorMessage = e.getMessage();
                    } finally {
                        sendClientLog(path, payload, errorMessage);
                    }
                }
            });
            subscription.setSubscription(sub);
        }

        @Override
        public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
            logger.error("handleException: {}", exception.getMessage(), exception);
        }

        @Override
        public void handleTransportError(StompSession session, Throwable exception) {
            if (exception instanceof ConnectionLostException) {
                stompSession = null;
            }
            logger.error("handleTransportError: {}", exception.getMessage(), exception);
        }

        private String getPath(StompHeaders headers) {
            String destination = Objects.requireNonNull(headers.getDestination());
            return destination.startsWith(TOPIC_PREFIX) ? destination.substring(TOPIC_PREFIX.length()) : destination;
        }

        private void sendClientLog(String path, Object payload, String errorMessage) {
            try {
                LogEvent logEvent = new LogEvent();
                if (errorMessage == null) {
                    logEvent.setContent(String.format("处理消息[%s]成功", path));
                } else {
                    logEvent.setContent(String.format("处理消息[%s]失败: %s. payload: %s", path, errorMessage, payload));
                }
                logEvent.setSuccess(errorMessage == null);
                logEvent.setCreateTime(LocalDateTime.now());
                send(TenantPaths.CLIENT_LOG, logEvent);
            } catch (Exception e) {
                logger.error("发送调用日志出错: {}", e.getMessage(), e);
            }
        }
    }

    @RequiredArgsConstructor
    @Getter
    @Setter
    private static class TenantSubscription {
        private final MessageHandler handler;
        private StompSession.Subscription subscription;
    }
}

使用 @ConditionalOnProperty 注解,只有当配置文件开关开启,才会加载这个配置类,才会初始化 TenantClient 这个 Bean

@ConditionalOnProperty(value = "xxxx.enable")

@ TenantObserver 注解

客户端方法添加该接口,可以接收到生产者订阅增量接口发送的数据

@TenantObserver(PULL_PROJECT_MEMBER_REQUEST)
    public void pullSyncProjectMemberData(List<ProjectMemberDto> syncProjectMemberDtoList) {

        Map<Integer, List<UserProjectPermission>> existingProjectMembersMap = projectPermissionService.findAll()
                .stream()
                .collect(Collectors.groupingBy(UserProjectPermission::getUserId));


        Map<String, List<ProjectMemberDto>> incomingUsersMap = syncProjectMemberDtoList
                .stream()
                .collect(Collectors.groupingBy(ProjectMemberDto::getUsername));

        Map<String, Integer> idAndNameUserMap = userService2.findByUsernames(incomingUsersMap.keySet())
                .stream()
                .collect(Collectors.toMap(User::getUserName, User::getId));


        Map<Integer, List<ProjectMemberDto>> idIncomingUsersMap = incomingUsersMap.entrySet()
                .stream()
                .filter(entry -> idAndNameUserMap.containsKey(entry.getKey()))
                .collect(Collectors.toMap(
                        entry -> idAndNameUserMap.get(entry.getKey()),
                        Map.Entry::getValue,
                        (existing, replacement) -> replacement));

        CompletableFuture.runAsync(() -> processNewAndUpdatedPermissions(idIncomingUsersMap));

        processDeletePermission(existingProjectMembersMap, idIncomingUsersMap.keySet());
    }

消费者接口幂等性

因为生产者可能迟迟等不到消费者的确认 confirm 反馈,会不断重试轮询消息表,消费者接口需要保证数据的一致性,会将当前消费的消息存储起来,数据库存在该消息,则直接返回 confirm 确认

消费者成功消费消息后,callBack 回调 tenant.send 方法给生产者,返回当前事务 id 和 confirm 确认机制

消费者如何绑定 subscribe 方法呢?

客户端订阅接口需要绑定发送,利用 AOP 注解或者实现 BeanPostProcessor 这个 Spring bean 后置处理器

@RequiredArgsConstructor
public class TenantObserverBeanProcessor implements BeanPostProcessor {
    private final TenantClient client;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);

        if (AnnotationUtils.isCandidateClass(targetClass, TenantObserver.class)) {
            MethodIntrospector.selectMethods(
                            targetClass,
                            (MethodIntrospector.MetadataLookup<TenantObserver>) method ->
                                    AnnotatedElementUtils.findMergedAnnotation(method, TenantObserver.class))
                    .forEach((method, tenantObserver) -> subscribeObserver(bean, method, tenantObserver));
        }

        return bean;
    }

    private void subscribeObserver(Object bean, Method method, TenantObserver tenantObserver) {
        Type[] parameterTypes = method.getGenericParameterTypes();
        if (parameterTypes.length > 1) {
            throw new IllegalStateException(method + "方法只能有一个参数");
        }
        Type payloadType = parameterTypes[0];
        client.subscribe(tenantObserver.value(), new MessageHandler() {
            @Override
            public Type getType() {
                return payloadType;
            }

            @Override
            public void handle(Object payload) throws Exception {
                try {
                    if (method.getParameterCount() == 0) {
                        method.invoke(bean);
                    } else {
                        method.invoke(bean, payload);
                    }
                } catch (InvocationTargetException e) {
                    Throwable target = e.getTargetException();
                    if (target instanceof Exception) {
                        throw (Exception) target;
                    }
                    throw e;
                }
            }
        });
    }
}

会再 bean 初始化前,执行扫描 @TenantObserver 注解,替换成代理类,进行方法增强,实现路径的绑定

生产者

发送消息 messagingTemplate

生产者需要生产的消息的时候,通过 messagingTemplate 进行 convertAndSend 进行消息的发送到客户端绑定的路径

 public void send(String path, Object payload) {
        messagingTemplate.convertAndSend(WebSocketConstants.TOPIC_PREFIX + path, payload);
    }

WebSocketConstants.xx 路径需要和客户端 subscribe 路径绑定 key 路径保持一致

业务隔离

 public void syncxxxx(UserDto dto, Optional<UserDto> oldDtoOp, String creator) {
        if (Objects.equals(dto.getUsername(), oldDtoOp.map(UserDto::getUsername).orElse(null))) {
            return;
        }
        List<Long> projectCodes = projectRepository.findCodeListByTenantName(dto.getTenant());
        if (CollUtil.isEmpty(projectCodes)) {
            return;
        }
        oldDtoOp.map(UserDto::getUsername).ifPresent(projectRepository::deleteByUserName);

        List<ProjectMemberDto> inserList = projectCodes
                .stream()
                .map(code -> ProjectMemberDto.valueOf(code, dto.getUsername(), ProjectPermissionType.ADMIN.getCode(), creator))
                .collect(Collectors.toList());
        saveBatchOrUpdateProjectMember(inserList);

        //发送send事件
        eventPublisher.publishEvent(new ProjectMemberEvent(this, inserList,
                oldDtoOp.map(UserDto::getUsername).map(Set::of).orElseGet(Collections::emptySet), projectCodes));
    }
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async(SYNC_DATA_EXECUTOR)
    public void send(ProjectMemberEvent event) {
       //进行发送数据
        messagingTemplate.convertAndSend(xxxxxx)
    }

使用线程池天然的线程隔离技术和 Spring 事件 进行业务数据和发送消息解耦,实现 mq 消息队列的业务解耦功能

因为使用了多线程,所以 Spring 事务注解无法生效,这里使用了 @TransactionalEventListener 注解,当事务提交后才会触发

生产者消息持久性

在生产者发送数据之前,会将当前的数据产生快照信息,进行数据库入库,增加一个是否被消费的标识,客户端消费后返回一个 confirm 和当前事务 id 进行确认机制,将当前消息删除 (短暂的数据不一致性,后续有全量同步补偿机制)

消费者向生产者全量拉取数据

  • 程序容器启动会主动拉取一次
  • 定时调度会拉取一次
 @EventListener
    public void onTenantConnected(TenantConnectedEvent ignore) {
        if (isFirst.compareAndSet(true, false)) {
            try {
                executorService.submit(() -> beginSendPullRequest());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    private void beginSendPullRequest() throws DisconnectException {
        tenantClient.send(PULL_REQUEST, "");
        tenantClient.send(PULL_PROJECT_REQUEST, "");
        tenantClient.send(PULL_QUEUE_REQUEST, "");
        tenantClient.send(PULL_PROJECT_MEMBER_REQUEST, "");
    }

    @Scheduled(cron = "${sync-data-cron}")
    public void schedulePullData() {
        if (enable && tenantClient.isConnected()) {
            try {
                beginSendPullRequest();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

补偿机制

  • 增量定时拉取数据
  • 日志监控

使用 AOP 注解收集日志上报

@Aspect
@Component
@RequiredArgsConstructor
public class MessageMappingAspect {
    private static final Logger logger = LoggerFactory.getLogger(MessageMappingAspect.class);
    private final DockedClientDomainService dockedClientDomainService;

    @Around("@annotation(org.springframework.messaging.handler.annotation.MessageMapping)")
    public Object handle(ProceedingJoinPoint point) throws Throwable {
        String errorMessage = null;
        try {
            return point.proceed();
        } catch (Exception e) {
            errorMessage = e.getMessage();
            throw e;
        } finally {
            WebSocketMessageHolder message = WebSocketMessageHolder.get().orElse(null);
            try {
                if (message != null && message.getDestination() != null &&
                        !message.getDestination().endsWith(TenantPaths.CLIENT_LOG)) {
                    addLog(message, errorMessage);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    private void addLog(WebSocketMessageHolder message, String errorMessage) {
        var event = new LogEvent();
        var path = CharSequenceUtil.removePrefix(message.getDestination(), WebSocketConstants.APPLICATION_DESTINATION_PREFIX);
        String content;
        if (errorMessage == null) {
            content = String.format("发送消息[%s]成功", path);
        } else {
            content = String.format("发送消息[%s]失败: %s. payload: %s", path, errorMessage, getPayload(message));
        }
        event.setContent(content);
        event.setSuccess(errorMessage == null);
        event.setCreateTime(LocalDateTime.now());
        dockedClientDomainService.addLog(message.getUser().getName(), event);
    }

    private static Object getPayload(WebSocketMessageHolder message) {
        if (message.getPayload() instanceof byte[] bytes) {
            return new String(bytes);
        }
        return message.getPayload();
    }
}
  • 客户端连接监控

结论

通过这个伪消息队列,可以解决我们现在的需求,但是没法跟真正的 mq 消息队列相比,也无法做到数据实时的一致性,只能是数据的最终一致性加上补偿机制(增量 + 全量),无法做到消息队列的系统层次的解耦和流量控制

**优点也有,不需要引入其他的中间,只需要 spring+ 线程池就可以实现,运维成本减少,有利有弊,**根据业务选型技术


标题:spring的STOMP落地实践
作者:xiaohugg
地址:https://xiaohugg.top/articles/2024/01/05/1704440301945.html

人民有信仰 民族有希望 国家有力量