前面我已经讲了许多关于NameNode的后台任务线程:HeartbeatMonitor、DecommissionManager$Monitor、LeaseManager$Monitor、PendingReplicationMonitor等,今天终于可以讲一讲ReplicationMonitor ,它在FSNamesystem中可算是大家伙了。那么,NameNode$FSNamesystem到底用ReplicationMonitor 来干啥子用的呢?其实,从它的名字我们就应该可以窥测出的大概。是的,ReplicationMonitor主要用来检测所有文件的Blocks的副本情况,对于Blocks的副本多了或者不够,同时也包括无效的Blocks,他都会进行相应的处理措施。先来具体的看看ReplicationMonitor的源代码吧!
简单的说明一下先:
INVALIDATE_WORK_PCI_PER_ITERATION:是用来限制每一次处理Blocks的数量;
INVALIDATE_WORK_PCI_PER_ITERATION:是用来限制每一次处理有无效Blocks的数据节点数量;
replicationRecheckInterval:每一次检测的间隔时间(由配置文件的项dfs.replication.interval设置,单位为秒);
再来具体看看方法computeDatanodeWork和processPendingReplications都干了些什么。
对于方法computeDatanodeWork,它主要负责安排那些Blocks可以复制它的副本(当然这还包括安排那些数据节点去copy block的副本),那些数据节点处理它的无效Blocks。
public int computeDatanodeWork() throws IOException { int workFound = 0; int blocksToProcess = 0; int nodesToProcess = 0; // blocks should not be replicated or removed if safe mode is on if (isInSafeMode()) return workFound; synchronized(heartbeats) { blocksToProcess = (int)(heartbeats.size() * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION); nodesToProcess = (int)Math.ceil((double)heartbeats.size() * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100); }
//安排一些block的replication workFound = computeReplicationWork(blocksToProcess); // Update FSNamesystemMetrics counters synchronized (this) { pendingReplicationBlocksCount = pendingReplications.size(); underReplicatedBlocksCount = neededReplications.size(); scheduledReplicationBlocksCount = workFound; corruptReplicaBlocksCount = corruptReplicas.size(); } //安排一些数据节点去处理它的无效blocks
workFound += computeInvalidateWork(nodesToProcess); return workFound; }
private int computeReplicationWork(int blocksToProcess) throws IOException { //从neededReplications中最多选择blocksToProcess个需要copy副本的Blocks List<List<Block>> blocksToReplicate = chooseUnderReplicatedBlocks(blocksToProcess); // replicate blocks int scheduledReplicationCount = 0; for (int i=0; i<blocksToReplicate.size(); i++) { for(Block block : blocksToReplicate.get(i)) {
if (computeReplicationWorkForBlock(block, i)) { //安排block的副本的copy工作给那些数据节点 scheduledReplicationCount++; } } } return scheduledReplicationCount; }
private int computeInvalidateWork(int nodesToProcess) { int blockCnt = 0; for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) { int work = invalidateWorkForOneNode();//安排一个数据节点去处理它的无效Blocks if(work == 0) break; blockCnt += work; } return blockCnt; }
对于调用的方法chooseUnderReplicatedBlocks、computeReplicationWorkForBlock和invalidateWorkForOneNode,我不在继续深入下去了,有兴趣的盆友可以查看相应的源码,它们的功能参看我上面的注释。但是,我还想稍微补充一点的是关于invalidateWorkForOneNode方法,它返回的是数据节点处理它上面的无效Blocks的数量,因为数据节点每一次处理它上面的无效Blocks是由数量限制的,而不是一下子就处理它上面的所有的无效Blocks,这个数量的限制是由FSNamesystem.blockInvalidateLimit决定的,它的值是Math.max(100,20*(int)(heartbeatInterval/1000))。
对于方法processPendingReplications,它主要负责将pendingReplications中复制副本超时(失败)的Blocks重新交给replicationMonitor来处理。具体代码如下:
void processPendingReplications() { Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); if (timedOutItems != null) { synchronized (this) { for (int i = 0; i < timedOutItems.length; i++) { NumberReplicas num = countNodes(timedOutItems[i]); neededReplications.add(timedOutItems[i],num.liveReplicas(),num.decommissionedReplicas(),getReplication(timedOutItems[i])); } } } }
那么,pendingReplications中不够副本数量的blocks是从哪里来的呢?它主要来自以下几个地方:
1. 当lease被删除时,需要检测和这个租约关联的hdfs文件的block数是否和期望的一致,如果小于期望值则将这个块加入到需要复制队列中;
2. 当离开安全模式时需要校验块的复制情况,如果没达到复制因子的则加入到需要复制队列中;
3. 当datanode注册到namenode时需要校验这个datanode是否处于正准备退役阶段,如果是那需要检测该datanode节点上的所有block的复制数是否已经达到复制因子,如果没有则需要加入到需要复制队列中;
4. 当DecommissionManager的监控线程执行检测时,如果发现某个退役节点处于正准备退役阶段,则对该退役节点的所有块执行检测,查看是否达到复制因子,如果没有达到则将该block加入到需要复制队列中;
好了,关于NameNode的后台工作线程ReplicationMonitor就简单地先讲到这里了。