e42e9ec51b0070d1915074e0e5b8deaa.jpeg

背景

从昨日的两篇文章:SpringCloud源码:客户端分析(一)- SpringBootApplication注解类加载流程SpringCloud源码:客户端分析(二)- 客户端源码分析

我们理解了客户端的初始化,其实跟SpringBootApplication初始化机制息息相关,也和自动化配置类有关。

现在我们一起来分析下服务端的初始化流程,开始之前,我们先梳理下几个常用的框架注解。

@Import注解

作用:

  • 导入一个或多个Bean

  • 导入@Configuration类

  • 导入ImportSelector的实现类

  • 导入ImportBeanDefinitionRegistrar的实现类

使用前提:@Import必须要在@controller、@Service、@Component、@Configuration、@Repository修饰的类下,并且要在springboot扫描范围内,这些注解修饰的类默认是必须和springboot启动类同包才会被扫描到,当然也可以手动指定扫描包。

@EnableConfigurationProperties注解

后面跟着的,就是一个活多个配置类了。

参考:https://www.cnblogs.com/JavaYuYin/p/18060520

@ConditionalOnBean注解

跟@ConditionalOnMissingBean相反,@ConditionalOnBean注解是,如果有容器中有类,就注入备用类,如果没有类就不注入。

其他个性化加载配置

76e49a1d49013465fced3d377a4971ef.png

源码分析

@EnableEurekaServer注解:修饰启动类

@EnableEurekaServer
@SpringBootApplication
public class EurekaApplication {
   public static void main(String[] args) {
       SpringApplication.run(EurekaApplication.class, args);
   }
}

@EnableEurekaServer是一个启动类注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {


}

分析:

  • @Import(EurekaServerMarkerConfiguration.class):引入了 EurekaServerMarkerConfiguration资源类

  • EurekaServerMarkerConfiguration是一个标识配置类

EurekaServerMarkerConfiguration

@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
   @Bean
   public Marker eurekaServerMarkerBean() {
      return new Marker();
   }


   class Marker {
   // 空类,没有任何定义
   }
}

分析:

  • @Configuration 和 @Bean 结合使用,添加了Marker对象到容器里

  • 而Marker是一个空类,它的作用如其名,只是起到标识作用;

留下问题:那么Marker类在哪里起作用呢?通过全局搜索,知道在EurekaServerAutoConfiguration 注册中心的自动配置类有匹配到。

EurekaServerAutoConfiguration:自动装配类

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
      InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    // ...
}

分析:

  • @ConditionalOnBean:容器存在EurekaServerMarkerConfiguration.Marker时,则进行 EurekaServerAutoConfiguration 的自动初始化装配

  • @EnableConfigurationProperties:加载特定的@Configuration类(EurekaDashboardProperties 和 InstanceRegistryProperties)

    • EurekaDashboardProperties:

    • InstanceRegistryProperties:

  • @Configuration + @Import:资源加载EurekaServerInitializerConfiguration配置类

  • @PropertySource:读取资源配置文件

从EurekaServerAutoConfiguration,我们发现有两条初始化EurekaServer相关组件的路线:@Import资源加载 和 @Bean初始化。

两条初始化路线

  • @Import资源加载:加载EurekaServerInitializerConfiguration.class,并执行start方法

    • 执行eurekaServerBootstrap.contextInitialized

    • 新建了一个剔除无效服务任务,并给线程池周期性执行

  • @Bean初始化:新建DefaultEurekaServerContext,并在初始化bean之后,执行@PostConstruct修饰的initialize()方法:

    • scheduleRenewalThresholdUpdateTask(),新建了一个续约线程,并给线程池周期性执行

    • 新建了一个taskExecutor(单线程池),更新注册表线程updatePeerEurekaNodes,并给线程池周期性执行

    • peerEurekaNodes.start();

    • registry.init(peerEurekaNodes);

下面我将按照流程图来讲解两条路线的源码

cbfb816e5be17a5c3d6ceef12cb4a4f0.png


路线一:@Import资源加载

加载的资源是EurekaServerInitializerConfiguration。

