【SpringCloud Eureka源码】从Eureka Client提议注册要求


玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。

目次

  • Eureka Client启动并挪用Eureka Server的注册接口
    • Spring Cloud Eureka的自动设置装备摆设
      • @EnableDiscoveryClient
      • EurekaDiscoveryClientConfiguration
      • EurekaClientAutoConfiguration
      • DiscoveryClient继承完成干系
    • 建立DiscoveryClient的历程
      • DiscoveryClient组织要领
      • 【重点】initScheduledTasks() 初始化准时义务
    • Eureka Client复制InstanceInfo,提议注册
      • InstanceInfoReplicator复制器自动准时更新
      • onDemandUpdate() 主动按需更新
    • DiscoveryClient#register() 注册
      • registrationClient - 效劳注册相干的EurekaHttpClient
      • SessionedEurekaHttpClient - 准时重连
      • RetryableEurekaHttpClient - 候选范围内失利重试
      • RedirectingEurekaHttpClient - 按Server端要求重定向到新Server
      • MetricsCollectingEurekaHttpClient - 统计网络实行状况
      • AbstractJerseyEurekaHttpClient - 底层经由过程Jersery发送注册、心跳要求
      • Eureka Server接收到的注册要求概况
    • Eureka Client注册流程总结

本文运用Spring Cloud Eureka剖析

Spring Cloud版本: Dalston.SR5

spring-cloud-starter-eureka版本: 1.3.6.RELEASE

netflix eureka版本: 1.6.2


Eureka Client启动并挪用Eureka Server的注册接口

Spring Cloud Eureka的自动设置装备摆设

@EnableDiscoveryClient

首先从运用Eureka Client必需引入的@EnableDiscoveryClient注解提及

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    /**
     * If true, the ServiceRegistry will automatically register the local server.
     */
    boolean autoRegister() default true;
}

@EnableDiscoveryClient注解的作用:

  • autoRegister默许值为true,即效劳发明客户端默许会自动注册到效劳端

  • Import导入EnableDiscoveryClientImportSelector.class,其作用是

    • 导入了 spring-cloud-eureka-client.jar!META-INFspring.factories 中的 EurekaDiscoveryClientConfiguration

      org.springframework.cloud.client.discovery.EnableDiscoveryClient=
      org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
    • 由于autoRegister默许为true,故还会导入AutoServiceRegistrationConfiguration,即启用自动效劳注册的设置装备摆设,等同于在设置装备摆设文件中spring.cloud.service-registry.auto-registration.enabled = true


EurekaDiscoveryClientConfiguration

  • 向Spring容器注册EurekaDiscoveryClientConfiguration.Marker.class,使得真正导入DiscoveryClient的EurekaClientAutoConfiguration设置装备摆设类知足启用前提
  • 建立监听RefreshScopeRefreshedEvent事宜的监听器,知足在运用RefreshScope革新时能够重修EurekaClient(不是本文重点)
  • 在设置装备摆设eureka.client.healthcheck.enabled=true的前提下,向Spring容器注册EurekaHealthCheckHandler用于健康搜检(不是本文重点)

以是,EurekaDiscoveryClientConfiguration的重要作用是向Spring容器注册EurekaDiscoveryClientConfiguration.Marker.class,使得EurekaClientAutoConfiguration设置装备摆设类知足启用前提


EurekaClientAutoConfiguration

EurekaClientAutoConfiguration设置装备摆设类中触及的内容比较多,重要内容:

  • 1、注册了spring cloud包下的EurekaClientConfigBean,这是个对netflix的EurekaClientConfig客户端设置装备摆设接口的完成
  • 2、注册了spring cloud包下的EurekaInstanceConfigBean,这是个对netflix的EurekaInstanceConfig实例信息设置装备摆设接口的完成
  • 3、注册了一些AutoServiceRegistration,即客户端自动注册的组件,如
    • EurekaRegistration: Eureka实例的效劳注册信息(在开启客户端自动注册时才会注册)
    • EurekaServiceRegistry: Eureka效劳注册器
    • EurekaAutoServiceRegistration: Eureka效劳自动注册器,完成了SmartLifecycle,会在Spring容器的refresh的末了阶段被挪用,经由过程EurekaServiceRegistry注册器注册EurekaRegistration信息
  • 4、注册netflix的EurekaClientApplicationInfoManager,注册时分为两种状况,即是不是知足RefreshScope,若是知足,注入的Bean是带有 @Lazy @RefreshScope 注解
    • ApplicationInfoManager: 治理并初始化以后Instance实例的注册信息,并供应了实例状况监听机制
    • EurekaClient : netflix的接口类,用于和Eureka Server交互的客户端,而netflix的默许完成是DiscoveryClient,也是本文剖析的重点
  • 5、注册EurekaHealthIndicator,为/health端点供应Eureka相干信息,重要有Status以后实例状况和applications效劳列表,在从Eureka Server猎取效劳列表一般的状况下,Status运用Eureka Server上的InstanceRemoteStatus,不一般状况下,代码中有一些推断逻辑
