Long Polling and Nacos

Posted on 2021-10-27 | Category: Default | Views: 491

To support customers who only allow the HTTP protocol, I looked into long polling.

1. AsyncContext in Servlet 3.0

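Normally a Servlet handles each request with a single thread: that thread receives the request, runs the business logic, and writes the response. If a slow operation (such as I/O) occurs along the way, the thread can only block and wait. It therefore cannot be released back to the HTTP thread pool in time, which causes performance problems as concurrency grows. Servlet 3.0 introduced asynchronous processing to address this. Here is sample code: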

@WebServlet(urlPatterns={"/asyncservlet"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
   /* ... Same variables and init method as in SyncServlet ... */

   @Override
   public void doGet(HttpServletRequest request, 
                     HttpServletResponse response) {
      response.setContentType("text/html;charset=UTF-8");
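      // Switch the request into asynchronous mode via HttpServletRequest
      final AsyncContext acontext = request.startAsync();
      acontext.start(new Runnable() {
         public void run() {
            String param = acontext.getRequest().getParameter("param");
            String result = resource.process(param);
            // AsyncContext.getResponse() returns ServletResponse, so a cast is needed
            HttpServletResponse response = (HttpServletResponse) acontext.getResponse();
            /* ... print to the response ... */
            // Tell the Servlet container that processing is done so it can send the response.
            acontext.complete();
         }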
      });
   }
}

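In this example, the start method asks the Servlet container for a new thread to run the business logic. In fact, Servlet 3.0 does not prescribe how asynchronous tasks are implemented, so different containers may implement them differently. Alternatively, you can run the asynchronous task on a thread of your own: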

@WebServlet(urlPatterns={"/asyncservlet"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
   /* ... Same variables and init method as in SyncServlet ... */

   @Override
   public void doGet(HttpServletRequest request, 
                     HttpServletResponse response) {
      response.setContentType("text/html;charset=UTF-8");
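      // Switch the request into asynchronous mode via HttpServletRequest
      final AsyncContext acontext = request.startAsync();
      Runnable runnable = new Runnable() {
         public void run() {
            String param = acontext.getRequest().getParameter("param");
            String result = resource.process(param);
            // AsyncContext.getResponse() returns ServletResponse, so a cast is needed
            HttpServletResponse response = (HttpServletResponse) acontext.getResponse();
            /* ... print to the response ... */
            // Tell the Servlet container that processing is done so it can send the response.
            acontext.complete();
         }
      };
      // Run the asynchronous work on a thread we create ourselves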
      new Thread(runnable).start();
   }
}

Of course, in practice we would normally run the task on a custom thread pool; creating a thread directly like this is only for illustration.
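As a rough illustration of the thread-pool variant, here is a JDK-only sketch. It does not use the Servlet API: the `CompletableFuture` is a stand-in for `AsyncContext`, completing it stands in for `acontext.complete()`, and `process`, `handle`, and `PooledAsyncSketch` are hypothetical names introduced only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PooledAsyncSketch {
    // One shared, bounded pool instead of a new Thread per request.
    // Daemon threads so that this demo never keeps the JVM alive.
    private static final ExecutorService WORKERS = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "async-worker");
        t.setDaemon(true);
        return t;
    });

    // Hypothetical stand-in for the slow call (resource.process(param) above).
    static String process(String param) {
        return "processed:" + param;
    }

    // Stand-in for doGet: returns at once, the work runs on the pool.
    // The returned future plays the role of the AsyncContext.
    static CompletableFuture<String> handle(String param) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        WORKERS.execute(() -> {
            try {
                pending.complete(process(param)); // analogous to complete()
            } catch (RuntimeException e) {
                pending.completeExceptionally(e);
            }
        });
        return pending;
    }

    public static void main(String[] args) {
        System.out.println(handle("42").join());
    }
}
```

In a real servlet the body of the submitted task would be the same `Runnable` as in the example above; only the line that starts it changes from `new Thread(runnable).start()` to a submit on the shared pool.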

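Note: the effect described above is only achieved by using AsyncContext. If you simply call new Thread() or something similar without it, the HTTP thread is not handed back to the container, and the response is still produced by the original HTTP thread.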

 @PostMapping("/async-context")
    public void asyncContext(HttpServletRequest request,
                             HttpServletResponse response) throws IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        log.info("Before startAsync.");
        final AsyncContext asyncContext = request.startAsync();
        log.info("After startAsync.");
        asyncContext.start(() -> {
            try {
                response.getWriter().write("Async back!");
                Thread.sleep(3000L);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
            log.info("Before complete!");
            asyncContext.complete();
            log.info("After complete!");
        });
        log.info("After start!");
    }

    @PostMapping("/direct-thread")
    public void directThread(HttpServletRequest request,
                             HttpServletResponse response) throws IOException {
        log.info("After startAsync.");
        final Runnable runnable = () -> {
            try {
                log.info("direct-thread start!");
                response.getWriter().write("Direct Thread back!");
                Thread.sleep(3000L);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
            log.info("direct-thread end!");
        };
        log.info("After start!");
        new Thread(runnable).start();
    }

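Above are two Spring Boot endpoints. When you call async-context, the client has to wait about 3 seconds for the response, because the handler uses AsyncContext and only calls complete() after sleeping. When you call direct-thread, the client gets its response immediately after response.getWriter().write() is called, while the self-created thread still goes on to sleep for 3 seconds.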

2. Configuration Changes and Long Polling in Nacos 1.x Server

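Nacos (server version 1.4.2) implements server-side long polling with AsyncContext. Many articles online already explain how Nacos long polling works, so I won't repeat that here. One such article: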

https://blog.csdn.net/Zong_0915/article/details/113089265

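However, the vast majority of these articles leave one point unclear: if the relevant configuration changes before the long poll times out, the server can respond to the client promptly instead of waiting for the timeout. Nacos does implement this with event publishing and listening (the observer pattern). But the event published when a configuration changes and the event that long polling listens for are different events. They are not in a parent-child class relationship, and the long-polling Subscriber does not subscribe to the configuration-change event directly. In other words, the event triggered by publishing a configuration through the publish endpoint goes on to trigger, by other means, the event that long polling cares about. The two are not the same event, contrary to what many online articles say when they conflate them. The rest of this section spells out how the configuration-change event ConfigDataChangeEvent relates to LocalDataChangeEvent.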
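To make the "answer early on change, otherwise answer on timeout" behaviour concrete, here is a minimal JDK-only sketch. It is not Nacos code: Nacos holds the AsyncContext of each request, whereas this sketch holds a `CompletableFuture` per request, and `LongPollSketch`, `poll`, and `onLocalChange` are hypothetical names. It assumes Java 9+ for `completeOnTimeout`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class LongPollSketch {
    // groupKey -> requests currently being held for that key
    private final Map<String, List<CompletableFuture<String>>> waiting = new ConcurrentHashMap<>();

    /** Hold a request until the key changes or timeoutMs elapses. */
    public CompletableFuture<String> poll(String groupKey, long timeoutMs) {
        CompletableFuture<String> held = new CompletableFuture<>();
        waiting.computeIfAbsent(groupKey, k -> new CopyOnWriteArrayList<>()).add(held);
        // Timeout path: answer "unchanged" if nothing happened in time.
        held.completeOnTimeout("unchanged", timeoutMs, TimeUnit.MILLISECONDS);
        // Either way, stop tracking the request once it has been answered.
        held.whenComplete((value, error) -> {
            List<CompletableFuture<String>> list = waiting.get(groupKey);
            if (list != null) {
                list.remove(held);
            }
        });
        return held;
    }

    /** Change path: called by whatever observes the local change event. */
    public void onLocalChange(String groupKey) {
        List<CompletableFuture<String>> list = waiting.get(groupKey);
        if (list == null) {
            return;
        }
        for (CompletableFuture<String> held : list) {
            held.complete("changed:" + groupKey); // no effect if already timed out
        }
    }

    public static void main(String[] args) {
        LongPollSketch lp = new LongPollSketch();
        CompletableFuture<String> first = lp.poll("dataId+group", 30_000);
        lp.onLocalChange("dataId+group");                         // answered early
        System.out.println(first.join());
        System.out.println(lp.poll("dataId+group", 100).join());  // answered by timeout
    }
}
```

The point of the sketch is that only one kind of notification ever reaches the held requests, which corresponds to the role LocalDataChangeEvent plays in the analysis that follows.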

2.1. Changing a configuration on the server

com.alibaba.nacos.config.server.controller.ConfigController#publishConfig()
    /**
     * Adds or updates non-aggregated data.
     *
     * @throws NacosException NacosException.
     */
    @PostMapping
    @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
            @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
            @RequestParam(value = "appName", required = false) String appName,
            @RequestParam(value = "src_user", required = false) String srcUser,
            @RequestParam(value = "config_tags", required = false) String configTags,
            @RequestParam(value = "desc", required = false) String desc,
            @RequestParam(value = "use", required = false) String use,
            @RequestParam(value = "effect", required = false) String effect,
            @RequestParam(value = "type", required = false) String type,
            @RequestParam(value = "schema", required = false) String schema) throws NacosException {
        
        final String srcIp = RequestUtil.getRemoteIp(request);
        final String requestIpApp = RequestUtil.getAppName(request);
        srcUser = RequestUtil.getSrcUserName(request);
        //check type
        if (!ConfigType.isValidType(type)) {
            type = ConfigType.getDefaultType().getType();
        }
        // check tenant
        ParamUtils.checkTenant(tenant);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);
        Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
        MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
        MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
        MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
        MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
        MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
        MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
        ParamUtils.checkParam(configAdvanceInfo);
        
        if (AggrWhitelist.isAggrDataId(dataId)) {
            LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
                    dataId, group);
            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
        }
        
        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        configInfo.setType(type);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
                // Publishes a ConfigDataChangeEvent; the other branches below publish this event too
                ConfigChangePublisher
                        .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else {
            // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService
                .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
                        ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }

2.2. Listening for ConfigDataChangeEvent

The listener for this event is registered in

com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService()

The logic is:

    @Autowired
    public AsyncNotifyService(ServerMemberManager memberManager) {
        this.memberManager = memberManager;
        
        // Register ConfigDataChangeEvent to NotifyCenter.
        NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
        
        // Register A Subscriber to subscribe ConfigDataChangeEvent.
        NotifyCenter.registerSubscriber(new Subscriber() {
            
            @Override
            public void onEvent(Event event) {
                // Generate ConfigDataChangeEvent concurrently
                if (event instanceof ConfigDataChangeEvent) {
                    ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                    long dumpTs = evt.lastModifiedTs;
                    String dataId = evt.dataId;
                    String group = evt.group;
                    String tenant = evt.tenant;
                    String tag = evt.tag;
                    // All members of the cluster
                    Collection<Member> ipList = memberManager.allMembers();
                    
                    // In fact, any type of queue here can be
                    Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                    // Add every member to the notification queue
                    for (Member member : ipList) {
                        queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                                evt.isBeta));
                    }
                    // Wrap the queue in an asynchronous task that notifies each member
                    ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
                }
            }
            
            @Override
            public Class<? extends Event> subscribeType() {
                return ConfigDataChangeEvent.class;
            }
        });
    }

Next, look at the AsyncTask in ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue)); it is an inner class:

    class AsyncTask implements Runnable {
        
        private Queue<NotifySingleTask> queue;
    
        private NacosAsyncRestTemplate restTemplate;
        
        public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {
            this.restTemplate = restTemplate;
            this.queue = queue;
        }
        
        @Override
        public void run() {
            executeAsyncInvoke();
        }
        
        private void executeAsyncInvoke() {
            while (!queue.isEmpty()) {
                NotifySingleTask task = queue.poll();
                String targetIp = task.getTargetIP();
                if (memberManager.hasMember(targetIp)) {
                    // Node health check: if the target ip is unhealthy, put it in the delayed notification queue; otherwise notify it directly
                    boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
                    if (unHealthNeedDelay) {
                        // target ip is unhealthy, then put it in the notification list
                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                                task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                                0, task.target);
                        // get delay time and set fail count to the task
                        asyncTaskExecute(task);
                    } else {
                        Header header = Header.newInstance();
                        header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                        header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                        if (task.isBeta) {
                            header.addParam("isBeta", "true");
                        }
                        AuthHeaderUtil.addIdentityToHeader(header);
                        // The remote call is made here through restTemplate
                        restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
                    }
                }
            }
        }
    }
    
    private void asyncTaskExecute(NotifySingleTask task) {
        int delay = getDelayTime(task);
        Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
        queue.add(task);
        AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue);
        ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
    }

Now we need to find out which endpoint restTemplate actually calls. The definition of NotifySingleTask gives the URL:

static class NotifySingleTask extends NotifyTask {
        
        private String target;
        
        public String url;
        
        private boolean isBeta;
        
        private static final String URL_PATTERN =
                "http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange" + "?dataId={2}&group={3}";
        
        private static final String URL_PATTERN_TENANT =
                "http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange"
                        + "?dataId={2}&group={3}&tenant={4}";
        
        private int failCount;
        
        public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target) {
            this(dataId, group, tenant, lastModified, target, false);
        }
        
        public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target,
                boolean isBeta) {
            this(dataId, group, tenant, null, lastModified, target, isBeta);
        }
        
        public NotifySingleTask(String dataId, String group, String tenant, String tag, long lastModified,
                String target, boolean isBeta) {
            super(dataId, group, tenant, lastModified);
            this.target = target;
            this.isBeta = isBeta;
            try {
                dataId = URLEncoder.encode(dataId, Constants.ENCODE);
                group = URLEncoder.encode(group, Constants.ENCODE);
            } catch (UnsupportedEncodingException e) {
                LOGGER.error("URLEncoder encode error", e);
            }
            if (StringUtils.isBlank(tenant)) {
                this.url = MessageFormat.format(URL_PATTERN, target, EnvUtil.getContextPath(), dataId, group);
            } else {
                this.url = MessageFormat
                        .format(URL_PATTERN_TENANT, target, EnvUtil.getContextPath(), dataId, group, tenant);
            }
            if (StringUtils.isNotEmpty(tag)) {
                url = url + "&tag=" + tag;
            }
            failCount = 0;
            // this.executor = executor;
        }
        
        @Override
        public void setFailCount(int count) {
            this.failCount = count;
        }
        
        @Override
        public int getFailCount() {
            return failCount;
        }
        
        public String getTargetIP() {
            return target;
        }
        
    }

The class has two key constants that share the part Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange", and Constants.COMMUNICATION_CONTROLLER_PATH = "/v1/cs/communication". So the next step is to find the method mapped to the /v1/cs/communication/dataChange route.
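To see what URL the pattern expands to, the following small sketch reproduces the non-tenant pattern with plain `MessageFormat`. The member address `10.0.0.2:8848`, the context path `/nacos`, and the dataId/group values are sample values assumed for illustration; `NotifyUrlSketch` and `buildUrl` are hypothetical names.

```java
import java.text.MessageFormat;

public class NotifyUrlSketch {
    // Same value as Constants.COMMUNICATION_CONTROLLER_PATH quoted above.
    static final String COMMUNICATION_CONTROLLER_PATH = "/v1/cs/communication";

    // Same shape as URL_PATTERN in NotifySingleTask (the variant without tenant).
    static final String URL_PATTERN =
            "http://{0}{1}" + COMMUNICATION_CONTROLLER_PATH + "/dataChange" + "?dataId={2}&group={3}";

    static String buildUrl(String target, String contextPath, String dataId, String group) {
        return MessageFormat.format(URL_PATTERN, target, contextPath, dataId, group);
    }

    public static void main(String[] args) {
        // Assumed sample values, not taken from a real cluster.
        System.out.println(buildUrl("10.0.0.2:8848", "/nacos", "app.yaml", "DEFAULT_GROUP"));
        // prints http://10.0.0.2:8848/nacos/v1/cs/communication/dataChange?dataId=app.yaml&group=DEFAULT_GROUP
    }
}
```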

2.3. CommunicationController

The method mapped to /v1/cs/communication/dataChange is:

com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo()
    /**
     * Notify the change of config information.
     *
     */
    @GetMapping("/dataChange")
    public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
            @RequestParam("group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "tag", required = false) String tag) {
        dataId = dataId.trim();
        group = group.trim();
        String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
        long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
        String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
        String isBetaStr = request.getHeader("isBeta");
        if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
            // This is the key step
            dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
        } else {
            dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
        }
        return true;
    }

2.4. DumpService

com.alibaba.nacos.config.server.service.dump.DumpService#dump()
    /**
     * Add DumpTask to TaskManager, it will execute asynchronously.
     */
    public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));
        dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
        DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
    }

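As you can see, there is a task manager for dumps here, and a new DumpTask is created. The task manager used by DumpService extends NacosDelayTaskExecuteEngine, and the main logic of addTask lives in that class: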

com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#addTask()
    // Adds a new task
    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
            if (null != existTask) {
                newTask.merge(existTask);
            }
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * process tasks in execute engine.
     * The method that actually processes the tasks.
     */
    protected void processTasks() {
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            // Look up the processor for this taskKey
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                if (!processor.process(task)) {
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                retryFailedTask(taskKey, task);
            }
        }
    }
    @Override
    public NacosTaskProcessor getProcessor(Object key) {
        // Fall back to the default processor if none is registered for the key
        return taskProcessors.containsKey(key) ? taskProcessors.get(key) : defaultTaskProcessor;
    }
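
The behaviour of addTask, processTasks, and getProcessor above can be reduced to the following JDK-only sketch. It is a simplification under stated assumptions: `DelayTaskSketch` and its `Task` type are hypothetical, the merge rule (keep the newest timestamp) is an assumption and not necessarily what DumpTask does, and scheduling is left out so that processTasks is called by hand.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class DelayTaskSketch {
    /** Simplified task that only remembers a modification time. */
    static final class Task {
        long lastModified;

        Task(long lastModified) {
            this.lastModified = lastModified;
        }

        // Assumed merge rule: fold the older pending task in, keep the newest time.
        void merge(Task older) {
            lastModified = Math.max(lastModified, older.lastModified);
        }
    }

    private final Map<String, Task> tasks = new ConcurrentHashMap<>();
    private final Map<String, Predicate<Task>> processors = new ConcurrentHashMap<>();
    private final Predicate<Task> defaultProcessor;

    DelayTaskSketch(Predicate<Task> defaultProcessor) {
        this.defaultProcessor = defaultProcessor;
    }

    void addProcessor(String key, Predicate<Task> processor) {
        processors.put(key, processor);
    }

    // A second task with the same key replaces the pending one after merging.
    synchronized void addTask(String key, Task newTask) {
        Task exist = tasks.get(key);
        if (exist != null) {
            newTask.merge(exist);
        }
        tasks.put(key, newTask);
    }

    void processTasks() {
        for (String key : new ArrayList<>(tasks.keySet())) {
            Task task = tasks.remove(key);
            if (task == null) {
                continue;
            }
            // No processor registered for the key: use the default one.
            Predicate<Task> processor = processors.getOrDefault(key, defaultProcessor);
            if (!processor.test(task)) {
                addTask(key, task); // re-add the task if processing failed
            }
        }
    }

    int pending() {
        return tasks.size();
    }

    public static void main(String[] args) {
        DelayTaskSketch engine = new DelayTaskSketch(t -> {
            System.out.println("dump lastModified=" + t.lastModified);
            return true;
        });
        engine.addTask("dataId+group+tenant+false", new Task(100));
        engine.addTask("dataId+group+tenant+false", new Task(200));
        engine.processTasks(); // the two adds collapse into one processed task
    }
}
```

Because tasks are keyed, several change notifications for the same configuration that arrive before the next processing round result in a single dump.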

To find out which processor is actually used, go back to DumpService and check which task manager addTask was called on:

com.alibaba.nacos.config.server.service.dump.DumpService#DumpService()
    /**
     * Here you inject the dependent objects constructively, ensuring that some of the dependent functionality is
     * initialized ahead of time.
     *
     * @param persistService {@link PersistService}
     * @param memberManager  {@link ServerMemberManager}
     */
    public DumpService(PersistService persistService, ServerMemberManager memberManager) {
        this.persistService = persistService;
        this.memberManager = memberManager;
        this.processor = new DumpProcessor(this);
        this.dumpAllProcessor = new DumpAllProcessor(this);
        this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
        this.dumpAllTagProcessor = new DumpAllTagProcessor(this);
        // This is the TaskManager used by the dump method
        this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
        // As you can see, only a default task processor is registered on this TaskManager
        this.dumpTaskMgr.setDefaultTaskProcessor(processor);
        
        this.dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
        this.dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
        
        this.dumpAllTaskMgr.addProcessor(DumpAllTask.TASK_ID, dumpAllProcessor);
        this.dumpAllTaskMgr.addProcessor(DumpAllBetaTask.TASK_ID, dumpAllBetaProcessor);
        this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor);
        
        DynamicDataSource.getInstance().getDataSource();
    }

From this constructor, the actual processor is this.processor = new DumpProcessor(this);

com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process()

This class implements the NacosTaskProcessor interface, so let's look at its process method:

    @Override
    public boolean process(NacosTask task) {
        final PersistService persistService = dumpService.getPersistService();
        DumpTask dumpTask = (DumpTask) task;
        String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
        String dataId = pair[0];
        String group = pair[1];
        String tenant = pair[2];
        long lastModified = dumpTask.getLastModified();
        String handleIp = dumpTask.getHandleIp();
        boolean isBeta = dumpTask.isBeta();
        String tag = dumpTask.getTag();
        
        ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
                .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
        
        if (isBeta) {
            // Beta publish: dump the data and update the beta cache
            ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
            
            build.remove(Objects.isNull(cf));
            build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
            build.content(Objects.isNull(cf) ? null : cf.getContent());
            // Every branch, including the ones below, calls this method
            return DumpConfigHandler.configDump(build.build());
        } else {
            if (StringUtils.isBlank(tag)) {
                ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
                
                build.remove(Objects.isNull(cf));
                build.content(Objects.isNull(cf) ? null : cf.getContent());
                build.type(Objects.isNull(cf) ? null : cf.getType());
                
                return DumpConfigHandler.configDump(build.build());
            } else {
                
                ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
                
                build.remove(Objects.isNull(cf));
                build.content(Objects.isNull(cf) ? null : cf.getContent());
                
                return DumpConfigHandler.configDump(build.build());
            }
        }
    }

Clearly, every branch calls DumpConfigHandler.configDump(), so keep following it:

com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump()

This method is fairly long:

/**
     * trigger config dump event.
     *
     * @param event {@link ConfigDumpEvent}
     * @return {@code true} if the config dump task success , else {@code false}
     */
    public static boolean configDump(ConfigDumpEvent event) {
        final String dataId = event.getDataId();
        final String group = event.getGroup();
        final String namespaceId = event.getNamespaceId();
        final String content = event.getContent();
        final String type = event.getType();
        final long lastModified = event.getLastModifiedTs();
        if (event.isBeta()) {
            boolean result = false;
            if (event.isRemove()) {
                result = ConfigCacheService.removeBeta(dataId, group, namespaceId);
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                }
                return result;
            } else {
                result = ConfigCacheService
                        .dumpBeta(dataId, group, namespaceId, content, lastModified, event.getBetaIps());
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                            content.length());
                }
            }
            
            return result;
        }
        if (StringUtils.isBlank(event.getTag())) {
            if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
                AggrWhitelist.load(content);
            }
            
            if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
                ClientIpWhiteList.load(content);
            }
            
            if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
                SwitchService.load(content);
            }
            
            boolean result;
            if (!event.isRemove()) {
                result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
                
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                            content.length());
                }
            } else {
                result = ConfigCacheService.remove(dataId, group, namespaceId);
                
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                }
            }
            return result;
        } else {
            //
            boolean result;
            if (!event.isRemove()) {
                result = ConfigCacheService.dumpTag(dataId, group, namespaceId, event.getTag(), content, lastModified);
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                            content.length());
                }
            } else {
                result = ConfigCacheService.removeTag(dataId, group, namespaceId, event.getTag());
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                }
            }
            return result;
        }
        
    }

One class whose methods are used throughout this logic is ConfigCacheService:

com.alibaba.nacos.config.server.service.ConfigCacheService

This class has many methods. The ones used by configDump above all end up calling NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey)); for example:

    /**
     * Delete config file, and delete cache.
     *
     * @param dataId dataId string value.
     * @param group  group string value.
     * @param tenant tenant string value.
     * @return remove success or not.
     */
    public static boolean remove(String dataId, String group, String tenant) {
        final String groupKey = GroupKey2.getKey(dataId, group, tenant);
        final int lockResult = tryWriteLock(groupKey);
        
        // If data is non-existent.
        if (0 == lockResult) {
            DUMP_LOG.info("[remove-ok] {} not exist.", groupKey);
            return true;
        }
        
        // try to lock failed
        if (lockResult < 0) {
            DUMP_LOG.warn("[remove-error] write lock failed. {}", groupKey);
            return false;
        }
        
        try {
            if (!PropertyUtil.isDirectRead()) {
                DiskUtil.removeConfigInfo(dataId, group, tenant);
            }
            CACHE.remove(groupKey);
            // This is the event publication we were ultimately looking for
            NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
            
            return true;
        } finally {
            releaseWriteLock(groupKey);
        }
    }

At this point we have finally gone from ConfigDataChangeEvent to LocalDataChangeEvent, and the two different events are linked together.
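
The chain traced in this section can be condensed into a toy model. The sketch below is not Nacos code: it is a single-process stand-in in which `EventChainSketch` plays the role of NotifyCenter, `dump` collapses the HTTP call to /dataChange plus DumpService, DumpProcessor, and ConfigCacheService into one method, and the two event classes only borrow the Nacos names. In a real cluster each member would publish LocalDataChangeEvent on its own node.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class EventChainSketch {
    // Two unrelated event types: neither extends the other.
    static final class ConfigDataChangeEvent {
        final String groupKey;
        ConfigDataChangeEvent(String groupKey) { this.groupKey = groupKey; }
    }

    static final class LocalDataChangeEvent {
        final String groupKey;
        LocalDataChangeEvent(String groupKey) { this.groupKey = groupKey; }
    }

    // Subscribers are looked up by the exact class of the published event.
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new HashMap<>();
    final List<String> trace = new ArrayList<>();

    <T> void subscribe(Class<T> type, Consumer<T> handler) {
        subscribers.computeIfAbsent(type, k -> new ArrayList<>())
                .add(event -> handler.accept(type.cast(event)));
    }

    void publish(Object event) {
        for (Consumer<Object> s : subscribers.getOrDefault(event.getClass(), new ArrayList<>())) {
            s.accept(event);
        }
    }

    // Stand-in for: GET /dataChange -> dump task -> cache update -> local event.
    void dump(String member, String groupKey) {
        trace.add("dump on " + member);
        publish(new LocalDataChangeEvent(groupKey));
    }

    static EventChainSketch wire(List<String> members) {
        EventChainSketch bus = new EventChainSketch();
        // Step 1: the ConfigDataChangeEvent subscriber notifies every member.
        bus.subscribe(ConfigDataChangeEvent.class, e -> {
            for (String member : members) {
                bus.dump(member, e.groupKey);
            }
        });
        // Step 2: long polling subscribes to LocalDataChangeEvent only.
        bus.subscribe(LocalDataChangeEvent.class,
                e -> bus.trace.add("long-poll woken for " + e.groupKey));
        return bus;
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>();
        members.add("node-1");
        members.add("node-2");
        EventChainSketch bus = wire(members);
        bus.publish(new ConfigDataChangeEvent("dataId+group"));
        bus.trace.forEach(System.out::println);
    }
}
```

Note that the long-polling subscriber never sees ConfigDataChangeEvent; it is woken only because the dump step publishes a second, different event.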

2.5. Nacos summary

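From ConfigDataChangeEvent to LocalDataChangeEvent: after one node publishes a configuration, it propagates this information to the whole cluster, and every node then fires a LocalDataChangeEvent. A word on how I linked the two events: I first looked up each event and its Subscriber separately. For ConfigDataChangeEvent, I traced it to com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService() and then to restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task)), but at that point I did not realize that NotifySingleTask already contained the route. I then switched to LocalDataChangeEvent and its Subscriber and eventually found DumpConfigHandler. The two still could not be connected, so I went back through the inner classes of AsyncNotifyService, which handles ConfigDataChangeEvent, and found the communication route. After locating CommunicationController I saw a familiar word, Dump, and starting from DumpService I was able to connect the two events.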

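Many online articles simply say that ConfigDataChangeEvent causes LocalDataChangeEvent. Taken as a whole that can be considered correct, but it seems to deliberately blur the difference between the two, and it does not explore how they are actually connected.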

3. Thoughts on implementing long polling yourself

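The event-notification mechanism can be simplified by using a message queue to broadcast the relevant events directly to the gateway cluster. Typical business systems are not that strict about depending on external middleware, so any commonly used middleware that is already available can be put to use.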
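
One possible shape for this idea is sketched below, with an in-memory `Topic` class standing in for the message queue. Everything here is hypothetical (`Topic`, `GatewayNode`, and the string results are names made up for the sketch), and a real deployment would replace `Topic` with a client for whatever broadcast-capable middleware is available. It assumes Java 9+ for `completeOnTimeout`.

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class BroadcastLongPollSketch {
    /** Hypothetical broadcast topic; every subscriber receives every message. */
    static final class Topic {
        private final List<Consumer<String>> consumers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> consumer) {
            consumers.add(consumer);
        }

        void broadcast(String key) {
            consumers.forEach(c -> c.accept(key));
        }
    }

    /** One gateway node: holds its own pending polls and listens to the topic. */
    static final class GatewayNode {
        private final Map<String, Queue<CompletableFuture<String>>> pending = new ConcurrentHashMap<>();

        GatewayNode(Topic topic) {
            topic.subscribe(this::onChange);
        }

        CompletableFuture<String> poll(String key, long timeoutMs) {
            CompletableFuture<String> held = new CompletableFuture<String>()
                    .completeOnTimeout("unchanged", timeoutMs, TimeUnit.MILLISECONDS);
            pending.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(held);
            return held;
        }

        // A change message wakes every request this node holds for the key.
        private void onChange(String key) {
            Queue<CompletableFuture<String>> held = pending.remove(key);
            if (held != null) {
                held.forEach(f -> f.complete("changed"));
            }
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        GatewayNode a = new GatewayNode(topic);
        GatewayNode b = new GatewayNode(topic);
        CompletableFuture<String> onA = a.poll("order-config", 30_000);
        CompletableFuture<String> onB = b.poll("order-config", 30_000);
        topic.broadcast("order-config"); // one publish reaches both nodes
        System.out.println(onA.join() + " / " + onB.join());
    }
}
```

The sketch keeps the cleanup of timed-out requests and the small window between a change message and a newly registered poll out of scope; a real implementation would need to handle both, for example by having clients send the version they already hold.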

  • Author: fanchw
  • Link: https://www.fanchw.xyz/archives/chang-lun-xun-yu-nacos
  • License: Unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA 3.0. Please credit the source when reposting!