为了兼容只允许HTTP协议的客户,对长轮询的方式进行调研。
1、Servlet 3.0的AsyncContext
一般情况下,Servlet都是用一个线程对应一次请求,从接收请求到逻辑处理再到做出响应都是由一个线程负责,其中如果出现耗时的操作(如IO),那么线程只能阻塞等待。在这种情况下,线程无法及时释放回收至HTTP线程池,当并发量变大时,会带来性能问题。所以在Servlet 3.0引入了异步处理。以下是实例代码:
@WebServlet(urlPatterns={"/asyncservlet"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
/* ... Same variables and init method as in SyncServlet ... */
@Override
public void doGet(HttpServletRequest request,
HttpServletResponse response) {
response.setContentType("text/html;charset=UTF-8");
// 通过HttpServletRequest开启异步
final AsyncContext acontext = request.startAsync();
acontext.start(new Runnable() {
public void run() {
String param = acontext.getRequest().getParameter("param");
String result = resource.process(param);
HttpServletResponse response = acontext.getResponse();
/* ... print to the response ... */
// 通知Servlet容器请求处理完成,做出响应。
acontext.complete();
}
});
}
}
此处示例中,通过start方法向Servlet容器申请新的线程处理业务逻辑,实际上Servlet3.0并未对异步任务实现做出规定,不同的容器可以有不同实现。除此之外,还可以使用自定义的线程完成异步任务:
@WebServlet(urlPatterns={"/asyncservlet"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
/* ... Same variables and init method as in SyncServlet ... */
@Override
public void doGet(HttpServletRequest request,
HttpServletResponse response) {
response.setContentType("text/html;charset=UTF-8");
// 通过HttpServletRequest开启异步
final AsyncContext acontext = request.startAsync();
Runnable runnable = new Runnable() {
public void run() {
String param = acontext.getRequest().getParameter("param");
String result = resource.process(param);
HttpServletResponse response = acontext.getResponse();
/* ... print to the response ... */
// 通知Servlet容器请求处理完成,做出响应。
acontext.complete();
}
};
// 自定义线程执行异步操作
new Thread(runnable).start();
}
}
当然,一般情况下我们都是自定义线程池来执行任务,这种直接创建线程的方式只是作为示例。
注意:只有使用AsyncContext才能够达到上面所讲的效果,如果直接new Thread()或者类似的方式的,HTTP Thread并不会归还到容器,响应的过程依旧是由原来的HTTP Thread执行。
@PostMapping("/async-context")
public void asyncContext(HttpServletRequest request,
HttpServletResponse response) throws IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
log.info("Before startAsync.");
final AsyncContext asyncContext = request.startAsync();
log.info("After startAsync.");
asyncContext.start(() -> {
try {
response.getWriter().write("Async back!");
Thread.sleep(3000L);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
log.info("Before complete!");
asyncContext.complete();
log.info("After complete!");
});
log.info("After start!");
}
@PostMapping("/direct-thread")
public void directThread(HttpServletRequest request,
HttpServletResponse response) throws IOException {
log.info("After startAsync.");
final Runnable runnable = () -> {
try {
log.info("direct-thread start!");
response.getWriter().write("Direct Thread back!");
Thread.sleep(3000L);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
log.info("direct-thread end!");
};
log.info("After start!");
new Thread(runnable).start();
}
如上有两个基于SpringBoot的接口,请求async-context接口,由于使用了AsyncContext并且在休眠后才调用complete()方法,所以客户端需要等待大概3秒获得响应;而请求direct-thread接口时,调用response.getWriter().write()后,客户端会立刻得到响应,自行创建的线程依然会休眠3秒。
2、Nacos1.X Server配置变化与长轮询
在Nacos(Server版本1.4.2)中,服务端长轮询就是用了AsyncContext。目前网上有许多讲述Nacos长轮询原理的文章,这里就不再重复讲述。以下列出一些文章:
https://blog.csdn.net/Zong_0915/article/details/113089265
但是绝大部分文章都有一个问题没有讲清楚:服务端在长轮询超时之前,如果有对应的配置变化,可以及时地向客户端响应,而不必等到长轮询timeout。关于这一功能的实现,Nacos确实是使用了事件发布与监听(观察者模式)实现的。但是配置变更时发布的事件和长轮询监听的事件是不同的,二者也不是父类子类的关系,长轮询的订阅者Subscriber也没有直接订阅配置变更的事件。换句话说,通过/publis接口发布配置锁触发的事件,是通过其它方式又触发了长轮询关注的事件,而不是像许多网上文章所说,将二者混为一谈,简单错误地认为这两个事件是等同的。这里,将配置变更的ConfigDataChangeEvent与LocalDataChangeEvent之间的关系详细说明一下。
2.1、服务端变更配置
com.alibaba.nacos.config.server.controller#publishConfig()
/**
* Adds or updates non-aggregated data.
*
* @throws NacosException NacosException.
*/
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema) throws NacosException {
final String srcIp = RequestUtil.getRemoteIp(request);
final String requestIpApp = RequestUtil.getAppName(request);
srcUser = RequestUtil.getSrcUserName(request);
//check type
if (!ConfigType.isValidType(type)) {
type = ConfigType.getDefaultType().getType();
}
// check tenant
ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
ParamUtils.checkParam(configAdvanceInfo);
if (AggrWhitelist.isAggrDataId(dataId)) {
LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
dataId, group);
throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
configInfo.setType(type);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
// 发布了一个ConfigDataChangeEvent事件,下面的其它分支也会发布该事件
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else {
// beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService
.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
2.2、ConfigDataChangeEvent事件监听
该事件的监听器的注册在
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService()
具体逻辑:
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
// Register ConfigDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe ConfigDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
// 集群所有节点
Collection<Member> ipList = memberManager.allMembers();
// In fact, any type of queue here can be
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
// 将所有节点添加至通知队列中
for (Member member : ipList) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
}
// 包装了一个异步任务,通知每一个节点
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
接下来主要看ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));中的AsyncTask,它是一个内部类:
class AsyncTask implements Runnable {
private Queue<NotifySingleTask> queue;
private NacosAsyncRestTemplate restTemplate;
public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {
this.restTemplate = restTemplate;
this.queue = queue;
}
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (memberManager.hasMember(targetIp)) {
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify 节点的健康检查
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
if (unHealthNeedDelay) {
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, task.target);
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
Header header = Header.newInstance();
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
if (task.isBeta) {
header.addParam("isBeta", "true");
}
AuthHeaderUtil.addIdentityToHeader(header);
// 这里使用restTemplate进行远程调用
restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
}
}
}
}
}
private void asyncTaskExecute(NotifySingleTask task) {
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue);
ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
}
这里要看一下restTemplate到底调用了哪个接口,NotifySingleTask的定义中给出了接口地址:
static class NotifySingleTask extends NotifyTask {
private String target;
public String url;
private boolean isBeta;
private static final String URL_PATTERN =
"http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange" + "?dataId={2}&group={3}";
private static final String URL_PATTERN_TENANT =
"http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange"
+ "?dataId={2}&group={3}&tenant={4}";
private int failCount;
public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target) {
this(dataId, group, tenant, lastModified, target, false);
}
public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target,
boolean isBeta) {
this(dataId, group, tenant, null, lastModified, target, isBeta);
}
public NotifySingleTask(String dataId, String group, String tenant, String tag, long lastModified,
String target, boolean isBeta) {
super(dataId, group, tenant, lastModified);
this.target = target;
this.isBeta = isBeta;
try {
dataId = URLEncoder.encode(dataId, Constants.ENCODE);
group = URLEncoder.encode(group, Constants.ENCODE);
} catch (UnsupportedEncodingException e) {
LOGGER.error("URLEncoder encode error", e);
}
if (StringUtils.isBlank(tenant)) {
this.url = MessageFormat.format(URL_PATTERN, target, EnvUtil.getContextPath(), dataId, group);
} else {
this.url = MessageFormat
.format(URL_PATTERN_TENANT, target, EnvUtil.getContextPath(), dataId, group, tenant);
}
if (StringUtils.isNotEmpty(tag)) {
url = url + "&tag=" + tag;
}
failCount = 0;
// this.executor = executor;
}
@Override
public void setFailCount(int count) {
this.failCount = count;
}
@Override
public int getFailCount() {
return failCount;
}
public String getTargetIP() {
return target;
}
}
该类有两个关键的常量,有一个相同的部分Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange",而Constants.COMMUNICATION_CONTROLLER_PATH = "/v1/cs/communication",所以我们要继续往下找/v1/cs/communication/dataChange路由对应的方法。
2.3、CommunicationController
/v1/cs/communication/dataChange对应的方法是:
com.alibaba.nacos.config.server.controller#notifyConfigInfo()
* Notify the change of config information.
*
*/
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
@RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
// 这里是关键
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}
2.4、DumpService相关
com.alibaba.nacos.config.server.service.dump.DumpService#dump()
/**
* Add DumpTask to TaskManager, it will execute asynchronously.
*/
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
可以看到,这里有个dump的任务管理器,新建了一个DumpTask,DumpService使用的任务管理器是继承自NacosDelayTaskExecuteEngine,addTask方法的主要逻辑也在该类中
com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#addTask()
// 这里是新增一个任务
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}
/**
* process tasks in execute engine.
* 实际处理任务的方法
*/
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
// 根据taskKey获取任务的执行器
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
@Override
public NacosTaskProcessor getProcessor(Object key) {
// 如果根据key找不到执行器,就使用默认的执行器
return taskProcessors.containsKey(key) ? taskProcessors.get(key) : defaultTaskProcessor;
}
现在想要知道到底用了哪个执行器,那就要回到DumpService中,查看当初addTask具体使用的是哪个任务管理器
com.alibaba.nacos.config.server.service.dump.DumpService#DumpService()
/**
* Here you inject the dependent objects constructively, ensuring that some of the dependent functionality is
* initialized ahead of time.
*
* @param persistService {@link PersistService}
* @param memberManager {@link ServerMemberManager}
*/
public DumpService(PersistService persistService, ServerMemberManager memberManager) {
this.persistService = persistService;
this.memberManager = memberManager;
this.processor = new DumpProcessor(this);
this.dumpAllProcessor = new DumpAllProcessor(this);
this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
this.dumpAllTagProcessor = new DumpAllTagProcessor(this);
// dump方法中使用的就是这个TaskManager
this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
// 可以看到TaskManager中只注册了一个默认的任务执行器
this.dumpTaskMgr.setDefaultTaskProcessor(processor);
this.dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
this.dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
this.dumpAllTaskMgr.addProcessor(DumpAllTask.TASK_ID, dumpAllProcessor);
this.dumpAllTaskMgr.addProcessor(DumpAllBetaTask.TASK_ID, dumpAllBetaProcessor);
this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor);
DynamicDataSource.getInstance().getDataSource();
}
根据这个构造方法可以看到,实际的执行器就是this.processor = new DumpProcessor(this);
com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process()
该类实现了NacosTaskProcessor接口,那么看一下它的process方法
@Override
public boolean process(NacosTask task) {
final PersistService persistService = dumpService.getPersistService();
DumpTask dumpTask = (DumpTask) task;
String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
String dataId = pair[0];
String group = pair[1];
String tenant = pair[2];
long lastModified = dumpTask.getLastModified();
String handleIp = dumpTask.getHandleIp();
boolean isBeta = dumpTask.isBeta();
String tag = dumpTask.getTag();
ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
if (isBeta) {
// beta发布,则dump数据,更新beta缓存
ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
build.content(Objects.isNull(cf) ? null : cf.getContent());
// 包括以下分支,都有这个方法,
return DumpConfigHandler.configDump(build.build());
} else {
if (StringUtils.isBlank(tag)) {
ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.type(Objects.isNull(cf) ? null : cf.getType());
return DumpConfigHandler.configDump(build.build());
} else {
ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
return DumpConfigHandler.configDump(build.build());
}
}
}
很明显,这里有一个在各个分支都调用的方法DumpConfigHandler.configDump(),继续跟踪下去:
com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump()
这个方法比较长:
/**
* trigger config dump event.
*
* @param event {@link ConfigDumpEvent}
* @return {@code true} if the config dump task success , else {@code false}
*/
public static boolean configDump(ConfigDumpEvent event) {
final String dataId = event.getDataId();
final String group = event.getGroup();
final String namespaceId = event.getNamespaceId();
final String content = event.getContent();
final String type = event.getType();
final long lastModified = event.getLastModifiedTs();
if (event.isBeta()) {
boolean result = false;
if (event.isRemove()) {
result = ConfigCacheService.removeBeta(dataId, group, namespaceId);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
}
return result;
} else {
result = ConfigCacheService
.dumpBeta(dataId, group, namespaceId, content, lastModified, event.getBetaIps());
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
content.length());
}
}
return result;
}
if (StringUtils.isBlank(event.getTag())) {
if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(content);
}
if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(content);
}
if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(content);
}
boolean result;
if (!event.isRemove()) {
result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
content.length());
}
} else {
result = ConfigCacheService.remove(dataId, group, namespaceId);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
}
}
return result;
} else {
//
boolean result;
if (!event.isRemove()) {
result = ConfigCacheService.dumpTag(dataId, group, namespaceId, event.getTag(), content, lastModified);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
content.length());
}
} else {
result = ConfigCacheService.removeTag(dataId, group, namespaceId, event.getTag());
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
}
}
return result;
}
}
逻辑中,有一个类的方法一直被使用ConfigCacheService,这个类
com.alibaba.nacos.config.server.service.ConfigCacheService
这个类中有很多方法,在上面的configDump所用的到的方法,最后都会调用NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));例如:
/**
* Delete config file, and delete cache.
*
* @param dataId dataId string value.
* @param group group string value.
* @param tenant tenant string value.
* @return remove success or not.
*/
public static boolean remove(String dataId, String group, String tenant) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
final int lockResult = tryWriteLock(groupKey);
// If data is non-existent.
if (0 == lockResult) {
DUMP_LOG.info("[remove-ok] {} not exist.", groupKey);
return true;
}
// try to lock failed
if (lockResult < 0) {
DUMP_LOG.warn("[remove-error] write lock failed. {}", groupKey);
return false;
}
try {
if (!PropertyUtil.isDirectRead()) {
DiskUtil.removeConfigInfo(dataId, group, tenant);
}
CACHE.remove(groupKey);
// 这里就是我们最终要找的时间发布
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
return true;
} finally {
releaseWriteLock(groupKey);
}
}
到这里,我们终于从ConfigDataChangeEvent到达了LocalDataChangeEvent,两个不同的事件终于串联起来了。
2.5、Nacos总结
从ConfigDataChangeEvent到LocalDataChangeEvent,某个节点publish配置后,会往整个集群同步这一信息,各个节点都会发起LocalDataChangeEvent事件。这里说一下,如何将这两个事件关联起来的,我是先分别查找两个事件及其Subscriber。对于ConfigDataChangeEvent,最后追踪到了com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService(),以及后面的restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task)),当时并没有意识到NotifySingleTask中就有路由地址。后来转而去查找LocalDataChangeEvent及其Subscriber,最后找到了DumpConfigHandler。此时二者还是无法关联起来,然后重新ConfigDataChangeEvent的AsyncNotifyService的各个内部类,发现了communication的路由。找到CommunicationController之后看到了一个熟悉的词—Dump,然后从DumpService出发将这两个事件关联了起来。
对于网上很多文章,直接说ConfigDataChangeEvent导致了LocalDataChangeEvent发生,从整体上来看也可以算对,但是似乎有意混淆了二者的不同,并且没有对二者如何关联探究思考。
3、自行实现长轮询的思考
可以简化事件通知的机制,直接使用消息队列直接向网关集群广播相关事件,因为常见业务对于外部中间件的依赖要求没有那么严格,对于常用的中间件能够被利用起来的,都可以使用。