public class EurekaClientAutoConfiguration {
    ...省略

    /**
     * 1、注册EurekaClientConfigBean
     */
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean() {
        EurekaClientConfigBean client = new EurekaClientConfigBean();
        if ("bootstrap".equals(propertyResolver.getProperty("spring.config.name"))) {
            // We don't register during bootstrap by default, but there will be another
            // chance later.
            client.setRegisterWithEureka(false);
        }
        return client;
    }

    /**
     * 2、注册EurekaInstanceConfigBean
     */
    @Bean
    @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
    public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils) throws MalformedURLException {
        PropertyResolver eurekaPropertyResolver = new RelaxedPropertyResolver(this.env, "eureka.instance.");
        String hostname = eurekaPropertyResolver.getProperty("hostname");

        boolean preferIpAddress = Boolean.parseBoolean(eurekaPropertyResolver.getProperty("preferIpAddress"));
        int nonSecurePort = Integer.valueOf(propertyResolver.getProperty("server.port", propertyResolver.getProperty("port", "8080")));
        int managementPort = Integer.valueOf(propertyResolver.getProperty("management.port", String.valueOf(nonSecurePort)));
        String managementContextPath = propertyResolver.getProperty("management.contextPath", propertyResolver.getProperty("server.contextPath", "/"));
        EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
        instance.setNonSecurePort(nonSecurePort);
        instance.setInstanceId(getDefaultInstanceId(propertyResolver));
        instance.setPreferIpAddress(preferIpAddress);
        if (managementPort != nonSecurePort && managementPort != 0) {
            if (StringUtils.hasText(hostname)) {
                instance.setHostname(hostname);
            }
            String statusPageUrlPath = eurekaPropertyResolver.getProperty("statusPageUrlPath");
            String healthCheckUrlPath = eurekaPropertyResolver.getProperty("healthCheckUrlPath");
            if (!managementContextPath.endsWith("/")) {
                managementContextPath = managementContextPath   "/";
            }
            if (StringUtils.hasText(statusPageUrlPath)) {
                instance.setStatusPageUrlPath(statusPageUrlPath);
            }
            if (StringUtils.hasText(healthCheckUrlPath)) {
                instance.setHealthCheckUrlPath(healthCheckUrlPath);
            }
            String scheme = instance.getSecurePortEnabled() ? "https" : "http";
            URL base = new URL(scheme, instance.getHostname(), managementPort, managementContextPath);
            instance.setStatusPageUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getStatusPageUrlPath(), '/')).toString());
            instance.setHealthCheckUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getHealthCheckUrlPath(), '/')).toString());
        }
        return instance;
    }

    /**
     * 3、注册客户端自动注册相干组件
     *     EurekaRegistration: Eureka实例的效劳注册信息(在开启客户端自动注册时才会注册)
     *     EurekaServiceRegistry: Eureka效劳注册器
     *     EurekaAutoServiceRegistration: Eureka效劳自动注册器,
     *                                     经由过程EurekaServiceRegistry注册器注册EurekaRegistration信息
     */
    @Bean
    public EurekaServiceRegistry eurekaServiceRegistry() {
        return new EurekaServiceRegistry();
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager) {
        return EurekaRegistration.builder(instanceConfig)
                .with(applicationInfoManager)
                .with(eurekaClient)
                .with(healthCheckHandler)
                .build();
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
        return new EurekaAutoServiceRegistration(context, registry, registration);
    }


    /**
     * 4、注册netflix的 EurekaClient 和 ApplicationInfoManager
     */
    // 若是禁用客户端自动注册,在此要领debug打断点会触发效劳注册,状况为STARTING
    @Bean
    public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
        return new EurekaDiscoveryClient(config, client);
    }
    
    // 一般的EurekaClient设置装备摆设(弗成革新)
    @Configuration
    @ConditionalOnMissingRefreshScope
    protected static class EurekaClientConfiguration {

        @Autowired
        private ApplicationContext context;

        @Autowired(required = false)
        private DiscoveryClientOptionalArgs optionalArgs;

        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }

        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        public ApplicationInfoManager eurekaApplicationInfoManager(
                EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }
    }

     // 可革新的EurekaClient设置装备摆设类
    @Configuration
    @ConditionalOnRefreshScope  //知足@ConditionalOnClass(RefreshScope.class)         
                                //    @ConditionalOnBean(RefreshAutoConfiguration.class)
    protected static class RefreshableEurekaClientConfiguration {

        @Autowired
        private ApplicationContext context;

        @Autowired(required = false)
        private DiscoveryClientOptionalArgs optionalArgs;

        // 注册CloudEurekaClient,是com.netflix.discovery.EurekaClient接口的完成类
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
            manager.getInfo(); // force initialization
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }

        // 注册ApplicationInfoManager
        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }
    }

    
    /**
     * 5、注册 EurekaHealthIndicator
     */
    @Configuration
    @ConditionalOnClass(Endpoint.class)
    protected static class EurekaHealthIndicatorConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public EurekaHealthIndicator eurekaHealthIndicator(EurekaClient eurekaClient,
                                                           EurekaInstanceConfig instanceConfig, EurekaClientConfig clientConfig) {
            return new EurekaHealthIndicator(eurekaClient, instanceConfig, clientConfig);
        }
    }
}