初始化配置类的整体流程如下

EurekaServerInitializerConfiguration

->EurekaServerInitializerConfiguration#start

->EurekaServerBootstrap#contextInitialized

->EurekaServerBootstrap#initEurekaServerContext

->PeerAwareInstanceRegistryImpl#syncUp

->PeerAwareInstanceRegistryImpl#openForTraffic

->AbstractInstanceRegistry#postInit

1、EurekaServerInitializerConfiguration#start

由上面的资源加载@Import,可以知道实现了SmartLifecycle接口,即Lifecycle接口。

@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration
      implements ServletContextAware, SmartLifecycle, Ordered {


    @Override
    public void start() {
       new Thread(() -> {
          try {
             eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
             log.info("Started Eureka Server");


             publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
             EurekaServerInitializerConfiguration.this.running = true;
             publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
          }
          catch (Exception ex) {
             // Help!
             log.error("Could not initialize Eureka servlet context", ex);
          }
       }).start();
    }
}

分析:

  • EurekaServerInitializerConfiguration的start()方法,是start启动方法

    • 该方法会在,新建线程里,被触发执行

2、EurekaServerBootstrap#contextInitialized

public void contextInitialized(ServletContext context) {
   try {
      initEurekaEnvironment();
      initEurekaServerContext();


      context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
   }
   catch (Throwable e) {
      log.error("Cannot bootstrap eureka server :", e);
      throw new RuntimeException("Cannot bootstrap eureka server :", e);
   }
}

分析:

  • initEurekaServerContext():

3、EurekaServerBootstrap#initEurekaServerContext

protected void initEurekaServerContext() throws Exception {
   // For backward compatibility
   JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
         XStream.PRIORITY_VERY_HIGH);
   XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
         XStream.PRIORITY_VERY_HIGH);


   if (isAws(this.applicationInfoManager.getInfo())) {
      this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
            this.eurekaClientConfig, this.registry, this.applicationInfoManager);
      this.awsBinder.start();
   }


   EurekaServerContextHolder.initialize(this.serverContext);


   log.info("Initialized server context");


   // Copy registry from neighboring eureka node
   int registryCount = this.registry.syncUp();
   this.registry.openForTraffic(this.applicationInfoManager, registryCount);


   // Register all monitoring statistics.
   EurekaMonitors.registerAllStats();
}

分析:

  • 注册JSON和XML序列化转换器以保持向后兼容性。

  • 如果应用信息表明运行在AWS环境中,则初始化并启动AWS绑定代理。

  • 初始化Eureka服务器上下文。

  • 从邻近Eureka节点同步注册表数据,并打开流量:registry.syncUp():

  • 注册所有监控统计信息:registry.openForTraffic:

4、PeerAwareInstanceRegistryImpl.syncUp()

public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;


    // 【1】serverConfig获取配置项(注册重试次数)
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // 【2】serverConfig获取配置项(重试间隔),进行休眠
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

5、PeerAwareInstanceRegistryImpl#openForTraffic

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfClientsSendingRenews = count;
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 将当前的EurekaServer上线
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

6、AbstractInstanceRegistry#postInit

private Timer evictionTimer = new Timer("Eureka-EvictionTimer", true);


private final AtomicReference<EvictionTask> evictionTaskRef = new AtomicReference<EvictionTask>();


protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    //【1】剔除任务执行
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}


class EvictionTask extends TimerTask {


    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);


    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            // 剔除任务
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }
}


