Eureka Server端源码分析

Eureka架构图

Eurek架构图

Eureka核心功能

  • 服务注册(Register):Eureka Client会通过发送REST请求的方式向Eureka Server注册自己的服务,提供自身的元数据,比如ip地址、端 口、运行状况指标的url、主页地址等信息。Eureka Server接收到注册请求后,就会把这些元数据信息存储在一个双层的Map中。
  • 服务续约(Renew):在服务注册后,Eureka Client会维护一个心跳来持续通知Eureka Server,说明服务一直处于可用状态,防止被剔 除。Eureka Client在默认的情况下会每隔30秒(eureka.instance.leaseRenewalIntervalInSeconds)发送一次心跳来进行服务续约。
  • 服务下线(Cancel):当Eureka Client需要关闭或重启时,就不希望在这个时间段内再有请求进来,所以,就需要提前先发送REST请求给 Eureka Server,告诉Eureka Server自己要下线了,Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线 事件传播出去。
  • 服务同步(Replicate):Eureka Server之间会互相进行注册,构建Eureka Server集群,不同Eureka Server之间会进行服务同步,用来保 证服务信息的一致性。
  • 获取服务注册信息(Get Registry):服务消费者(Eureka Client)在启动的时候,会发送一个REST请求给Eureka Server,获取上面注册的服务清 单,并且缓存在Eureka Client本地,默认缓存30秒(eureka.client.registryFetchIntervalSeconds)。同时,为了性能考虑,Eureka Server也会维护一份只读的服务清单缓存,该缓存每隔30秒更新一次。
  • 服务调用(Make Remote Call):服务消费者在获取到服务清单后,就可以根据清单中的服务列表信息,查找到其他服务的地址,从而进行远程调用。Eureka有 Region和Zone的概念,一个Region可以包含多个Zone,在进行服务调用时,优先访问处于同一个Zone中的服务提供者。
  • 服务剔除(Evict):有时候,服务实例可能会因为网络故障等原因导致不能提供服务,而此时该实例也没有发送请求给Eureka Server来进 行服务下线,所以,还需要有服务剔除的机制。Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒,eureka.instance.leaseExpirationDurationInSeconds)的服务剔除。180s被剔除
  • 自我保护机制:既然Eureka Server会定时剔除超时没有续约的服务,那就有可能出现一种场景,网络一段时间内发生了异常,所有的服务都没能够进行续约,Eureka Server就把所有的服务都剔除了,这样显然不太合理。所以,就有了自我保护机制,当短时间内,统计续约失败的 比例,如果达到一定阈值,则会触发自我保护的机制,在该机制下,Eureka Server不会剔除任何的微服务,等到正常后,再退出自我保护 机制。自我保护开关(eureka.server.enable-self-preservation: false)

源码分析

2.1 主流程图

主流程图

2.2 关键代码分析

  • EurekaServerAutoConfiguration

Eureka服务端的自动配置类,在服务启动时,做相关初始化

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
    InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {


  // Eureka对外提供的访问接口,咱访问的Eureka页面就是从这里访问
  @Bean
  @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
      matchIfMissing = true)
  public EurekaController eurekaController() {
    return new EurekaController(this.applicationInfoManager);
  }
  
  // 不仅有基础的服务注册、下线、续约、剔除、获取应用、实例信息的功能,
  // 还扩展了多节点时,注册表信息同步的功能。
  @Bean
  public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
      ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
        serverCodecs, this.eurekaClient,
        this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
        this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
  }
  
  //管理PeerEurekaNodes集合的生命周期的Helper类
  @Bean
  @ConditionalOnMissingBean
  public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
      ServerCodecs serverCodecs,
      ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
    return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
        this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
        replicationClientAdditionalFilters);
  }
  
  // Eureka Server上下文,提供了获取Eureka Server的配置、
  // Eureka节点、操作的注册表Registry
  @Bean
  @ConditionalOnMissingBean
  public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
      PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
        registry, peerEurekaNodes, this.applicationInfoManager);
  }
  
  // 一个启动类,负责初始化Eureka运行环境和运行上下文
  @Bean
  public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
      EurekaServerContext serverContext) {
    return new EurekaServerBootstrap(this.applicationInfoManager,
        this.eurekaClientConfig, this.eurekaServerConfig, registry,
        serverContext);
  }
  
  // 注册一个过滤器,拦截Eureka Server对外提供的Rest请求
  @Bean
  public FilterRegistrationBean<?> jerseyFilterRegistration(
      javax.ws.rs.core.Application eurekaJerseyApp) {
    FilterRegistrationBean bean = new FilterRegistrationBean();
    bean.setFilter(new ServletContainer(eurekaJerseyApp));
    bean.setOrder(Ordered.LOWEST_PRECEDENCE);
    bean.setUrlPatterns(
        Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
    return bean;
  }


}


  • EurekaServerInitializerConfiguration