如上,在知足一系列Conditional前提后,会向Spring容器中注册CloudEurekaClient,它是com.netflix.discovery.EurekaClient接口的完成类,详细继承完成干系以下


DiscoveryClient继承完成干系

如上图所示,方才建立的CloudEurekaClientcom.netflix.discovery.DiscoveryClient的子类,它们都完成了com.netflix.discovery.EurekaClient接口

EurekaClient是Netflix对效劳发明客户端笼统的接口,包罗许多要领,而DiscoveryClient是其默许完成,也是本文剖析的重点,CloudEurekaClient是spring cloud的完成,依据类上解释,其重要重写了onCacheRefreshed()要领,这个要领重如果从Eureka Server fetchRegistry()猎取效劳列表以后用于以播送体式格局关照缓存革新事宜的,实在DiscoveryClient也有onCacheRefreshed()要领的完成,但由于DiscoveryClient是Netflix的类,只发送了com.netflix.discovery.EurekaEvent,而CloudEurekaClient运用Spring的ApplicationEventPublisher,发送了HeartbeatEvent

注重:

上面说的都是netflix的DiscoveryClient

另有另外一个DiscoveryClient,是 org.springframework.cloud.client.discovery.DiscoveryClient

是Spring对效劳发明客户端的笼统


建立DiscoveryClient的历程

