序
本文主要研究一下eureka client的backup-registry-impl属性
配置
配置说明
{ "sourceType": "org.springframework.cloud.netflix.eureka.EurekaClientConfigBean", "name": "eureka.client.fetch-remote-regions-registry", "description": "Comma separated list of regions for which the eureka registry information will be\n fetched. It is mandatory to define the availability zones for each of these regions\n as returned by availabilityZones. Failing to do so, will result in failure of\n discovery client startup.", "type": "java.lang.String" }复制代码
EurekaClientConfigBean
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaClientConfigBean.java
@ConfigurationProperties(EurekaClientConfigBean.PREFIX)public class EurekaClientConfigBean implements EurekaClientConfig { //...... /** * Comma separated list of regions for which the eureka registry information will be * fetched. It is mandatory to define the availability zones for each of these regions * as returned by availabilityZones. Failing to do so, will result in failure of * discovery client startup. * */ private String fetchRemoteRegionsRegistry; //......}复制代码
配置实例
eureka:
  client:
    fetch-registry: true
    fetch-remote-regions-registry: east
    backup-registry-impl: com.example.BackupServiceRegistry
使用
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, ProviderbackupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference (clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? 
null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue (), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); 
// use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue (), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. 
cluster resolvers, heartbeat, instanceInfo replicator, fetch initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }复制代码
重点看下面这段
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); }复制代码
这里有个fetchRegistry判断
fetchRegistry
/** * Fetches the registry information. * ** This method tries to get only deltas after the first fetch unless there * is an issue in reconciling eureka server and client registry information. *
* * @param forceFullRegistryFetch Forces a full registry fetch. * * @return true if the registry was fetched */ private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status onCacheRefreshed(); // Update remote status based on refreshed data held in the cache updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; }复制代码
如果从指定的eureka server获取信息报异常,则返回false。由此可见,在指定eureka server不可用的时候,会返回false,然后使用backup来获取
fetchRegistryFromBackup
/** * Fetch the registry information from back up registry if all eureka server * urls are unreachable. */ private void fetchRegistryFromBackup() { try { @SuppressWarnings("deprecation") BackupRegistry backupRegistryInstance = newBackupRegistryInstance(); if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used. backupRegistryInstance = backupRegistryProvider.get(); } if (null != backupRegistryInstance) { Applications apps = null; if (isFetchingRemoteRegionRegistries()) { String remoteRegionsStr = remoteRegionsToFetch.get(); if (null != remoteRegionsStr) { apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(",")); } } else { apps = backupRegistryInstance.fetchRegistry(); } if (apps != null) { final Applications applications = this.filterAndShuffle(apps); applications.setAppsHashCode(applications.getReconcileHashCode()); localRegionApps.set(applications); logTotalInstances(); logger.info("Fetched registry successfully from the backup"); } } else { logger.warn("No backup registry instance defined & unable to find any discovery servers."); } } catch (Throwable e) { logger.warn("Cannot fetch applications from apps although backup registry was specified", e); } } private boolean isFetchingRemoteRegionRegistries() { return null != remoteRegionsToFetch.get(); } /** * @deprecated Use injection to provide {@link BackupRegistry} implementation. */ @Deprecated @Nullable protected BackupRegistry newBackupRegistryInstance() throws ClassNotFoundException, IllegalAccessException, InstantiationException { return null; }复制代码
- 这里可以看到,newBackupRegistryInstance()默认返回null,因此实例实际由backupRegistryProvider.get()提供;如果配置文件指定了eureka.client.backup-registry-impl,则这里的backupRegistryInstance不为null。
- 这里还会判断是否配置了eureka.client.fetch-remote-regions-registry:如果有指定,则调用backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));没有指定,则调用backupRegistryInstance.fetchRegistry()
- 最后将backup获取的应用信息填充到localRegionApps本地注册信息表里头
backup-registry-impl实例
public class BackupServiceRegistry implements BackupRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(BackupServiceRegistry.class); private MapremoteRegionVsApps = new HashMap (); private Applications localRegionApps; @Override public Applications fetchRegistry() { LOGGER.info("fetchRegistry() called"); if (null == localRegionApps) { return new Applications(); } return localRegionApps; } @Override public Applications fetchRegistry(String[] includeRemoteRegions) { LOGGER.info("fetchRegistry(String[] includeRemoteRegions) called"); Applications toReturn = new Applications(); for (Application application : localRegionApps.getRegisteredApplications()) { toReturn.addApplication(application); } for (String region : includeRemoteRegions) { Applications applications = remoteRegionVsApps.get(region); if (null != applications) { for (Application application : applications.getRegisteredApplications()) { toReturn.addApplication(application); } } } return toReturn; } public Applications getLocalRegionApps() { return localRegionApps; } public Map getRemoteRegionVsApps() { return remoteRegionVsApps; } public void setRemoteRegionVsApps(Map remoteRegionVsApps) { this.remoteRegionVsApps = remoteRegionVsApps; } public void setLocalRegionApps(Applications localRegionApps) { this.localRegionApps = localRegionApps; }}复制代码
小结
eureka client提供了一个fallback机制:启动时如果访问不到指定的eureka server,就会fallback到配置的backup-registry,并将backup-registry返回的应用信息填充到localRegionApps中,从而避免client在启动时因eureka server故障而拿到空的注册信息。