springboot应用在Eureka安全下线

本章简单介绍了springboot应用如何在Eureka集群中安全下线。
如有错误及不同见解欢迎与我沟通。

问题现象

公司所采用的是springboot应用,并且使用Eureka做为注册中心,Ribbon做负载均衡,标准的Netflix体系。现有A服务需要调用B服务,在B服务其中某一台停
止时(如kill -9操作),A服务在一段时间内还是会向已停止的服务发送请求,导致服务异常。

问题原因分析

了解Eureka的朋友都知道,Eureka客户端是定时向Eureka服务端发送请求获得服务列表信息的,并且依靠心跳机制维持服务在Eureka服务端的状态。所以当
Eureka中某一台服务下线时,需Eureka服务端根据心跳机制剔除该下线的服务(如果Eureka服务端触发了自我保护机制服务还会剔除不掉),然后Eureka客
户端才能拉取到最新的服务列表。所以在B服务下线时,Eureka服务端还没将服务剔除又或者是Eureka客户端还未拉取最新的服务列表,请求都是会有可能打
到已下线的服务。

解决思路

1.切换为其它注册中心。例如zookeeper、consul等。zookeeper的观察者模式机制应该可以达到实时更新服务列表,zookeeper的缺点也很明显,不过本公司的
业务量使用zookeeper应该也不会有什么问题。对于consul目前本人还没有什么研究,暂不讨论。由于考虑到换成其它注册中心可能会出现新的问题且需要学习
时间,所以没有选择该方法。各注册中心的区别网上很多博客大家可以自己去看一看。
2.调整Eureka相关配置,例如:eureka.instance.lease-expiration-duration-in-seconds、eureka.client.initial-instance-info-replication-interval-seconds
等配置缩短服务发现、服务剔除等时间。该方法无法根治只能降低错误时间。且这里默认的时间间隔配置不推荐调整,开放时可以这么做。
3.开启ribbon的重试机制,开启后如果访问到错误的服务器,会进入重试机制,当请求访问到正确的服务后就可以了。由于底层服务接口
没有做到等幂所以重试时会产生重复数据,而且还是会有段时间有概率出错,故没有使用该方法。另外由于springcloud版本不同,ribbon负载均衡策略会有不同,
Brixton版本默认为ZoneAvoidanceRule,这导致我们服务挂掉后请求会一直打到挂掉的那台服务而不是时好时坏。
4.最后就是我们目前使用的方法,服务停机后主动通知Eureka服务端我要下线了,然后再通知其它服务去拉取最新的服务列表。

解决方法

如何去通知Eureka服务端我下线了

这里我们可以调用com.netflix.discovery.DiscoveryClient的shutdown()方法,以下是shutdown方法源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Shuts down Eureka Client. Also sends a deregistration request to the
* eureka server.(关闭Eureka客户。,还将注销请求发送到eureka服务器)
*/
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");

if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// 停止所有定时任务,即心跳、缓存刷新等任务停止后将不在向Eureka服务端注册。
cancelScheduledTasks();

// If APPINFO was registered
// 如果是注册的话,通知下线
if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}

if (eurekaTransport != null) {
eurekaTransport.shutdown();
}

heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();

logger.info("Completed shut down of DiscoveryClient");
}
}

另外该由@PreDestroy注解标记,所以关闭spring容器时也会走该方法。

1
2
DiscoveryClient discoveryClient = DiscoveryManager.getInstance().getDiscoveryClient();
discoveryClient.shutdown();

可以通过DiscoveryManager获得DiscoveryClient实例再执行shutdown,该方法被标记为过时,当不影响使用。

如何主动去更新服务列表

由于服务已在Eureka服务端下线,所以其它客户端重新去拉取即可。Eureka客户端刷新注册表的方法为com.netflix.discovery.DiscoveryClient的refreshRegistry()方法
该方法不是public方法,故需要通过反射去强行调用。通常情况该方法都是内部定时调用的,经实践主动调用并不会有什么问题。