DiscoveryClient组织要领

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, 
                EurekaClientConfig config, 
                AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    /**
     * AbstractDiscoveryClientOptionalArgs 是DiscoveryClient的可选参数,可理解为扩大点
     * 包罗healthCheckHandlerProvider、healthCheckCallbackProvider、eventListeners等
     * spring cloud默许完成为MutableDiscoveryClientOptionalArgs,但此处相干成员变量赋值后以为空
     */
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
    }
    
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName()   "/"   instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;

    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // 若是 shouldFetchRegistry=true,注册netflix servo监控
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX   "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    // 若是 shouldRegisterWithEureka=true,注册netflix servo监控
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX   "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // 若是既不要向eureka server注册,又不要猎取效劳列表,就甚么都不消初始化
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();

        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
        return;  // no need to setup up an network tasks and we are done
    }

    // 【重点】建立种种Executor 和 eurekaTransport、instanceRegionChecker
    try {
        // 实行准时义务的准时器,准时线程名为 DiscoveryClient-%d
        // 在准时器中用于准时实行TimedSupervisorTask监视义务,监视义务会强迫超时 和 纪录监控数据
        scheduler = Executors.newScheduledThreadPool(3,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        // 实行heartbeat心跳义务的实行器,默许最大线程数=2,线程名为:DiscoveryClient-HeartbeatExecutor-%d
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        // 实行效劳列表缓存革新的实行器,默许最大线程数=2,线程名为:DiscoveryClient-CacheRefreshExecutor-%d
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        // 初始化eurekaTransport在效劳注册,猎取效劳列表时的client
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    // 若是需要从eureka server猎取效劳列表,而且实验fetchRegistry(false)失利,挪用BackupRegistry
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // 【重点】初始化一切准时义务
    initScheduledTasks();
    
    // 增加servo监控
    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}

上面的DiscoveryClient组织要领代码比较多,但多半都是一些赋值,本次剖析的重点在解释中已标出,建立了种种Executor 和 eurekaTransport、instanceRegionChecker,以后又挪用initScheduledTasks()要领初始化一切这些准时义务


【重点】initScheduledTasks() 初始化准时义务

/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    // 1、若是要从Eureka Server猎取效劳列表
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        
        // 从eureka效劳器猎取注册表信息的频次(默许30s)
        // 同时也是单次猎取效劳列表的超时时候
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        // 若是缓存革新超时,下一次实行的delay最大是registryFetchIntervalSeconds的几倍(默许10),默许每次实行是上一次的2倍
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        
        /**
         * 【#### 实行CacheRefreshThread,效劳列表缓存革新义务 ####】
         * 实行TimedSupervisorTask监视义务的准时器,详细实行器为cacheRefreshExecutor,义务为CacheRefreshThread
         */
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",               //监控名
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds, //指定详细义务的超时时候
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    
    // 2、若是要注册到Eureka Server
    if (clientConfig.shouldRegisterWithEureka()) {
        // 续租的时候距离(默许30s)
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // 若是心跳义务超时,下一次实行的delay最大是renewalIntervalInSecs的几倍(默许10),默许每次实行是上一次的2倍
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: "   "renew interval is: "   renewalIntervalInSecs);

        // Heartbeat timer
        /**
         * 【#### 实行HeartbeatThread,发送心跳数据 ####】
         * 实行TimedSupervisorTask监视义务的准时器,详细实行器为heartbeatExecutor,义务为HeartbeatThread
         */
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        /** 
         * 【#### InstanceInfo复制器 ####】
         * 启动背景准时义务scheduler,线程名为 DiscoveryClient-InstanceInfoReplicator-%d
         * 默许每30s实行一次准时义务,检察Instance信息(DataCenterInfo、LeaseInfo、InstanceStatus)是不是有转变
         * 若是有转变,实行 discoveryClient.register()
         */
        instanceInfoReplicator = new InstanceInfoReplicator(
               this,            //以后DiscoveryClient
               instanceInfo,    //以后实例信息
               clientConfig.getInstanceInfoReplicationIntervalSeconds(),//InstanceInfo的复制距离(默许30s)
               2); // burstSize

        /**
         * 【StatusChangeListener 状况转变监听器】
         */
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                
                //运用InstanceInfo复制器 scheduler.submit()一个Runnable义务
                //背景立时实行 discoveryClient.register()
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        /**
         * 是不是存眷Instance状况转变,运用背景线程将状况同步到eureka server(默许true)
         * 挪用 ApplicationInfoManager#setInstanceStatus(status) 会触发
         * 将 StatusChangeListener 注册到 ApplicationInfoManager
         */
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // 启动InstanceInfo复制器
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } 
    // 以后效劳实例不注册到Eureka Server
    else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

总的来讲initScheduledTasks()做了以下几件事:

  • 若是shouldFetchRegistry=true,即要从Eureka Server猎取效劳列表
    • 启动革新效劳列表准时线程(DiscoveryClient-CacheRefreshExecutor-%d),默许registryFetchIntervalSeconds=30s实行一次,义务为CacheRefreshThread,即从Eureka Server猎取效劳列表,也革新客户端缓存
  • 若是shouldRegisterWithEureka=true,即要注册到Eureka Server
    • 启动heartbeat心跳准时线程(DiscoveryClient-HeartbeatExecutor-%d),默许renewalIntervalInSecs=30s续约一次,义务为HeartbeatThread,即客户端向Eureka Server发送心跳
    • 启动InstanceInfo复制器准时线程(DiscoveryClient-InstanceInfoReplicator-%d),开启准时线程搜检以后Instance的DataCenterInfo、LeaseInfo、InstanceStatus,若是发明调换就实行discoveryClient.register(),将实例信息同步到Server端


Eureka Client复制InstanceInfo,提议注册

由建立DiscoveryClient的历程可知,建立了许多准时实行线程,如准时从Server端革新效劳列表的CacheRefreshThread,准时报心跳续约的HeartbeatThread,另有用于更新并复制当地实例状况到Server端的InstanceInfo复制器准时线程,而恰是InstanceInfoReplicator#run()中的discoveryClient.register()提议了注册

那末怎样能够触发注册行动呢?

// InstanceInfoReplicator#run()
public void run() {
    try {
        /**
         * 革新 InstanceInfo
         * 1、革新 DataCenterInfo
         * 2、革新 LeaseInfo 租约信息
         * 3、依据HealthCheckHandler猎取InstanceStatus,并更新,若是状况发作转变会触发一切StatusChangeListener
         */
        discoveryClient.refreshInstanceInfo();

        // 若是isInstanceInfoDirty=true,返回dirtyTimestamp,否则是null
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();  //提议注册
            instanceInfo.unsetIsDirty(dirtyTimestamp);  //isInstanceInfoDirty置为false
        }
    } 
    catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } 
    finally { // 继承下次义务
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

如上,先革新InstanceInfo,革新后若是发明有脏数据,即实例发作了调换,还未同步给Server的数据,就提议注册

那末在Eureka Client启动的这类场景下,怎样会触发有脏数据下的注册?

  • 由InstanceInfoReplicator复制器的自动准时义务在革新InstanceInfo时发明有脏数据,并更新
  • InstanceInfoReplicator复制器供应onDemandUpdate()按需更新要领,一旦挪用,立时会submit()义务,个中会cancel自动更新义务,立时实行InstanceInfoReplicator#run()


InstanceInfoReplicator复制器自动准时更新

InstanceInfoReplicator复制器在启动建立DiscoveryClient时被建立并start()启动

// InstanceInfoReplicator#start()
public void start(int initialDelayMs) { // 默许40s
    if (started.compareAndSet(false, true)) {
        instanceInfo.setIsDirty();  // for initial register 初始化时会将instanceInfo设置为dirty
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

以是当自动更新启动时会设置InstanceInfo为脏数据,由于要触发第一次向Server同步,那末在40s后会挪用InstanceInfoReplicator#run(),假定InstanceInfo并没有别的调换,那末也会提议discoveryClient.register()

注重:

一般状况下是不会由耽误40s的第一次实行准时义务提议注册,而是下面的onDemandUpdate() 主动按需更新提议注册

若是设置@EnableDiscoveryClient(autoRegister = false) 或许 spring.cloud.service-registry.auto-registration.enabled=false,即摒弃自动注册,并在EurekaClientAutoConfiguration的以下要领打断点

@Bean
public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
  return new EurekaDiscoveryClient(config, client);
}

会在断点见效时触发EurekaClient的实例化,而此EurekaClient就是一个DiscoveryClient,会启动InstanceInfoReplicator自动准时更新线程,但由于new InstanceInfoFactory().create(config)时当地实例状况为STARTING,以是注册到Server端的状况也是STARTING

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。-


onDemandUpdate() 主动按需更新

现在只要在ApplicationInfoManager#setInstanceStatus()更新实例状况,且实例状况真的发作调换,触发StatusChangeListener状况调换监听器时,会挪用onDemandUpdate立时submit义务实行InstanceInfoReplicator#run(),再提议注册

由于Spring Cloud默许是启用效劳自动注册AutoServiceRegistration的,以是在EurekaClientAutoConfiguration自动设置装备摆设时会注册效劳自动注册相干组件(EurekaRegistration、EurekaServiceRegistry、EurekaAutoServiceRegistration),个中EurekaAutoServiceRegistration完成了Spring的SmartLifecycle接口,会在Spring容器refresh要终了时触发作命周期要领start(),个中会运用EurekaServiceRegistry效劳注册器注册EurekaRegistration这个当地实例信息

// EurekaServiceRegistry#register()
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {
        log.info("Registering application "   reg.getInstanceConfig().getAppname()
                  " with eureka with status "
                  reg.getInstanceConfig().getInitialStatus());
    }

    // 设置初始化状况
    reg.getApplicationInfoManager()
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

    if (reg.getHealthCheckHandler() != null) {
        reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
    }
}

重如果设置初始化状况步调,而EurekaInstanceConfigBean当地实例信息的initialStatus初始化状况为 InstanceStatus.UP,以是状况与new InstanceInfo()时的STARTING分歧,发作了状况调换,触发在建立DiscoveryClient时设置的StatusChangeListener

...省略

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}

