Eurek架构图
Eureka核心功能
2.1 主流程图
主流程图
2.2 关键代码分析
Eureka服务端的自动配置类,在服务启动时,做相关初始化
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
// Eureka对外提供的访问接口,咱访问的Eureka页面就是从这里访问
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
// 不仅有基础的服务注册、下线、续约、剔除、获取应用、实例信息的功能,
// 还扩展了多节点时,注册表信息同步的功能。
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
//管理PeerEurekaNodes集合的生命周期的Helper类
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs,
ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
replicationClientAdditionalFilters);
}
// Eureka Server上下文,提供了获取Eureka Server的配置、
// Eureka节点、操作的注册表Registry
@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
// 一个启动类,负责初始化Eureka运行环境和运行上下文
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
// 注册一个过滤器,拦截Eureka Server对外提供的Rest请求
@Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
}
实现了SmartLifecycle接口的类,有个特点,isAutoStartup()返回了true时,会执行start()方法。
@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
@Override
public void start() {
new Thread(() -> {
try {
// 初始化和启动Eureka Server
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
// 修改EurekaServer的运行状态为true
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
@Override
public boolean isAutoStartup() {
return true;
}
}
public class EurekaServerBootstrap {
public void contextInitialized(ServletContext context) {
try {
// 初始化Eureka的环境信息
initEurekaEnvironment();
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
protected void initEurekaServerContext() throws Exception {
// ...
// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// ...
}
}
Eureka Server初始化最核心的两个方法:
功能:从其它Peer Nodes同步注册表信息,并注册到当前Eureka节点的注册表中。
执行syncUp逻辑的前提是
serverConfig.getRegistrySyncRetries() 大于0,EurekaServerConfigBean默认配置是0,所以不会执行同步操作,从
EurekaServerAutoConfiguration下面代码段:
@Configuration(proxyBeanMethods = false)
protected static class EurekaServerConfigBeanConfiguration {
@Bean
@ConditionalOnMissingBean
public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
EurekaServerConfigBean server = new EurekaServerConfigBean();
if (clientConfig.shouldRegisterWithEureka()) {
// Set a sensible default if we are supposed to replicate
server.setRegistrySyncRetries(5);
}
return server;
}
}
可以看出Eureka Server需要做如下配置,syncUp()才能执行:
eureka:
client:
register-with-eureka: true
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
// 除第1次外,后面重试间隔30s
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 获取已注册的客户端应用
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
// 遍历客户端注册应用的实例
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// 往当前Eureka Server节点注册实例信息,如何注册放在客户端源码分析详解
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
// 返回同步实例数量
return count;
}
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
// 修改实例的状态为UP
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
// ...
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
// 默认每60执行一次EvictionTask
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
// 服务提出的核心逻辑,详细逻辑放在后面的源码分析详细分解
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
// ...
}
下面三个类是,应用信息、实例信息、租约信息的结构,了解有利于其它核心功能的理解。
public class Application {
// 应用名称
private String name;
// ?
@XStreamOmitField
private volatile boolean isDirty = false;
// 实例列表
@XStreamImplicit
private final Set instances;
// 实例列表
private final AtomicReference> shuffledInstances;
//
private final Map instancesMap;
}
应用的实例信息,
public class InstanceInfo {
private static final String VERSION_UNKNOWN = "unknown";
/**
* {@link InstanceInfo} JSON and XML format for port information does not follow the usual conventions, which
* makes its mapping complicated. This class represents the wire format for port information.
*/
public static class PortWrapper {
private final boolean enabled;
private final int port;
@JsonCreator
public PortWrapper(@JsonProperty("@enabled") boolean enabled, @JsonProperty("#34;) int port) {
this.enabled = enabled;
this.port = port;
}
public boolean isEnabled() {
return enabled;
}
public int getPort() {
return port;
}
}
private static final Logger logger = LoggerFactory.getLogger(InstanceInfo.class);
public static final int DEFAULT_PORT = 7001;
public static final int DEFAULT_SECURE_PORT = 7002;
public static final int DEFAULT_COUNTRY_ID = 1; // US
// 实例ID,应用内唯一
private volatile String instanceId;
// ??这个跟Application中的是否相同
private volatile String appName;
@Auto
private volatile String appGroupName;
// ip地址
private volatile String ipAddr;
private static final String SID_DEFAULT = "na";
@Deprecated
private volatile String sid = SID_DEFAULT;
// 实例的默认端口是7001
private volatile int port = DEFAULT_PORT;
private volatile int securePort = DEFAULT_SECURE_PORT;
@Auto
private volatile String homePageUrl;
@Auto
private volatile String statusPageUrl;
@Auto
private volatile String healthCheckUrl;
@Auto
private volatile String secureHealthCheckUrl;
@Auto
private volatile String vipAddress;
@Auto
private volatile String secureVipAddress;
@XStreamOmitField
private String statusPageRelativeUrl;
@XStreamOmitField
private String statusPageExplicitUrl;
@XStreamOmitField
private String healthCheckRelativeUrl;
@XStreamOmitField
private String healthCheckSecureExplicitUrl;
@XStreamOmitField
private String vipAddressUnresolved;
@XStreamOmitField
private String secureVipAddressUnresolved;
@XStreamOmitField
private String healthCheckExplicitUrl;
@Deprecated
private volatile int countryId = DEFAULT_COUNTRY_ID; // Defaults to US
private volatile boolean isSecurePortEnabled = false;
private volatile boolean isUnsecurePortEnabled = true;
private volatile DataCenterInfo dataCenterInfo;
// 主机名
private volatile String hostName;
// 实例状态
private volatile InstanceStatus status = InstanceStatus.UP;
private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
@XStreamOmitField
private volatile boolean isInstanceInfoDirty = false;
// 实例的租约信息
private volatile LeaseInfo leaseInfo;
@Auto
private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
@XStreamAlias("metadata")
private volatile Map metadata;
@Auto
private volatile Long lastUpdatedTimestamp;
@Auto
private volatile Long lastDirtyTimestamp;
@Auto
private volatile ActionType actionType;
@Auto
private volatile String asgName;
private String version = VERSION_UNKNOWN;
}
服务注册到Eureka上的租约信息。
public class LeaseInfo {
// 默认的续约时间间隔,每30s
public static final int DEFAULT_LEASE_RENEWAL_INTERVAL = 30;
// 表示eureka服务器从接收到最后一次心跳后等待的时间(以秒为单位)
public static final int DEFAULT_LEASE_DURATION = 90;
// Client settings
private int renewalIntervalInSecs = DEFAULT_LEASE_RENEWAL_INTERVAL;
private int durationInSecs = DEFAULT_LEASE_DURATION;
// Server populated
// 服务注册时间
private long registrationTimestamp;
// 上次续约时间
private long lastRenewalTimestamp;
// 剔除时间
private long evictionTimestamp;
// 服务运行时间
private long serviceUpTimestamp;
}
处理Eureka客户端的所有注册请求,包括注册、续约、下线、过期、状态更新
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
// 注册表
private final ConcurrentHashMap>> registry
= new ConcurrentHashMap>>();
}
Eureka注册表的UML
Eureka Server端启动时,主要是做上下文初始化工作,对于Eureka集群会从其它节点同步注册表,并更新实例的状态等。另外,还会创建服务剔除的定时任务,对过期不续约的服务进行剔除。
留言与评论(共有 0 条评论) “” |