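How Service Registration Works in Nacos 1.x

This article explains how service registration works in Nacos v1.x. Version 2.0 switched to gRPC calls; the source quoted here comes from a 2.0 release, which still carries the v1.x path. Along the way we look closely at some of the clever design decisions and data structure choices Nacos makes during registration.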

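Source Code Walkthrough

1. Adding the task to the blocking queue

This is the server-side endpoint that accepts registration requests; every service registration goes through it.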

    /**
     * Register new instance.
     *
     * @param request http request
     * @return 'ok' if success
     * @throws Exception any error during register
     */
    @CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        // Get the namespaceId
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        // Get the serviceName
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        // Build the Instance to register
        final Instance instance = HttpRequestInstanceBuilder.newBuilder()
                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
        // Core: register the instance.
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
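
To make it concrete what reaches this endpoint, here is a minimal client-side sketch that builds the registration request by hand. It assumes the Nacos 1.x Open API path /nacos/v1/ns/instance and a server at 127.0.0.1:8848; the class name RegisterRequestSketch and the sample service values are made up for illustration. The request is only built, not sent.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RegisterRequestSketch {

    // Assumed server address and Open API path; adjust to your deployment.
    static final String BASE = "http://127.0.0.1:8848/nacos/v1/ns/instance";

    /** Builds the POST request that the register endpoint would receive. */
    static HttpRequest buildRegisterRequest(String serviceName, String ip, int port) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName); // read with WebUtils.required(...)
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", "true");        // optional; ephemeral instances take the AP path
        // namespaceId is omitted, so the server falls back to its default namespace.

        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));

        return HttpRequest.newBuilder(URI.create(BASE + "?" + query))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRegisterRequest("demo-service", "10.0.0.1", 8080);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Passing the built request to java.net.http.HttpClient.send would perform the actual registration against a running server.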

For 1.x, follow the second implementation class of registerInstance.

This leads into ServiceManager.

    /**
     * Register an instance to a service in AP mode.
     *
     * <p>This method creates service or cluster silently if they don't exist.
     *
     * @param namespaceId id of namespace
     * @param serviceName service name
     * @param instance    instance to register
     * @throws Exception any error occurred in the process
     */
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // Create the service or cluster if it does not exist yet
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        Service service = getService(namespaceId, serviceName);
        checkServiceIsNull(service, namespaceId, serviceName);
        // Core logic
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

Adding the instance to the service

    /**
     * Add instance to service.
     *
     * @param namespaceId namespace
     * @param serviceName service name
     * @param ephemeral   whether instance is ephemeral
     * @param ips         instances
     * @throws NacosException nacos exception
     */
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        synchronized (service) {
            // Compare with the existing instances to work out which ones to update or delete
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            // Wrap the list in an Instances object
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // Core: put it into the registry
            consistencyService.put(key, instances);
        }
    }

Step into DistroConsistencyServiceImpl and look at its put method.

    @Override
    public void put(String key, Record value) throws NacosException {
        // Core: the onPut method
        onPut(key, value);
        // If upgrade to 2.0.X, do not sync for v1.
        if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
            return;
        }
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                DistroConfig.getInstance().getSyncDelayMillis());
    }


    /**
     * Put a new record.
     *
     * @param key   key of record
     * @param value record
     */
    public void onPut(String key, Record value) {
 
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }
        
        if (!listeners.containsKey(key)) {
            return;
        }
        // Core: add the task to the queue.
        notifier.addTask(key, DataOperation.CHANGE);
    }
    

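After this walkthrough one question remains: the task ends up in a blocking queue, so how does the registration actually get completed from there?

2. Registration on an asynchronous thread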

  public class Notifier implements Runnable {
        
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
        
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
        
        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
        public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.offer(Pair.with(datumKey, action));
        }
        
        public int getTaskSize() {
            return tasks.size();
        }
        
        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            for (; ; ) {
                try {
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
        
        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                
                services.remove(datumKey);
                
                int count = 0;
                
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                
                for (RecordListener listener : listeners.get(datumKey)) {
                    
                    count++;
                    
                    try {
                        if (action == DataOperation.CHANGE) {
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }

First, look closely at the class that the previous section ended by handing a task to.

notifier.addTask(key, DataOperation.CHANGE);

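A few properties stand out:

  • It is essentially a Runnable executed on its own thread.
  • The addTask method puts tasks into a BlockingQueue.
  • While the thread runs asynchronously, it takes tasks off the queue and hands them to handle.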
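
The three properties above can be sketched in a few lines. This is a simplified stand-in, not the Nacos class: NotifierSketch, the handled list, and the demo method are hypothetical names, the queue carries plain String keys instead of Pair<String, DataOperation>, and the duplicate check uses putIfAbsent rather than the containsKey/put pair in the original.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class NotifierSketch implements Runnable {

    // Keys that already have a task waiting in the queue.
    private final Map<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    // Hypothetical stand-in for listener.onChange(...): just records the key.
    private final List<String> handled = new CopyOnWriteArrayList<>();

    /** Producer side: returns false if the key is already queued or the queue is full. */
    public boolean addTask(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false; // a task for this key is already waiting
        }
        boolean queued = tasks.offer(key);
        if (!queued) {
            pending.remove(key); // queue full: forget the marker so a retry can enqueue
        }
        return queued;
    }

    /** Consumer side: a single thread drains the queue one task at a time. */
    @Override
    public void run() {
        try {
            while (true) {
                String key = tasks.take(); // blocks until a task is available
                pending.remove(key);       // later changes to this key may be queued again
                handled.add(key);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop when interrupted
        }
    }

    public List<String> handled() {
        return new ArrayList<>(handled);
    }

    /** Queues three tasks (one duplicate), runs the consumer, returns what it handled. */
    static List<String> demo() {
        NotifierSketch notifier = new NotifierSketch();
        notifier.addTask("svc-a");
        notifier.addTask("svc-a"); // skipped: svc-a is still pending
        notifier.addTask("svc-b");

        Thread worker = new Thread(notifier, "notifier-sketch");
        worker.setDaemon(true);
        worker.start();

        long deadline = System.currentTimeMillis() + 2000;
        try {
            while (notifier.handled.size() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.interrupt();
        return notifier.handled();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because only one thread ever calls take, the handling code itself needs no locking; producers may call addTask from any number of request threads.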

The listener here is the Service, so step into the Service::onChange method.

    @Override
    public void onChange(String key, Instances value) throws Exception {
        
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        
        for (Instance instance : value.getInstanceList()) {
            
            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }
            
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        
        recalculateChecksum();
    }

   /**
     * Update the registry.
     *
     * @param instances instances
     * @param ephemeral whether is ephemeral instance
     */
    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
                
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG.warn(
                            "cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
                
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
                
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }
        
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            // Core logic: Service::updateIPs calls Cluster::updateIps
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }
        
        setLastModifiedMillis(System.currentTimeMillis());
        getPushService().serviceChanged(this);
        ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
        StringBuilder stringBuilder = new StringBuilder();
        
        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
        }
        
        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                stringBuilder.toString());
        
    }

Finally we arrive at Cluster::updateIps. The core of updating the registry is replacing the Set held by the Cluster; that Set contains all the service instances.

    /**
     * Update instance list.
     *
     * @param ips       instance list
     * @param ephemeral whether these instances are ephemeral
     */
    public void updateIps(List<Instance> ips, boolean ephemeral) {
        // Copy-on-write idea: read the current set here, build a new set below, then swap the reference at the end
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        
        List<Instance> updatedIps = updatedIps(ips, oldIpMap.values());
        if (updatedIps.size() > 0) {
            for (Instance ip : updatedIps) {
                Instance oldIP = oldIpMap.get(ip.getDatumKey());
                
                // do not update the ip validation status of updated ips
                // because the checker has the most precise result
                // Only when ip is not marked, don't we update the health status of IP:
                if (!ip.isMarked()) {
                    ip.setHealthy(oldIP.isHealthy());
                }
                
                if (ip.isHealthy() != oldIP.isHealthy()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                            (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
                }
                
                if (ip.getWeight() != oldIP.getWeight()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP, ip);
                }
            }
        }
        
        List<Instance> newIPs = subtract(ips, oldIpMap.values());
        if (newIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                            getName(), newIPs.size(), newIPs);
            
            for (Instance ip : newIPs) {
                HealthCheckStatus.reset(ip);
            }
        }
        
        List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
        
        if (deadIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                            getName(), deadIPs.size(), deadIPs);
            
            for (Instance ip : deadIPs) {
                HealthCheckStatus.remv(ip);
            }
        }
        
        toUpdateInstances = new HashSet<>(ips);
        
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

From the code above: the asynchronous thread takes tasks from the blocking queue, and Cluster then updates the registry using copy-on-write (COW).
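
The copy-on-write swap, together with the new/dead comparison that updateIps performs, can be sketched as follows. This is a minimal illustration under stated assumptions: CowRegistrySketch and Diff are hypothetical names, instances are plain "ip:port" strings, and the volatile modifier is an addition of this sketch that the quoted Cluster code does not show.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CowRegistrySketch {

    // Readers only ever see a fully built set; volatile publishes each swap.
    private volatile Set<String> instances = Collections.emptySet();

    /** Read path: no locking, returns the current snapshot. */
    public Set<String> allInstances() {
        return instances;
    }

    /** Write path, meant to be called from the single notifier thread. */
    public Diff update(List<String> latest) {
        Set<String> old = instances;

        Set<String> added = new HashSet<>(latest);
        added.removeAll(old);    // in latest but not in old

        Set<String> dead = new HashSet<>(old);
        dead.removeAll(latest);  // in old but not in latest

        // Build the replacement off to the side, then swap the reference.
        instances = Collections.unmodifiableSet(new HashSet<>(latest));
        return new Diff(added, dead);
    }

    /** Result of one update: which instances appeared and which disappeared. */
    public static final class Diff {
        public final Set<String> added;
        public final Set<String> dead;

        Diff(Set<String> added, Set<String> dead) {
            this.added = added;
            this.dead = dead;
        }
    }

    public static void main(String[] args) {
        CowRegistrySketch registry = new CowRegistrySketch();
        registry.update(List.of("10.0.0.1:8080", "10.0.0.2:8080"));

        Set<String> snapshot = registry.allInstances(); // a reader holding the old set
        Diff diff = registry.update(List.of("10.0.0.2:8080", "10.0.0.3:8080"));

        System.out.println("added=" + diff.added + " dead=" + diff.dead);
        System.out.println("old snapshot still has " + snapshot.size() + " entries");
    }
}
```

A reader that grabbed the old set before the swap keeps iterating over a consistent view, which is why reads need no lock while the single writer rebuilds the set.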

Design Ideas

1. With an asynchronous BlockingQueue, how is message backlog handled?

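A backlog delays the moment a registration becomes visible, but the impact is small: registration just gets slower. Trading some latency for the ability to accept highly concurrent registrations is worthwhile. Services are also usually started in batches, so thousands or tens of thousands of machines do not register at the same moment and cause a severe backlog. In addition, addTask skips a CHANGE task whose key is already pending in the services map, so repeated changes to the same service collapse into a single queued task. With the backlog bounded this way, a single consumer thread is a good answer to the data-safety problem.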
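
One detail worth keeping in mind when reasoning about backlog: addTask calls offer and ignores its return value, and on a full ArrayBlockingQueue offer returns false immediately instead of blocking. With a capacity of 1024 * 1024 this is unlikely to matter in practice, but the behavior is easy to check with a tiny queue. OfferSketch and offerAll are hypothetical names used only for this demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OfferSketch {

    /** Offers each key to a bounded queue and records whether it was accepted. */
    static List<Boolean> offerAll(int capacity, String... keys) {
        BlockingQueue<String> tasks = new ArrayBlockingQueue<>(capacity);
        List<Boolean> accepted = new ArrayList<>();
        for (String key : keys) {
            accepted.add(tasks.offer(key)); // never blocks; false means the queue was full
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Capacity 2, three offers: the third one is rejected.
        System.out.println(offerAll(2, "svc-a", "svc-b", "svc-c")); // [true, true, false]
    }
}
```

If dropping a task were unacceptable, put (which blocks until space is free) or a checked offer with a retry would be the alternatives; the quoted code opts for the non-blocking call.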

2. When is the background notifier thread started?

It is submitted when DistroConsistencyServiceImpl is initialized:

  
    @PostConstruct
    public void init() {
        // Submit to the thread pool
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
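
The article does not show what GlobalExecutor does internally, so the following is only a sketch of one plausible shape: a dedicated single-thread executor that runs the long-lived notifier. The class name, the thread name, and the daemon setting are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class NotifierStartupSketch {

    // Assumed configuration: one named daemon thread reserved for the notifier.
    private static final ExecutorService NOTIFY_EXECUTOR = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "distro-notifier-sketch");
        t.setDaemon(true);
        return t;
    });

    /** Hands the notifier to the executor; it starts running on the pool thread. */
    static Future<?> submitNotifyTask(Runnable notifier) {
        return NOTIFY_EXECUTOR.submit(notifier);
    }

    /** Runs a short task on the executor and reports which thread executed it. */
    static String executingThreadName() {
        String[] name = new String[1];
        Runnable probe = () -> name[0] = Thread.currentThread().getName();
        try {
            submitNotifyTask(probe).get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println(executingThreadName());
    }
}
```

In a Spring bean, a call like submitNotifyTask would sit inside the @PostConstruct method, so the consumer loop is already running before the first registration request arrives.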

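Summary

This article summarized the server-side registration mechanism in Nacos 1.x. The design is neat: an asynchronous thread plus a blocking queue supports highly concurrent registration, and a single server-side consumer thread avoids thread-safety problems when the registry is updated.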

end
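  • Author: Endwas
  • Published: 2022-05-27 10:35
  • Copyright: free to repost, non-commercial, no derivatives, attribution required (Creative Commons 3.0 license)
  • Repost notice: if you repost an article that this blogger reposted, include a link to the original
  • WeChat official account reposts: add the author's name and blog address at the end of the article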