...省略

个中会挪用 InstanceInfoReplicator.onDemandUpdate() 实例信息复制器做按需更新,立时将UP状况更新/注册到Server端

以是,以我推断,Eureka Client启动时的自动注册大多半应该是Spring Cloud的效劳自动注册机制,在Spring容器基础启动终了时,触发效劳自动注册操纵,个中会运用ApplicationInfoManager更新实例状况为初始状况UP,一旦实例状况调换会被立时监听到,实行复制器的InstanceInfoReplicator.onDemandUpdate()按需更新,立时实行一次discoveryClient.register()操纵

以是,下面就是剖析 discoveryClient.register() 是怎样注册效劳的


DiscoveryClient#register() 注册

// DiscoveryClient#register()
boolean register() throws Throwable {
    logger.info(PREFIX   appPathIdentifier   ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX   appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX   appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

如上,注册要领运用eurekaTransport的注册客户端registrationClient挪用了register(instanceinfo)要领

EurekaTransportDiscoveryClient的内部类,个中包罗

  • registrationClient 和 registrationClientFactory: 卖力注册、续约相干事情的EurekaHttpClientEurekaHttpClientFactory的完成类
  • queryClient 和 queryClientFactory: 卖力猎取Server端效劳列表的EurekaHttpClientEurekaHttpClientFactory的完成类
  • TransportClientFactory: 卖力传输音讯的客户端工场(底层用于和Server交互的http框架是 Jersey,此处的工场就和Jersey相干)

那末EurekaTransport的相干组件,尤其是registrationClient注册客户端是怎样初始化的呢?


registrationClient - 效劳注册相干的EurekaHttpClient

初始化是在DiscoveryClient的组织要领中

eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);

重如果scheduleServerEndpointTask()要领

private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                        AbstractDiscoveryClientOptionalArgs args) {

    Collection<?> additionalFilters = args == null
            ? Collections.emptyList()
            : args.additionalFilters;

    EurekaJerseyClient providedJerseyClient = args == null
            ? null
            : args.eurekaJerseyClient;
    
    TransportClientFactories argsTransportClientFactories = null;
    if (args != null && args.getTransportClientFactories() != null) {
        argsTransportClientFactories = args.getTransportClientFactories();
    }
    
    // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
    @SuppressWarnings("rawtypes")
    TransportClientFactories transportClientFactories = argsTransportClientFactories == null
            ? new Jersey1TransportClientFactories()
            : argsTransportClientFactories;

    // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
    // 1、参数中是不是供应了transportClientFactory的完成,没有就运用Jersey1TransportClientFactories
    eurekaTransport.transportClientFactory = providedJerseyClient == null
            ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo())
            : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);

    ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
        @Override
        public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
            long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
            long delay = getLastSuccessfulRegistryFetchTimePeriod();
            if (delay > thresholdInMs) {
                logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                        thresholdInMs, delay);
                return null;
            } else {
                return localRegionApps.get();
            }
        }
    };

    eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
            clientConfig,
            transportConfig,
            eurekaTransport.transportClientFactory,
            applicationInfoManager.getInfo(),
            applicationsSource
    );

    /**
     * 是不是要想Eureka Server注册
     * 2、建立RegistrationClient用于注册的客户端及工场,并设置到eurekaTransport
     */ 
    if (clientConfig.shouldRegisterWithEureka()) {
        EurekaHttpClientFactory newRegistrationClientFactory = null;
        EurekaHttpClient newRegistrationClient = null;
        try {
            newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    transportConfig
            );
            newRegistrationClient = newRegistrationClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
        eurekaTransport.registrationClient = newRegistrationClient;
    }

    // 
    /**
     * 是不是要从Server端猎取效劳列表
     * 3、建立QueryClient用于查询效劳列表的客户端及工场,并设置到eurekaTransport
     */ 
    // new method (resolve from primary servers for read)
    // Configure new transport layer (candidate for injecting in the future)
    if (clientConfig.shouldFetchRegistry()) {
        EurekaHttpClientFactory newQueryClientFactory = null;
        EurekaHttpClient newQueryClient = null;
        try {
            newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    clientConfig,
                    transportConfig,
                    applicationInfoManager.getInfo(),
                    applicationsSource
            );
            newQueryClient = newQueryClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.queryClientFactory = newQueryClientFactory;
        eurekaTransport.queryClient = newQueryClient;
    }
}