public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    // 【1】判断是否进行剔除操作
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }


    // 【2】遍历注册服务列表
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }


    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;


    // 【3】从失效租约数量,和失效最大限制里,取最小值
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);


        Random random = new Random(System.currentTimeMillis());
        // 【4】循环剔除
        for (int i = 0; i < toEvict; i++) {
            // 【5】随机从expiredLeases里剔除
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);


            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // 【6】剔除服务名
            internalCancel(appName, id, false);
        }
    }
}


  protected boolean internalCancel(String appName, String id, boolean isReplication) {
      read.lock();
      try {
          CANCEL.increment(isReplication);
          // 【7】找到注册服务信息
          Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
          Lease<InstanceInfo> leaseToCancel = null;
          if (gMap != null) {
              // 【8】找到租约
              leaseToCancel = gMap.remove(id);
          }


          //....


          if (leaseToCancel == null) {
              CANCEL_NOT_FOUND.increment(isReplication);
              logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
              return false;
          } else {
              // 【9】租约取消
              leaseToCancel.cancel();
              // 【10】找到取消的实例信息
              InstanceInfo instanceInfo = leaseToCancel.getHolder();
              String vip = null;
              String svip = null;
              if (instanceInfo != null) {
                  instanceInfo.setActionType(ActionType.DELETED);
                  recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                  instanceInfo.setLastUpdatedTimestamp();
                  vip = instanceInfo.getVIPAddress();
                  svip = instanceInfo.getSecureVipAddress();
              }
              // 【11】取消实例信息、vip区域、svip区域
              invalidateCache(appName, vip, svip);
              logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
          }
      } finally {
          read.unlock();
      }


      synchronized (lock) {
          if (this.expectedNumberOfClientsSendingRenews > 0) {
              // Since the client wants to cancel it, reduce the number of clients to send renews.
              this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
              updateRenewsPerMinThreshold();
          }
      }


      return true;
  }


protected boolean internalCancel(String appName, String id, boolean isReplication) {
  read.lock();
  try {
      CANCEL.increment(isReplication);
      Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
      Lease<InstanceInfo> leaseToCancel = null;
      if (gMap != null) {
          leaseToCancel = gMap.remove(id);
      }
      recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
      InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
      if (instanceStatus != null) {
          logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
      }
      if (leaseToCancel == null) {
          CANCEL_NOT_FOUND.increment(isReplication);
          logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
          return false;
      } else {
          leaseToCancel.cancel();
          InstanceInfo instanceInfo = leaseToCancel.getHolder();
          String vip = null;
          String svip = null;
          if (instanceInfo != null) {
              instanceInfo.setActionType(ActionType.DELETED);
              recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
              instanceInfo.setLastUpdatedTimestamp();
              vip = instanceInfo.getVIPAddress();
              svip = instanceInfo.getSecureVipAddress();
          }
          invalidateCache(appName, vip, svip);
          logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
      }
  } finally {
      read.unlock();
  }


  synchronized (lock) {
      if (this.expectedNumberOfClientsSendingRenews > 0) {
          // Since the client wants to cancel it, reduce the number of clients to send renews.
          this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
          updateRenewsPerMinThreshold();
      }
  }


  return true;
}

分析:

  • evictionTimer定时启动任务EvictionTask,即剔除任务

7842dba907c0b6865da01a39aef29571.png

路线二:@Bean 初始化 DefaultEurekaServerContext

@Bean初始化:new DefaultEurekaServerContext

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
      InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {


    @Bean
    @ConditionalOnMissingBean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
          PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
       return new DefaultEurekaServerContext(
               this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
    }
}
DefaultEurekaServerContext#initialize

在DefaultEurekaServerContext接口可以看到@PostConstruct修饰了initialize()方法,那么跟着初始化。

@PostConstruct
@Override
public void initialize() {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    try {
        registry.init(peerEurekaNodes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}

代码分析:

- 步骤【1】peerEurekaNodes.start():启动peerEurekaNodes。

- 步骤【2】registry.init(peerEurekaNodes):尝试初始化registry,参数为peerEurekaNodes。

 步骤【1】:PeerEurekaNodes#start

public void start() {
    // 【1】初始化单线程池
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        updatePeerEurekaNodes(resolvePeerUrls());
        // 【2】初始化任务
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                  // 【2.1】任务的详情,就是更新Eureka节点信息
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }


            }
        };
        // 【3】线程池执行任务
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  {}", node.getServiceUrl());
    }
}

代码分析:

- 新建了一个定时任务,用单线程池周期性去执行。

 步骤【2】:PeerAwareInstanceRegistryImpl#init
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    //【1】
    initializedResponseCache();
    //【2】
    scheduleRenewalThresholdUpdateTask();
    //【3】
    initRemoteRegionRegistry();


    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}

