From 62ebab50ec4be5270a17ad244fb22b3942de9cea Mon Sep 17 00:00:00 2001 From: sol Date: Tue, 1 Jul 2025 12:24:12 +0800 Subject: [PATCH] feat(gateway): Implement dynamic route synchronization and improve service registration handling --- .../bgasol/gateway/config/GatewayConfig.java | 179 +++++++++++++----- 1 file changed, 136 insertions(+), 43 deletions(-) diff --git a/cloud/gateway-9527/src/main/java/com/bgasol/gateway/config/GatewayConfig.java b/cloud/gateway-9527/src/main/java/com/bgasol/gateway/config/GatewayConfig.java index 84a77bc..e28e786 100644 --- a/cloud/gateway-9527/src/main/java/com/bgasol/gateway/config/GatewayConfig.java +++ b/cloud/gateway-9527/src/main/java/com/bgasol/gateway/config/GatewayConfig.java @@ -5,10 +5,12 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent; +import org.springframework.cloud.gateway.event.RefreshRoutesEvent; import org.springframework.cloud.gateway.handler.predicate.PredicateDefinition; import org.springframework.cloud.gateway.route.RouteDefinition; import org.springframework.cloud.gateway.route.RouteDefinitionLocator; import org.springframework.cloud.gateway.route.RouteDefinitionWriter; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.EnableScheduling; @@ -18,9 +20,9 @@ import reactor.core.publisher.Mono; import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; @Configuration @Slf4j @@ -29,61 +31,152 @@ import java.util.Map; public class GatewayConfig { private final RouteDefinitionWriter routeDefinitionWriter; - private final RouteDefinitionLocator routeDefinitionLocator; - private final DiscoveryClient discoveryClient; + private final ApplicationEventPublisher eventPublisher; - private final List routeIdList = new ArrayList<>(); + // 使用线程安全的Set来记录已注册的服务 + private final Set registeredServices = ConcurrentHashMap.newKeySet(); { - // 不需要注册自己 提前添加进来 GatewayConfigValues.SERVICE_NAME - routeIdList.add(GatewayConfigValues.SERVICE_NAME); - } - - public Flux getAllRoutes() { - return routeDefinitionLocator.getRouteDefinitions(); - } - - private boolean isRegistered = false; - - @EventListener(InstanceRegisteredEvent.class) - public void onInstanceRegistered() { - isRegistered = true; - } - - @Scheduled(cron = "0/5 * * * * ?") - public void customRouteLocator() { - if (!isRegistered) { - log.info("Gateway 尚未注册,跳过路由添加"); - return; - } - for (String service : discoveryClient.getServices()) { - if (!routeIdList.contains(service)) { - routeIdList.add(service); - addRoute(service, "lb://" + service, "/" + service + "/**"); - log.info("添加路由:{}", service); - } - } + // 预先添加网关自身服务,避免自己注册自己 + registeredServices.add(GatewayConfigValues.SERVICE_NAME); } + /** + * 添加路由定义 + */ public void addRoute(String id, String uri, String path) { + try { + RouteDefinition definition = createRouteDefinition(id, uri, path); + + routeDefinitionWriter.save(Mono.just(definition)) + .doOnSuccess(unused -> { + log.info("成功添加路由:id={}, uri={}, path={}", id, uri, path); + // 发布路由刷新事件 + eventPublisher.publishEvent(new RefreshRoutesEvent(this)); + }) + .doOnError(error -> log.error("添加路由失败:id={}, error={}", id, error.getMessage())) + .subscribe(); + + } catch (Exception e) { + log.error("创建路由定义失败:id={}, uri={}, path={}, error={}", id, uri, path, e.getMessage()); + } + } + + /** + * 创建路由定义 + */ + private RouteDefinition createRouteDefinition(String id, String uri, String path) throws URISyntaxException { RouteDefinition definition = new RouteDefinition(); definition.setId(id); - try { - definition.setUri(new URI(uri)); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } + definition.setUri(new URI(uri)); + PredicateDefinition predicateDefinition = new PredicateDefinition(); predicateDefinition.setName("Path"); predicateDefinition.setArgs(Map.of("pattern", path)); definition.getPredicates().add(predicateDefinition); - routeDefinitionWriter.save(Mono.just(definition)) - .doOnSuccess(unused -> log.info("路由 [{}] 添加成功,path: [{}]", id, path)) - .doOnError(error -> log.error("路由 [{}] 添加失败:{}", id, error.getMessage())) - .block(); + return definition; } -} + /** + * 获取所有路由定义 + */ + public Flux getAllRoutes() { + return routeDefinitionLocator.getRouteDefinitions(); + } + + /** + * 服务注册事件监听器 + */ + @EventListener(InstanceRegisteredEvent.class) + public void onInstanceRegistered(InstanceRegisteredEvent event) { + log.debug("接收到服务注册事件:{}", event.getConfig()); + syncRoutes(); + } + + /** + * 定时同步路由(作为兜底机制) + */ + @Scheduled(fixedDelay = 30000, initialDelay = 10000) // 30秒执行一次,初始延迟10秒 + public void scheduledSyncRoutes() { + log.debug("定时同步路由任务执行"); + syncRoutes(); + } + + /** + * 同步路由逻辑 + */ + private synchronized void syncRoutes() { + try { + Set currentServices = Set.copyOf(discoveryClient.getServices()); + log.debug("当前发现的服务:{}", currentServices); + + for (String service : currentServices) { + if (!registeredServices.contains(service)) { + registerServiceRoute(service); + } + } + + // 可选:检查已注册的服务是否还存在,如果不存在则移除路由 + checkAndRemoveInactiveRoutes(currentServices); + + } catch (Exception e) { + log.error("同步路由时发生异常", e); + } + } + + /** + * 注册服务路由 + */ + private void registerServiceRoute(String service) { + try { + String routeUri = "lb://" + service; + String routePath = "/" + service + "/**"; + + addRoute(service, routeUri, routePath); + registeredServices.add(service); + log.info("新增服务路由:service={}, uri={}, path={}", service, routeUri, routePath); + + } catch (Exception e) { + log.error("注册服务路由失败:service={}, error={}", service, e.getMessage()); + } + } + + /** + * 检查并移除不活跃的路由(可选功能) + */ + private void checkAndRemoveInactiveRoutes(Set currentServices) { + Set inactiveServices = registeredServices.stream() + .filter(service -> !currentServices.contains(service)) + .filter(service -> !GatewayConfigValues.SERVICE_NAME.equals(service)) + .collect(java.util.stream.Collectors.toSet()); + + for (String inactiveService : inactiveServices) { + removeRoute(inactiveService); + registeredServices.remove(inactiveService); + log.info("移除不活跃服务路由:{}", inactiveService); + } + } + + /** + * 移除路由 + */ + private void removeRoute(String routeId) { + routeDefinitionWriter.delete(Mono.just(routeId)) + .doOnSuccess(unused -> { + log.info("成功移除路由:{}", routeId); + eventPublisher.publishEvent(new RefreshRoutesEvent(this)); + }) + .doOnError(error -> log.error("移除路由失败:routeId={}, error={}", routeId, error.getMessage())) + .subscribe(); + } + + /** + * 获取已注册的服务列表(用于监控) + */ + public Set getRegisteredServices() { + return Set.copyOf(registeredServices); + } +} \ No newline at end of file