以是,下面就是逐层深切剖析RegistrationClient用于注册的客户端及工场是怎样建立的?

由于RegistrationClient实际上是一种EurekaHttpClient,而EurekaHttpClient是接口,其完成类许多

检察源码发明,Netflix接纳的是 Factory工场 署理 的形式,从最外层建立的EurekaHttpClient工场包罗一个成员变量是另外一个署理的EurekaHttpClient工场,每一个工场天生的EurekaHttpClient功用不一样,在从外层实行一个操纵时,最外层的工场实行其相干功用后,运用署理的工场新建EurekaHttpClient实例,再挪用其雷同的要领,也完成这个EurekaHttpClient的相干功用,就这样逐层深切,各司其职后,末了运用Jersey发送POST要求到Eureka Server提议注册,而这些EurekaHttpClient都是在com.netflix.discovery.shared.transport.decoratorEurekaHttpClient的包装类的包下的,由外到内大抵是:

  • SessionedEurekaHttpClient: 强迫在一准时候距离后重连EurekaHttpClient,防备永久只衔接特定Eureka Server,反过来包管了在Server端集群拓扑发作转变时的负载重分派
    • RetryableEurekaHttpClient: 带有重试功用,默许最多3次,在设置装备摆设的一切候选Server地点中实验要求,胜利重用,失利会重试另外一Server,并保护断绝清单,下次跳过,当断绝数目到达阈值,清空断绝清单,重新开始
      • RedirectingEurekaHttpClient: Server端返回302重定向时,客户端shutdown原EurekaHttpClient,依据response header中的Location新建EurekaHttpClient
        • MetricsCollectingEurekaHttpClient: 统计网络Metrics信息
          • JerseyApplicationClient: AbstractJerseyEurekaHttpClient的子类
            • AbstractJerseyEurekaHttpClient: 底层完成经由过程Jersery注册、发心跳等的中心类
              • jerseyClient: Jersery客户端


SessionedEurekaHttpClient - 准时重连

// SessionedEurekaHttpClient#execute()
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    long now = System.currentTimeMillis();
    long delay = now - lastReconnectTimeStamp;
    // 若是上次重连时候到现在已超过了currentSessionDurationMs,封闭以后EurekaHttpClient
    if (delay >= currentSessionDurationMs) {
        logger.debug("Ending a session and starting anew");
        lastReconnectTimeStamp = now;
        currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
        TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
    }

    // 若是EurekaHttpClient为空,clientFactory.newClient()重修
    EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
    if (eurekaHttpClient == null) {
        eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
    }
    
    // 继承实行后续
    return requestExecutor.execute(eurekaHttpClient);
}