分析:

  • 【1】缓存相关的初始化信息:   initializedResponseCache();

  • 【2】更新续约的阀值:scheduleRenewalThresholdUpdateTask();

  • 【3】初始化远程区域注册表:initRemoteRegionRegistry();

[1]缓存相关的初始化信息 initializedResponseCache
@Override
public synchronized void initializedResponseCache() {
    if (responseCache == null) {
        responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
    }
}

分析:

  • 最终new了一个ResponseCacheImpl类

查看一下ResponseCacheImpl的构造器

private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();


private final java.util.Timer timer = new java.util.Timer("Eureka-CacheFillTimer", true);


private final LoadingCache<Key, Value> readWriteCacheMap;


ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    // 【1】
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    // 【2】
    this.registry = registry;


    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    // 【3】
    this.readWriteCacheMap =
            CacheBuilder.newBuilder()
                    .initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });


    // 【4】
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }


    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
    }
}


private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}

分析:

  • 【1】serverConfig:是配置类,专门读取“eureka.server”开头的配置项

    • useReadOnlyResponseCache:默认是true,是否开启缓存 配置参数:eureka.server.use-read-only-response-cache

    • responseCacheUpdateIntervalMs:默认30 * 1000ms,如果开启缓存,缓存多少时间同步一次。配置参数:eureka.server.response-cache-update-interval-ms

    • initialCapacityOfResponseCache:默认1000

    • responseCacheAutoExpirationInSeconds:默认180

  • 【2】注册器赋值

  • 【3】实例信息保存在LoadingCache里

  • 【4】新建了一个定时任务,启动了一个名为Eureka-CacheFillTimer的定时更新缓存任务

[2]更新续约的阈值 initializedResponseCache
private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}


private void updateRenewalThreshold() {
    try {
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold or if self preservation is disabled.
            if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                    || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfClientsSendingRenews = count;
                updateRenewsPerMinThreshold();
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}

代码分析

  • 新建了一个定时任务,并用定时器周期性去执行

    • ‍获取所有注册的应用实例信息。

    • 统计符合条件的可注册实例数量。

    • 在同步锁内,若统计数量大于当前预期阈值或自我保护模式未启用,则更新预期发送心跳的客户端数量,并重新计算心跳阈值。

    • 记录更新后的心跳阈值日志。

步骤【2】initializedResponseCache 最终启动了一个RenewalThresholdTask

小结

后台启动了三个Timer定时线程

  • EurekaServerInitializerConfiguration#start

    • 启动一个定时任务:周期执行EvictionTask

      • 功能:清理过期的注册信息。

  • DefaultEurekaServerContext的@Bean初始化过程

    • 启动了一个定时任务:CacheUpdateTask

      • 功能:自动过期缓存中的响应。(Eureka Server 维护了一个响应缓存,用于存储客户端请求的结果以提高性能。这个定时任务负责定期使缓存中的条目过期,以确保缓存数据的新鲜度)

    • 启动了一个定时任务:RenewalThresholdTask

      • 功能:更新续约阈值(这个阈值是基于最近一段时间内收到的续约请求数量动态计算的)

总结

Eureka Client的续约机制是确保服务注册信息准确性的关键,通过定时向Eureka Server发送续约请求,Eureka Client能够有效地维护其在服务注册表中的状态。

同时,Eureka Server通过监控续约情况,及时剔除不活跃的服务实例,保证了服务发现的可靠性。

其他文章

SpringCloud源码:客户端分析(一)- SpringBootApplication注解类加载流程

SpringCloud源码:客户端分析(二)- 客户端源码分析

Kafka消息堆积问题排查

基于SpringMVC的API灰度方案

理解到位:灾备和只读数据库

SQL治理经验谈:索引覆盖

Mybatis链路分析:JDK动态代理和责任链模式的应用

大模型安装部署、测试、接入SpringCloud应用体系

Mybatis插件-租户ID的注入&拦截应用

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部