实现了SmartLifecycle接口的类,有个特点,isAutoStartup()返回了true时,会执行start()方法。

@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration
    implements ServletContextAware, SmartLifecycle, Ordered {
  
  @Override
  public void start() {
    new Thread(() -> {
      try {
        // 初始化和启动Eureka Server
        eurekaServerBootstrap.contextInitialized(
            EurekaServerInitializerConfiguration.this.servletContext);
        log.info("Started Eureka Server");


        publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
        // 修改EurekaServer的运行状态为true
        EurekaServerInitializerConfiguration.this.running = true;
        publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
      }
      catch (Exception ex) {
        // Help!
        log.error("Could not initialize Eureka servlet context", ex);
      }
    }).start();
  }
   
  @Override
  public boolean isAutoStartup() {
    return true;
  } 
}


  • EurekaServerBootstrap
public class EurekaServerBootstrap {
  public void contextInitialized(ServletContext context) {
    try {
     // 初始化Eureka的环境信息
      initEurekaEnvironment();
      initEurekaServerContext();


      context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
      log.error("Cannot bootstrap eureka server :", e);
      throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
  }
  
    protected void initEurekaServerContext() throws Exception {
     // ...
    // Copy registry from neighboring eureka node
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
         // ...
    }
}

Eureka Server初始化最核心的两个方法:

  • syncUp()

功能:从其它Peer Nodes同步注册表信息,并注册到当前Eureka节点的注册表中。

执行syncUp逻辑的前提是
serverConfig.getRegistrySyncRetries()
大于0,EurekaServerConfigBean‍默认配置是0,所以不会执行同步操作,从
EurekaServerAutoConfiguration
下面代码段:

@Configuration(proxyBeanMethods = false)
  protected static class EurekaServerConfigBeanConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
      EurekaServerConfigBean server = new EurekaServerConfigBean();
      if (clientConfig.shouldRegisterWithEureka()) {
        // Set a sensible default if we are supposed to replicate
        server.setRegistrySyncRetries(5);
      }
      return server;
    }
  }

可以看出Eureka Server需要做如下配置,syncUp()才能执行:

eureka:
  client:
    register-with-eureka: true
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;


    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // 除第1次外,后面重试间隔30s
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // 获取已注册的客户端应用
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
        // 遍历客户端注册应用的实例
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                    // 往当前Eureka Server节点注册实例信息,如何注册放在客户端源码分析详解
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    // 返回同步实例数量
    return count;
}


  • openForTraffic
 @Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfClientsSendingRenews = count;
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 修改实例的状态为UP
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    
    super.postInit();
}