RetryableEurekaHttpClient - 候选范围内失利重试

// RetryableEurekaHttpClient#execute()
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    
    // 最多重试numberOfRetries(默许:3)
    for (int retry = 0; retry < numberOfRetries; retry  ) {
        EurekaHttpClient currentHttpClient = delegate.get();//从AtomicReference<EurekaHttpClient>猎取以后EurekaHttpClient
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                candidateHosts = getHostCandidates(); //返回候选鸠合 消除 已失利断绝的Host鸠合
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }

            // 依据以后的下标猎取Endpoint,并新建 JerseyClient
            currentEndpoint = candidateHosts.get(endpointIdx  );
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }

        try {
            // 继承后续实行
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            // 若是依据以后操纵范例 和 返回状况码,知足状况盘算器,纪录currentHttpClient可用,下次继承运用
            // 返回状况码是:200、300、302,或许Register、SendHeartBeat状况下是404
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } 
        catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
        }

        // Connection error or 5xx from the server that must be retried on another server
        // 要求失利 或 报5xx毛病,将delegate清空,重试另外一个Server,并将以后Endpoint放到断绝鸠合
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            quarantineSet.add(currentEndpoint);
        }
    }
    
    // 屡次重试后仍没法胜利返回效果,上抛非常
    throw new TransportException("Retry limit reached; giving up on completing the request");
}


//########## RetryableEurekaHttpClient#getHostCandidates() 
// 返回 一切候选的Host节点数据 与 断绝鸠合 的数据差集
private List<EurekaEndpoint> getHostCandidates() {
    List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints(); //一切候选节点数据
    quarantineSet.retainAll(candidateHosts); //确保quarantineSet断绝鸠合中的数据都在candidateHosts中
                                             //当candidateHosts发作转变时也能实时清算quarantineSet断绝鸠合

    // If enough hosts are bad, we have no choice but start over again
    // 默许:0.66百分比
    int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
    
    // 断绝鸠合为空
    if (quarantineSet.isEmpty()) {
        // no-op
    } 
    // 断绝数据已大于阀值,不得已要重新开始,清空断绝鸠合
    else if (quarantineSet.size() >= threshold) { 
        logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
        quarantineSet.clear();
    } 
    // 断绝鸠合不为空,也不大于阀值,消除断绝鸠合中的Endpoint后返回
    else { 
        List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
        for (EurekaEndpoint endpoint : candidateHosts) {
            if (!quarantineSet.contains(endpoint)) {
                remainingHosts.add(endpoint);
            }
        }
        candidateHosts = remainingHosts;
    }

    return candidateHosts;
}


//########## ServerStatusEvaluators#LEGACY_EVALUATOR
// Eureka Server返回状况的盘算器,盘算分歧场景下的分歧状况码是不是代表胜利
private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() {
    @Override
    public boolean accept(int statusCode, RequestType requestType) {
        if (statusCode >= 200 && statusCode < 300 || statusCode == 302) {
            return true;
        } else if (requestType == RequestType.Register && statusCode == 404) {
            return true;
        } else if (requestType == RequestType.SendHeartBeat && statusCode == 404) {
            return true;
        } else if (requestType == RequestType.Cancel) {  // cancel is best effort
            return true;
        } else if (requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404)) {
            return true;
        }
        return false;
    }
};


RedirectingEurekaHttpClient - 按Server端要求重定向到新Server

//########## RedirectingEurekaHttpClient#executeOnNewServer
// Server端返回302重定向时,客户端shutdown原EurekaHttpClient,依据response header中的Location新建EurekaHttpClient
private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor,
                                                AtomicReference<EurekaHttpClient> currentHttpClientRef) {
    URI targetUrl = null;
    // 最多重定向默许10次
    for (int followRedirectCount = 0; followRedirectCount < MAX_FOLLOWED_REDIRECTS; followRedirectCount  ) {
        EurekaHttpResponse<R> httpResponse = requestExecutor.execute(currentHttpClientRef.get());
        
        // 若是返回的不是302重定向,返回response
        if (httpResponse.getStatusCode() != 302) {
            if (followRedirectCount == 0) {
                logger.debug("Pinning to endpoint {}", targetUrl);
            } else {
                logger.info("Pinning to endpoint {}, after {} redirect(s)", targetUrl, followRedirectCount);
            }
            return httpResponse;
        }

        // 从response中猎取Location,用于重修EurekaHttpClient
        targetUrl = getRedirectBaseUri(httpResponse.getLocation());
        if (targetUrl == null) {
            throw new TransportException("Invalid redirect URL "   httpResponse.getLocation());
        }

        currentHttpClientRef.getAndSet(null).shutdown();
        currentHttpClientRef.set(factory.newClient(new DefaultEndpoint(targetUrl.toString())));
    }
    String message = "Follow redirect limit crossed for URI "   serviceEndpoint.getServiceUrl();
    logger.warn(message);
    throw new TransportException(message);
}