1
2
3
Method method = DiscoveryClient.class.getDeclaredMethod("refreshRegistry");
method.setAccessible(true);
method.invoke(DiscoveryManager.getInstance().getDiscoveryClient());

测试时发现这里拉取的服务列表还是有可能存在已经下线的服务,原因是Eureka服务端存在只读缓存。客户端通知下线后缓存没有更新,这里我直接粗暴的将该缓存关闭。

1
2
// eureka 服务端配置
eureka.server.use-read-only-response-cache=false

关闭后就能拉到的内容就为最新的数据了。

如何去通知Eureka上其它客户端我下线了

可以将该主动更新服务列表的方法对外提供一个接口,停机时在完成shutdown()方法后逐个去调用即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 远程调用eurekaRefresh接口
* @param discoveryClient
*/
private void restDoEurekaCacheRefresh(DiscoveryClient discoveryClient){
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
String path = environment.getProperty("management.context-path");
RestTemplate restTemplate = createRestTemplate();
Applications applications = discoveryClient.getApplications();
List<Application> registeredApplications = applications.getRegisteredApplications();
for (Application application : registeredApplications) {
List<InstanceInfo> instances = application.getInstances();
for (InstanceInfo instance : instances) {
if(isNotThisApplication(instance.getInstanceId())){
scheduledExecutorService.submit(() -> {
try {
JSONObject forObject = restTemplate.getForObject(instance.getHomePageUrl() + path +"/eurekaRefresh", JSONObject.class);
log.info(instance.getInstanceId() + " 调用成功!" + forObject.toString());
} catch (Exception e) {
log.error(instance.getInstanceId() + " 调用失败!");
}
});
}else{
log.error("本机跳过!" + instance.getInstanceId());
}
}
}
scheduledExecutorService.shutdown();
}

这里更推介mq广播的形式完成通知调用。

ribbon缓存更新

完成上述操作后,发现还是会调用到已下线服务。经排查发现是ribbon缓存没有更新导致。
ribbon内部存在ServerListUpdater且提供了EurekaNotificationServerListUpdater实现类,可以达到更新ribbon缓存的目的。
Eureka客户端执行refreshRegistry刷新注册表调用后会调用onCacheRefreshed发布事件由于Eureka实例使用的是CloudEurekaClient,
而CloudEurekaClient会覆盖DiscoveryClient的onCacheRefreshed方法,将事件改为ApplicationListener的事件,故监听ApplicationListener
事件使用反射再调用 DiscoveryClient 的 fireEvent(DiscoveryClient的onCacheRefreshed方法主要调用该方法)
方法发布 EurekaEventListener监听的RefreshedEvent 事件
ribbon 使用EurekaNotificationServerListUpdater后会注册EurekaEventListener监听事件。从而达到eureka缓存刷新同时通知ribbon刷新缓存.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Component
public class EurekaCacheRefreshListener implements ApplicationListener<HeartbeatEvent> {

@Override
public void onApplicationEvent(HeartbeatEvent event) {
Object source = event.getSource();
CloudEurekaClient cloudEurekaClient = (CloudEurekaClient)source;
try {
Method method2 = DiscoveryClient.class.getDeclaredMethod("fireEvent", EurekaEvent.class);
method2.setAccessible(true);
method2.invoke(cloudEurekaClient, new CacheRefreshedEvent());
log.info("fireEvent 执行完毕");
} catch (Exception e) {
e.printStackTrace();
}
}
}

停机接口完善

springboot actuator为我们提供了停机接口(需配置开启)
这里我自定义了Endpoint增强自带的停机接口,在执行actuator停机接口前调用了discoveryClient.shutdown()及restDoEurekaCacheRefresh方法。
项目运维中停机调用的都是自定义的Endpoint。


最后如果对源码不是很了解可以在 芋道源码 了解学习