feat(gateway): Implement dynamic route synchronization and improve service registration handling

This commit is contained in:
sol 2025-07-01 12:24:12 +08:00
parent d5be623aa9
commit 62ebab50ec

View file

@ -5,10 +5,12 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.handler.predicate.PredicateDefinition;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionLocator;
import org.springframework.cloud.gateway.route.RouteDefinitionWriter;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.EnableScheduling;
@ -18,9 +20,9 @@ import reactor.core.publisher.Mono;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Configuration
@Slf4j
@ -29,61 +31,152 @@ import java.util.Map;
public class GatewayConfig {
private final RouteDefinitionWriter routeDefinitionWriter;
private final RouteDefinitionLocator routeDefinitionLocator;
private final DiscoveryClient discoveryClient;
private final ApplicationEventPublisher eventPublisher;
private final List<String> routeIdList = new ArrayList<>();
// 使用线程安全的Set来记录已注册的服务
private final Set<String> registeredServices = ConcurrentHashMap.newKeySet();
{
// 不需要注册自己 提前添加进来 GatewayConfigValues.SERVICE_NAME
routeIdList.add(GatewayConfigValues.SERVICE_NAME);
}
public Flux<RouteDefinition> getAllRoutes() {
return routeDefinitionLocator.getRouteDefinitions();
}
private boolean isRegistered = false;
@EventListener(InstanceRegisteredEvent.class)
public void onInstanceRegistered() {
isRegistered = true;
}
@Scheduled(cron = "0/5 * * * * ?")
public void customRouteLocator() {
if (!isRegistered) {
log.info("Gateway 尚未注册,跳过路由添加");
return;
}
for (String service : discoveryClient.getServices()) {
if (!routeIdList.contains(service)) {
routeIdList.add(service);
addRoute(service, "lb://" + service, "/" + service + "/**");
log.info("添加路由:{}", service);
}
}
// 预先添加网关自身服务避免自己注册自己
registeredServices.add(GatewayConfigValues.SERVICE_NAME);
}
/**
* 添加路由定义
*/
public void addRoute(String id, String uri, String path) {
try {
RouteDefinition definition = createRouteDefinition(id, uri, path);
routeDefinitionWriter.save(Mono.just(definition))
.doOnSuccess(unused -> {
log.info("成功添加路由id={}, uri={}, path={}", id, uri, path);
// 发布路由刷新事件
eventPublisher.publishEvent(new RefreshRoutesEvent(this));
})
.doOnError(error -> log.error("添加路由失败id={}, error={}", id, error.getMessage()))
.subscribe();
} catch (Exception e) {
log.error("创建路由定义失败id={}, uri={}, path={}, error={}", id, uri, path, e.getMessage());
}
}
/**
* 创建路由定义
*/
private RouteDefinition createRouteDefinition(String id, String uri, String path) throws URISyntaxException {
RouteDefinition definition = new RouteDefinition();
definition.setId(id);
try {
definition.setUri(new URI(uri));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
definition.setUri(new URI(uri));
PredicateDefinition predicateDefinition = new PredicateDefinition();
predicateDefinition.setName("Path");
predicateDefinition.setArgs(Map.of("pattern", path));
definition.getPredicates().add(predicateDefinition);
routeDefinitionWriter.save(Mono.just(definition))
.doOnSuccess(unused -> log.info("路由 [{}] 添加成功path: [{}]", id, path))
.doOnError(error -> log.error("路由 [{}] 添加失败:{}", id, error.getMessage()))
.block();
return definition;
}
}
/**
* 获取所有路由定义
*/
public Flux<RouteDefinition> getAllRoutes() {
return routeDefinitionLocator.getRouteDefinitions();
}
/**
* 服务注册事件监听器
*/
@EventListener(InstanceRegisteredEvent.class)
public void onInstanceRegistered(InstanceRegisteredEvent event) {
log.debug("接收到服务注册事件:{}", event.getConfig());
syncRoutes();
}
/**
* 定时同步路由作为兜底机制
*/
@Scheduled(fixedDelay = 30000, initialDelay = 10000) // 30秒执行一次初始延迟10秒
public void scheduledSyncRoutes() {
log.debug("定时同步路由任务执行");
syncRoutes();
}
/**
* 同步路由逻辑
*/
private synchronized void syncRoutes() {
try {
Set<String> currentServices = Set.copyOf(discoveryClient.getServices());
log.debug("当前发现的服务:{}", currentServices);
for (String service : currentServices) {
if (!registeredServices.contains(service)) {
registerServiceRoute(service);
}
}
// 可选检查已注册的服务是否还存在如果不存在则移除路由
checkAndRemoveInactiveRoutes(currentServices);
} catch (Exception e) {
log.error("同步路由时发生异常", e);
}
}
/**
* 注册服务路由
*/
private void registerServiceRoute(String service) {
try {
String routeUri = "lb://" + service;
String routePath = "/" + service + "/**";
addRoute(service, routeUri, routePath);
registeredServices.add(service);
log.info("新增服务路由service={}, uri={}, path={}", service, routeUri, routePath);
} catch (Exception e) {
log.error("注册服务路由失败service={}, error={}", service, e.getMessage());
}
}
/**
* 检查并移除不活跃的路由可选功能
*/
private void checkAndRemoveInactiveRoutes(Set<String> currentServices) {
Set<String> inactiveServices = registeredServices.stream()
.filter(service -> !currentServices.contains(service))
.filter(service -> !GatewayConfigValues.SERVICE_NAME.equals(service))
.collect(java.util.stream.Collectors.toSet());
for (String inactiveService : inactiveServices) {
removeRoute(inactiveService);
registeredServices.remove(inactiveService);
log.info("移除不活跃服务路由:{}", inactiveService);
}
}
/**
* 移除路由
*/
private void removeRoute(String routeId) {
routeDefinitionWriter.delete(Mono.just(routeId))
.doOnSuccess(unused -> {
log.info("成功移除路由:{}", routeId);
eventPublisher.publishEvent(new RefreshRoutesEvent(this));
})
.doOnError(error -> log.error("移除路由失败routeId={}, error={}", routeId, error.getMessage()))
.subscribe();
}
/**
* 获取已注册的服务列表用于监控
*/
public Set<String> getRegisteredServices() {
return Set.copyOf(registeredServices);
}
}