MetricsCollectingEurekaHttpClient - 统计网络实行状况

// MetricsCollectingEurekaHttpClient#execute()
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
    Stopwatch stopwatch = requestMetrics.latencyTimer.start(); //统计实行延时
    try {
        EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
        requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment(); //按状况统计
        return httpResponse;
    } catch (Exception e) {
        requestMetrics.connectionErrors.increment(); //统计毛病
        exceptionsMetric.count(e); //按非常名统计
        throw e;
    } finally {
        stopwatch.stop();
    }
}


AbstractJerseyEurekaHttpClient - 底层经由过程Jersery发送注册、心跳要求

public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
    protected final Client jerseyClient; //真正处置惩罚要求的Jersery客户端
    protected final String serviceUrl; //衔接的Server端地点
    
    protected AbstractJerseyEurekaHttpClient(Client jerseyClient, String serviceUrl) {
        this.jerseyClient = jerseyClient;
        this.serviceUrl = serviceUrl;
        logger.debug("Created client for url: {}", serviceUrl);
    }
    
    /**
     * 注册要领
     */
    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/"   info.getAppName(); //要求Eureka Server的【/apps/运用名】接口地点
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);  //实例InstanceInfo数据,经由过程Post要求body发过去
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } 
        finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
    
    ...省略


Eureka Server接收到的注册要求概况

经由上面的步调,客户端已能够经由过程Jersery发送Http要求给Eureka Server端注册,详细要求以下:

POST /eureka/apps/运用名 HTTP/1.1
Accept-Encoding: gzip
Content-Type: application/json
Accept: application/json
DiscoveryIdentity-Name: DefaultClient
DiscoveryIdentity-Version: 1.4
DiscoveryIdentity-Id: 192.168.70.132
Transfer-Encoding: chunked
Host: localhost:8001
Connection: Keep-Alive
User-Agent: Java-EurekaClient/v1.6.2

1a0
{"instance":{
​ "instanceId":"192.168.70.132:运用名:10001",
​ "hostName":"192.168.70.132",
​ "app":"运用名",
​ "ipAddr":"192.168.70.132",
​ "status":"UP",
​ "overriddenstatus":"UNKNOWN",
​ "port": { "(":10001, "@enabled" : "true" }, ​ "securePort": { ")":443, "@enabled" : "false"},
​ "countryId":1,
​ "dataCenterInfo":{"@class":"com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
​ "name":"MyOwn"
}


Eureka Client注册流程总结

大致来讲,Eureka Client的注册是由Spring Cloud的AutoServiceRegistration自动注册提议,在设置运用实例Instance初始状况为UP时,触发了InstanceInfoReplicator#onDemandUpdate()按需更新要领,将实例Instance信息经由过程DiscoveryClient注册到Eureka Server,时期经由了一些EurekaHttpClient的装潢类,完成了诸如按期重连失利重试注册重定向、统计网络Metrics信息等功用,末了由JerseryClient发送POST要求挪用Eureka Server的【/eureka/apps/运用名】端点,要求体照顾InstanceInfo实例信息,完成注册


  • EurekaAutoServiceRegistration#start(): 完成Spring的SmartLifecycle,在Spring容器refresh()末了一步finishRefresh()会挪用生命周期的start()要领

    • EurekaServiceRegistry#register(EurekaRegistration): 运用效劳注册器注册效劳信息

      • ApplicationInfoManager#setInstanceStatus(初始状况): 运用实例信息治理器更新初始状况为 UP

        • StatusChangeListener: 触发实例状况监听(此Listener是在DiscoveryClient#initScheduledTasks()要领中设置的)

          • InstanceInfoReplicator.onDemandUpdate(): 实例状况复制器实行按需状况更新

            • DiscoveryClient#register(): DiscoveryClient提议注册实例信息

              • EurekaHttpClientDecorator#execute(): 实行EurekaHttpClient的装潢类,完成其各自功用

                SessionedEurekaHttpClient: 准时重连

                RetryableEurekaHttpClient: 候选范围内失利重试

                RedirectingEurekaHttpClient: 按Eureka Server端要求重定向到新Server

                MetricsCollectingEurekaHttpClient: 统计网络实行状况

                • JerseyApplicationClient#register(): 封装注册要求数据
                  • JerseyClient发送Post注册要求
-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。