背景
公司有个主系统 A,这个系统将来用途作为 SaaS 服务系统,设计尽可能保持纯粹,不要引入过多的中间件技术
其下有 B,C 甚至未来还有多个子系统,现在的需求是需要当子系统 B,C 开启了某个配置,需要对接 A 系统,A 系统能够提供 B,C 系统的能力赋能,当 BC 断开配置,B,C 又是个纯粹可以单独运行的系统,某个系统不共用同一套数据,拥有自己的环境和数据,当子系统开启了配置,相关接口功能需要限制,由主系统提供能力,断开配置,A.B,C 三个系统能够独立运行
功能拆分
主系统:
对主系统相关的增量和全量接口数据进行同步传输:这里有两个关键点
- 有许多增量接口需要进行同步传输&&传输数据的逻辑不能影响原先的业务逻辑(通过 AOP 注解)
- 需要找到一个类型消息队列 mq 那种形势进行数据同步,A 系统数据发生改动子系统能够感知
但是这个系统不能引入过多的中间件,最终经过调研,选择使用了 Spring 的 STOMP 和 Spring event 事件实现一种伪消息队列
子系统:
- 子系统相关接口限制:通过 AOP 接口限制
- 子系统需要接收主系统的全量和增量数据 (STOMP+AOP+SDK)
最终将功能细节拆分后,对于这个需求设计选型最终采用:
Spring STOMP+ Spring EVENT+AOP+SDK+ 线程池进行业务线程隔离
时序图
- 全量数据
- 增量数据同步
设计伪消息队列
客户端
TenantClient
用来与 A 系统进行对接的客户端
send 方法
通过这方法客户端也就是目前的 B,C 系统可以主动向 A 系统发送消息
- path: 发送的接口路径
- payload: 发送的消息体
- stompSession:Spring stomp 的与服务端的连接对象
public void send(String path, Object payload) throws DisconnectException {
if (stompSession == null) {
throw new DisconnectException();
} else {
stompSession.send(APPLICATION_DESTINATION_PREFIX + path, payload);
}
}
subscribe 方法
进行客户端订阅事件,与服务端进行绑定
/**
* 订阅指定路径的事件,可以多次订阅
*
* @param path 订阅的路径
* @param observer 消费者
* @param <T> 消息类型
*/
@SuppressWarnings("unchecked")
public <T> void subscribe(String path, Observer<T> observer) {
subscribe(path, new MessageHandler() {
@Override
public Type getType() {
return observer.getType();
}
@Override
public void handle(Object payload) {
observer.handle((T) payload);
}
});
}
/**
* 订阅指定路径的事件,可以多次订阅
*
* @param path 订阅的路径
* @param handler 消费者
*/
public void subscribe(String path, MessageHandler handler) {
subscriptions.compute(path, (key, oldValue) -> {
List<TenantSubscription> value = oldValue == null ? new CopyOnWriteArrayList<>() : oldValue;
value.add(new TenantSubscription(handler));
return value;
});
}
private final ConcurrentMap<String, List<TenantSubscription>> subscriptions = new ConcurrentHashMap<>();
通过一个 ConcurrentMap 容器存储 key:客户端接口路径 value:消费者列表 支持一个路径对应多个客户端,防止读写并发 使用 CopyOnWriteArrayList
start 方法:
容器启动使用使用 Spring 事件 调用 start 方法
@EventListener
public void onAppStarted(ApplicationStartedEvent ignored) {
tenantClient().start();
}
destory 方法:
容器销毁执行
@PreDestroy
public void destroy() {
tenantClient().stop();
}
public class TenantClient {
private static final Logger logger = LoggerFactory.getLogger(TenantClient.class);
public static final String APPLICATION_DESTINATION_PREFIX = "/app";
public static final String TOPIC_PREFIX = "/topic";
public static final String DATE_TIME_FORMATTER_PATTERN = "yyyy-MM-dd HH🇲🇲ss";
/** 配置信息 */
private final TenantConfig config;
/** STOMP客户端 */
private final StompClient client;
/** 订阅映射: 订阅路径 -> 消费者列表 */
private final ConcurrentMap<String, List<TenantSubscription>> subscriptions = new ConcurrentHashMap<>();
private Runnable onConnectedListener;
/** 是否已开启 */
private final AtomicBoolean started = new AtomicBoolean();
/** 当前与服务端的连接 */
private StompSession stompSession;
/** LDAP数据源 */
private BaseLdapPathContextSource contextSource;
private ObjectMapper objectMapper = createObjectMapper();
public TenantClient(TenantConfig config) {
this.config = Objects.requireNonNull(config);
String url = config.getBaseUrl() + config.getPath();
this.client = new StompClient(url, new TenantStompSessionHandler(), config)
.setReconnectInterval(config.getReconnectInterval());
setObjectMapper(objectMapper);
}
/**
* 配置用于传输数据的JSON序列化对象
*
* @param objectMapper objectMapper
*/
public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = Objects.requireNonNull(objectMapper);
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setObjectMapper(objectMapper);
this.client.setMessageConverter(converter);
}
/**
* 配置LDAP数据源
*
* @param contextSource LDAP数据源
*/
public void setContextSource(BaseLdapPathContextSource contextSource) {
this.contextSource = contextSource;
}
/**
* 配置任务调度器,用于心跳检测
*
* @param taskScheduler 任务调度器
*/
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.client.setTaskScheduler(taskScheduler);
}
/**
* 开始连接
*
* @return 连接结果Future
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public ListenableFuture<Object> start() {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("不能重复启动");
}
return (ListenableFuture) client.connect();
}
/**
* 关闭客户端,断开连接
*/
public void stop() {
if (started.compareAndSet(true, false)) {
if (stompSession != null) {
stompSession.disconnect();
}
stompSession = null;
client.stop();
}
}
/**
* 订阅指定路径的事件,可以多次订阅
*
* @param path 订阅的路径
* @param observer 消费者
* @param <T> 消息类型
*/
@SuppressWarnings("unchecked")
public <T> void subscribe(String path, Observer<T> observer) {
subscribe(path, new MessageHandler() {
@Override
public Type getType() {
return observer.getType();
}
@Override
public void handle(Object payload) {
observer.handle((T) payload);
}
});
}
/**
* 订阅指定路径的事件,可以多次订阅
*
* @param path 订阅的路径
* @param handler 消费者
*/
public void subscribe(String path, MessageHandler handler) {
subscriptions.compute(path, (key, oldValue) -> {
List<TenantSubscription> value = oldValue == null ? new CopyOnWriteArrayList<>() : oldValue;
value.add(new TenantSubscription(handler));
return value;
});
}
/**
* 取消指定路径的订阅
*
* @param path 取消订阅的路径
* @return 是否有消费者被取消
*/
public boolean unsubscribe(String path) {
List<TenantSubscription> list = subscriptions.remove(path);
if (list == null) return false;
for (TenantSubscription it : list) {
if (it.getSubscription() != null) {
it.getSubscription().unsubscribe();
}
}
return true;
}
/**
* 发送消息到租户融合中心
*
* @param path 消息路径
* @param payload 消息内容
* @throws DisconnectException 如果未连接到客户端
*/
public void send(String path, Object payload) throws DisconnectException {
if (stompSession == null) {
throw new DisconnectException();
} else {
stompSession.send(APPLICATION_DESTINATION_PREFIX + path, payload);
}
}
/**
* 用户身份认证
*
* @param username 用户名
* @param password 密码
* @return true为认证通过,false为用户名或密码不正确
*/
public boolean auth(String username, String password) {
try {
Name dn = contextSource.getBaseLdapName()
.add(config.getUserBase())
.add(MessageFormat.format(config.getUserRdn(), username));
contextSource.getContext(dn.toString(), password);
return true;
} catch (NamingException ex) {
if ((ex instanceof org.springframework.ldap.AuthenticationException)
|| (ex instanceof org.springframework.ldap.OperationNotSupportedException)) {
return false;
} else {
throw ex;
}
} catch (InvalidNameException e) {
throw new IllegalArgumentException(e);
}
}
public void onConnected(Runnable onConnectedListener) {
this.onConnectedListener = onConnectedListener;
}
public boolean isConnected() {
return stompSession != null;
}
private static ObjectMapper createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
mapper.disable(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS);
// 设置默认日期格式
mapper.setDateFormat(new SimpleDateFormat(DATE_TIME_FORMATTER_PATTERN));
mapper.registerModule(new JavaTimeModule());
// 修改LocalDateTime的默认格式
mapper.registerModule(createTimeModule());
return mapper;
}
private static SimpleModule createTimeModule() {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(DATE_TIME_FORMATTER_PATTERN);
SimpleModule timeModule = new SimpleModule();
timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(dateTimeFormatter));
timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(dateTimeFormatter));
return timeModule;
}
private class TenantStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(@NonNull StompSession session, @NonNull StompHeaders connectedHeaders) {
logger.info("与租户融合中心建立连接: {}", config.getBaseUrl());
stompSession = session;
subscriptions.forEach((path, list) -> {
for (TenantSubscription subscription : list) {
registerSubscription(session, path, subscription);
}
});
if (onConnectedListener != null) {
try {
onConnectedListener.run();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
private void registerSubscription(StompSession session, String path, TenantSubscription subscription) {
StompSession.Subscription sub = session.subscribe(TOPIC_PREFIX + path, new StompFrameHandler() {
@Override
public @NonNull Type getPayloadType(@NonNull StompHeaders headers) {
Type type = subscription.getHandler().getType();
return type instanceof ParameterizedType ? byte[].class : type;
}
@Override
public void handleFrame(@NonNull StompHeaders headers, Object payload) {
String path = getPath(headers);
String errorMessage = null;
try {
Type type = subscription.getHandler().getType();
if (type instanceof ParameterizedType) {
// 处理泛型参数
payload = objectMapper.readValue((
byte[]) payload, TypeFactory.defaultInstance().constructType(type));
}
subscription.getHandler().handle(payload);
} catch (Exception e) {
logger.error("处理消息[{}]出错. payload: {}, error: {}",
path, payload, e.getMessage(), e);
errorMessage = e.getMessage();
} finally {
sendClientLog(path, payload, errorMessage);
}
}
});
subscription.setSubscription(sub);
}
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
logger.error("handleException: {}", exception.getMessage(), exception);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
if (exception instanceof ConnectionLostException) {
stompSession = null;
}
logger.error("handleTransportError: {}", exception.getMessage(), exception);
}
private String getPath(StompHeaders headers) {
String destination = Objects.requireNonNull(headers.getDestination());
return destination.startsWith(TOPIC_PREFIX) ? destination.substring(TOPIC_PREFIX.length()) : destination;
}
private void sendClientLog(String path, Object payload, String errorMessage) {
try {
LogEvent logEvent = new LogEvent();
if (errorMessage == null) {
logEvent.setContent(String.format("处理消息[%s]成功", path));
} else {
logEvent.setContent(String.format("处理消息[%s]失败: %s. payload: %s", path, errorMessage, payload));
}
logEvent.setSuccess(errorMessage == null);
logEvent.setCreateTime(LocalDateTime.now());
send(TenantPaths.CLIENT_LOG, logEvent);
} catch (Exception e) {
logger.error("发送调用日志出错: {}", e.getMessage(), e);
}
}
}
@RequiredArgsConstructor
@Getter
@Setter
private static class TenantSubscription {
private final MessageHandler handler;
private StompSession.Subscription subscription;
}
}
使用 @ConditionalOnProperty 注解,只有当配置文件开关开启,才会加载这个配置类,才会初始化 TenantClient 这个 Bean
@ConditionalOnProperty(value = "xxxx.enable")
@ TenantObserver 注解
客户端方法添加该接口,可以接收到生产者订阅增量接口发送的数据
@TenantObserver(PULL_PROJECT_MEMBER_REQUEST)
public void pullSyncProjectMemberData(List<ProjectMemberDto> syncProjectMemberDtoList) {
Map<Integer, List<UserProjectPermission>> existingProjectMembersMap = projectPermissionService.findAll()
.stream()
.collect(Collectors.groupingBy(UserProjectPermission::getUserId));
Map<String, List<ProjectMemberDto>> incomingUsersMap = syncProjectMemberDtoList
.stream()
.collect(Collectors.groupingBy(ProjectMemberDto::getUsername));
Map<String, Integer> idAndNameUserMap = userService2.findByUsernames(incomingUsersMap.keySet())
.stream()
.collect(Collectors.toMap(User::getUserName, User::getId));
Map<Integer, List<ProjectMemberDto>> idIncomingUsersMap = incomingUsersMap.entrySet()
.stream()
.filter(entry -> idAndNameUserMap.containsKey(entry.getKey()))
.collect(Collectors.toMap(
entry -> idAndNameUserMap.get(entry.getKey()),
Map.Entry::getValue,
(existing, replacement) -> replacement));
CompletableFuture.runAsync(() -> processNewAndUpdatedPermissions(idIncomingUsersMap));
processDeletePermission(existingProjectMembersMap, idIncomingUsersMap.keySet());
}
消费者接口幂等性
因为生产者可能迟迟等不到消费者的确认 confirm 反馈,会不断重试轮询消息表,消费者接口需要保证数据的一致性,会将当前消费的消息存储起来,数据库存在该消息,则直接返回 confirm 确认
消费者成功消费消息后,callBack 回调 tenant.send 方法给生产者,返回当前事务 id 和 confirm 确认机制
消费者如何绑定 subscribe 方法呢?
客户端订阅接口需要绑定发送,利用 AOP 注解或者实现 BeanPostProcessor 这个 Spring bean 后置处理器
@RequiredArgsConstructor
public class TenantObserverBeanProcessor implements BeanPostProcessor {
private final TenantClient client;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (AnnotationUtils.isCandidateClass(targetClass, TenantObserver.class)) {
MethodIntrospector.selectMethods(
targetClass,
(MethodIntrospector.MetadataLookup<TenantObserver>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, TenantObserver.class))
.forEach((method, tenantObserver) -> subscribeObserver(bean, method, tenantObserver));
}
return bean;
}
private void subscribeObserver(Object bean, Method method, TenantObserver tenantObserver) {
Type[] parameterTypes = method.getGenericParameterTypes();
if (parameterTypes.length > 1) {
throw new IllegalStateException(method + "方法只能有一个参数");
}
Type payloadType = parameterTypes[0];
client.subscribe(tenantObserver.value(), new MessageHandler() {
@Override
public Type getType() {
return payloadType;
}
@Override
public void handle(Object payload) throws Exception {
try {
if (method.getParameterCount() == 0) {
method.invoke(bean);
} else {
method.invoke(bean, payload);
}
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof Exception) {
throw (Exception) target;
}
throw e;
}
}
});
}
}
会再 bean 初始化前,执行扫描 @TenantObserver 注解,替换成代理类,进行方法增强,实现路径的绑定
生产者
发送消息 messagingTemplate
生产者需要生产的消息的时候,通过 messagingTemplate 进行 convertAndSend 进行消息的发送到客户端绑定的路径
public void send(String path, Object payload) {
messagingTemplate.convertAndSend(WebSocketConstants.TOPIC_PREFIX + path, payload);
}
WebSocketConstants.xx 路径需要和客户端 subscribe 路径绑定 key 路径保持一致
业务隔离
public void syncxxxx(UserDto dto, Optional<UserDto> oldDtoOp, String creator) {
if (Objects.equals(dto.getUsername(), oldDtoOp.map(UserDto::getUsername).orElse(null))) {
return;
}
List<Long> projectCodes = projectRepository.findCodeListByTenantName(dto.getTenant());
if (CollUtil.isEmpty(projectCodes)) {
return;
}
oldDtoOp.map(UserDto::getUsername).ifPresent(projectRepository::deleteByUserName);
List<ProjectMemberDto> inserList = projectCodes
.stream()
.map(code -> ProjectMemberDto.valueOf(code, dto.getUsername(), ProjectPermissionType.ADMIN.getCode(), creator))
.collect(Collectors.toList());
saveBatchOrUpdateProjectMember(inserList);
//发送send事件
eventPublisher.publishEvent(new ProjectMemberEvent(this, inserList,
oldDtoOp.map(UserDto::getUsername).map(Set::of).orElseGet(Collections::emptySet), projectCodes));
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async(SYNC_DATA_EXECUTOR)
public void send(ProjectMemberEvent event) {
//进行发送数据
messagingTemplate.convertAndSend(xxxxxx)
}
使用线程池天然的线程隔离技术和 Spring 事件 进行业务数据和发送消息解耦,实现 mq 消息队列的业务解耦功能
因为使用了多线程,所以 Spring 事务注解无法生效,这里使用了 @TransactionalEventListener 注解,当事务提交后才会触发
生产者消息持久性
在生产者发送数据之前,会将当前的数据产生快照信息,进行数据库入库,增加一个是否被消费的标识,客户端消费后返回一个 confirm 和当前事务 id 进行确认机制,将当前消息删除 (短暂的数据不一致性,后续有全量同步补偿机制)
消费者向生产者全量拉取数据
- 程序容器启动会主动拉取一次
- 定时调度会拉取一次
@EventListener
public void onTenantConnected(TenantConnectedEvent ignore) {
if (isFirst.compareAndSet(true, false)) {
try {
executorService.submit(() -> beginSendPullRequest());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
private void beginSendPullRequest() throws DisconnectException {
tenantClient.send(PULL_REQUEST, "");
tenantClient.send(PULL_PROJECT_REQUEST, "");
tenantClient.send(PULL_QUEUE_REQUEST, "");
tenantClient.send(PULL_PROJECT_MEMBER_REQUEST, "");
}
@Scheduled(cron = "${sync-data-cron}")
public void schedulePullData() {
if (enable && tenantClient.isConnected()) {
try {
beginSendPullRequest();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
补偿机制
- 增量定时拉取数据
- 日志监控
使用 AOP 注解收集日志上报
@Aspect
@Component
@RequiredArgsConstructor
public class MessageMappingAspect {
private static final Logger logger = LoggerFactory.getLogger(MessageMappingAspect.class);
private final DockedClientDomainService dockedClientDomainService;
@Around("@annotation(org.springframework.messaging.handler.annotation.MessageMapping)")
public Object handle(ProceedingJoinPoint point) throws Throwable {
String errorMessage = null;
try {
return point.proceed();
} catch (Exception e) {
errorMessage = e.getMessage();
throw e;
} finally {
WebSocketMessageHolder message = WebSocketMessageHolder.get().orElse(null);
try {
if (message != null && message.getDestination() != null &&
!message.getDestination().endsWith(TenantPaths.CLIENT_LOG)) {
addLog(message, errorMessage);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
private void addLog(WebSocketMessageHolder message, String errorMessage) {
var event = new LogEvent();
var path = CharSequenceUtil.removePrefix(message.getDestination(), WebSocketConstants.APPLICATION_DESTINATION_PREFIX);
String content;
if (errorMessage == null) {
content = String.format("发送消息[%s]成功", path);
} else {
content = String.format("发送消息[%s]失败: %s. payload: %s", path, errorMessage, getPayload(message));
}
event.setContent(content);
event.setSuccess(errorMessage == null);
event.setCreateTime(LocalDateTime.now());
dockedClientDomainService.addLog(message.getUser().getName(), event);
}
private static Object getPayload(WebSocketMessageHolder message) {
if (message.getPayload() instanceof byte[] bytes) {
return new String(bytes);
}
return message.getPayload();
}
}
- 客户端连接监控
结论
通过这个伪消息队列,可以解决我们现在的需求,但是没法跟真正的 mq 消息队列相比,也无法做到数据实时的一致性,只能是数据的最终一致性加上补偿机制(增量 + 全量),无法做到消息队列的系统层次的解耦和流量控制
**优点也有,不需要引入其他的中间,只需要 spring+ 线程池就可以实现,运维成本减少,有利有弊,**根据业务选型技术