public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    // ...
    
    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        // 默认每60执行一次EvictionTask
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }
    
    class EvictionTask extends TimerTask {


        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);


        @Override
        public void run() {
            try {
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                // 服务提出的核心逻辑,详细逻辑放在后面的源码分析详细分解
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
    
   // ...
}


下面三个类是,应用信息、实例信息、租约信息的结构,了解有利于其它核心功能的理解。

  • Application
public class Application {
      // 应用名称
      private String name;
      // ?
      @XStreamOmitField
      private volatile boolean isDirty = false;
      // 实例列表
      @XStreamImplicit
      private final Set instances;
      // 实例列表
      private final AtomicReference> shuffledInstances;
      // 
      private final Map instancesMap;


}


  • InstanceInfo

应用的实例信息,

public class InstanceInfo {


    private static final String VERSION_UNKNOWN = "unknown";


    /**
     * {@link InstanceInfo} JSON and XML format for port information does not follow the usual conventions, which
     * makes its mapping complicated. This class represents the wire format for port information.
     */
    public static class PortWrapper {
        private final boolean enabled;
        private final int port;


        @JsonCreator
        public PortWrapper(@JsonProperty("@enabled") boolean enabled, @JsonProperty("#34;) int port) {
            this.enabled = enabled;
            this.port = port;
        }


        public boolean isEnabled() {
            return enabled;
        }


        public int getPort() {
            return port;
        }
    }


    private static final Logger logger = LoggerFactory.getLogger(InstanceInfo.class);


    public static final int DEFAULT_PORT = 7001;
    public static final int DEFAULT_SECURE_PORT = 7002;
    public static final int DEFAULT_COUNTRY_ID = 1; // US


    // 实例ID,应用内唯一
    private volatile String instanceId;
    // ??这个跟Application中的是否相同
    private volatile String appName;
    @Auto
    private volatile String appGroupName;
    // ip地址
    private volatile String ipAddr;
    
    private static final String SID_DEFAULT = "na";
    @Deprecated
    private volatile String sid = SID_DEFAULT;
    // 实例的默认端口是7001
    private volatile int port = DEFAULT_PORT;
    
    private volatile int securePort = DEFAULT_SECURE_PORT;


    @Auto
    private volatile String homePageUrl;
    @Auto
    private volatile String statusPageUrl;
    @Auto
    private volatile String healthCheckUrl;
    @Auto
    private volatile String secureHealthCheckUrl;
    @Auto
    private volatile String vipAddress;
    @Auto
    private volatile String secureVipAddress;
    @XStreamOmitField
    private String statusPageRelativeUrl;
    @XStreamOmitField
    private String statusPageExplicitUrl;
    @XStreamOmitField
    private String healthCheckRelativeUrl;
    @XStreamOmitField
    private String healthCheckSecureExplicitUrl;
    @XStreamOmitField
    private String vipAddressUnresolved;
    @XStreamOmitField
    private String secureVipAddressUnresolved;
    @XStreamOmitField
    private String healthCheckExplicitUrl;
    @Deprecated
    private volatile int countryId = DEFAULT_COUNTRY_ID; // Defaults to US
    private volatile boolean isSecurePortEnabled = false;
    private volatile boolean isUnsecurePortEnabled = true;
    private volatile DataCenterInfo dataCenterInfo;
    // 主机名
    private volatile String hostName;
    // 实例状态
    private volatile InstanceStatus status = InstanceStatus.UP;
    private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
    @XStreamOmitField
    private volatile boolean isInstanceInfoDirty = false;
    // 实例的租约信息
    private volatile LeaseInfo leaseInfo;
    @Auto
    private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
    @XStreamAlias("metadata")
    private volatile Map metadata;
    @Auto
    private volatile Long lastUpdatedTimestamp;
    @Auto
    private volatile Long lastDirtyTimestamp;
    @Auto
    private volatile ActionType actionType;
    @Auto
    private volatile String asgName;
    private String version = VERSION_UNKNOWN;
}


  • LeaseInfo

服务注册到Eureka上的租约信息。

public class LeaseInfo {
    // 默认的续约时间间隔,每30s
    public static final int DEFAULT_LEASE_RENEWAL_INTERVAL = 30;
    // 表示eureka服务器从接收到最后一次心跳后等待的时间(以秒为单位)
    public static final int DEFAULT_LEASE_DURATION = 90;


    // Client settings
    private int renewalIntervalInSecs = DEFAULT_LEASE_RENEWAL_INTERVAL;
    private int durationInSecs = DEFAULT_LEASE_DURATION;


    // Server populated
    // 服务注册时间
    private long registrationTimestamp;
    // 上次续约时间
    private long lastRenewalTimestamp;
    // 剔除时间
    private long evictionTimestamp;
    // 服务运行时间
    private long serviceUpTimestamp;
    
}


  • AbstractInstanceRegistry

处理Eureka客户端的所有注册请求,包括注册、续约、下线、过期、状态更新

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    // 注册表
    private final ConcurrentHashMap>> registry
        = new ConcurrentHashMap>>();


}

Eureka注册表的UML


总结

Eureka Server端启动时,主要是做上下文初始化工作,对于Eureka集群会从其它节点同步注册表,并更新实例的状态等。另外,还会创建服务剔除的定时任务,对过期不续约的服务进行剔除。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章