This post covers the class responsible for distributed cache purging during task initialization: TrackerDistributedCacheManager.
In 4.6.0, distributed cache purging is controlled by four parameters.
    local.cache.size                                       # default 10737418240
    mapreduce.tasktracker.cache.local.numberdirectories    # default 10000
    mapreduce.tasktracker.cache.local.keep.pct             # default 0.95
    mapreduce.tasktracker.distributedcache.checkperiod     # default 1 minute
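To make the interplay of the first three parameters concrete, here is a minimal sketch of how the cleanup targets could be derived from them. It assumes that each goal is simply the limit multiplied by keep.pct, which is what the field names allowedCacheSizeCleanupGoal and allowedCacheSubdirsCleanupGoal in the Hadoop code suggest. The class and method names (CleanupGoalSketch, cleanupGoal) are made up for illustration and are not part of Hadoop.

```java
// Illustrative only, not Hadoop code. Assumption: cleanup goal = limit * keep.pct.
class CleanupGoalSketch {
    static final long DEFAULT_CACHE_SIZE = 10737418240L; // local.cache.size default (10 GiB)
    static final long DEFAULT_SUBDIR_LIMIT = 10000L;     // ...cache.local.numberdirectories default
    static final double DEFAULT_KEEP_PCT = 0.95;         // ...cache.local.keep.pct default

    // Target usage after a purge: the fraction of the limit that is kept.
    static long cleanupGoal(long limit, double keepPct) {
        return (long) (limit * keepPct);
    }

    public static void main(String[] args) {
        System.out.println("size goal    = "
                + cleanupGoal(DEFAULT_CACHE_SIZE, DEFAULT_KEEP_PCT));
        System.out.println("subdirs goal = "
                + cleanupGoal(DEFAULT_SUBDIR_LIMIT, DEFAULT_KEEP_PCT));
    }
}
```

Under this assumption the defaults give a goal of 10200547328 bytes and 9500 subdirectories: a purge starts once a limit is exceeded and aims to bring usage back down to 95% of that limit.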
When a TrackerDistributedCacheManager instance is constructed, it also creates a CleanupThread instance:
    this.cleanupThread = new CleanupThread(conf);
The thread is then started through startCleanupThread:
    public void startCleanupThread() {
      this.cleanupThread.start();
    }
startCleanupThread is called from the initialize method of the TaskTracker class:
    // Initialize DistributedCache
    this.distributedCacheManager = new TrackerDistributedCacheManager(
        this.fConf, taskController);
    this.distributedCacheManager.startCleanupThread();
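CleanupThread is an inner class of TrackerDistributedCacheManager.
It runs as a separate thread that periodically calls the checkAndCleanup method of BaseDirManager, so the cleanup does not block the calling thread. The interval between checks is set by mapreduce.tasktracker.distributedcache.checkperiod.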
    public void run() {
      while (running) {
        try {
          Thread.sleep(cleanUpCheckPeriod);
          baseDirManager.checkAndCleanup();  // call checkAndCleanup
        } catch (IOException e) {
          LOG.error("Exception in DistributedCache CleanupThread.", e);
        } catch (InterruptedException e) {
          LOG.info("Cleanup...", e);
          //To force us to exit cleanly
          running = false;
        } catch (Throwable t) {
          exitTaskTracker(t);
        }
      }
    }
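The same sleep-then-clean loop can be tried outside Hadoop with a self-contained sketch. Everything here is hypothetical: PeriodicCleanerSketch, stopCleaner and runFor are illustrative names, the Runnable stands in for baseDirManager.checkAndCleanup(), and marking the thread as a daemon is a choice of this sketch rather than something taken from the Hadoop source.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a stripped-down analogue of the CleanupThread loop.
class PeriodicCleanerSketch extends Thread {
    private final long periodMillis;
    private final Runnable cleanupTask;      // stands in for checkAndCleanup()
    private volatile boolean running = true;

    PeriodicCleanerSketch(long periodMillis, Runnable cleanupTask) {
        this.periodMillis = periodMillis;
        this.cleanupTask = cleanupTask;
        setDaemon(true);                     // sketch choice: never keep the JVM alive
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(periodMillis);  // wait one check period
                cleanupTask.run();           // then do the cleanup pass
            } catch (InterruptedException e) {
                running = false;             // interrupted: leave the loop cleanly
            } catch (RuntimeException e) {
                // A failed pass is logged and retried on the next period.
                System.err.println("cleanup failed, will retry: " + e);
            }
        }
    }

    void stopCleaner() {
        running = false;
        interrupt();                         // wake the thread if it is sleeping
    }

    // Runs a cleaner for durationMillis and returns how many passes it made.
    static int runFor(long periodMillis, long durationMillis) {
        AtomicInteger passes = new AtomicInteger();
        PeriodicCleanerSketch cleaner =
                new PeriodicCleanerSketch(periodMillis, passes::incrementAndGet);
        cleaner.start();
        try {
            Thread.sleep(durationMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            cleaner.stopCleaner();
        }
        return passes.get();
    }

    public static void main(String[] args) {
        System.out.println("cleanup passes: " + runFor(20, 200));
    }
}
```

Because the caller only starts the thread and returns, the cleanup cost is paid on the background thread, which is the non-blocking behavior described above.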
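BaseDirManager is also an inner class of TrackerDistributedCacheManager. It controls the deletion of distributed cache entries and updates the bookkeeping state once entries have been deleted.
Its checkAndCleanup method starts as follows: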
    Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
    HashMap<Path, CacheDir> toBeCleanedBaseDir = new HashMap<Path, CacheDir>();
    .........
    // iterate over the mapred.local.dir directories, checking size and entry count
    for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
      CacheDir baseDirCounts = baseDir.getValue();
      LOG.debug(baseDir.getKey() + ": allowedCacheSize=" + allowedCacheSize +
          ",baseDirCounts.size=" + baseDirCounts.size +
          ",allowedCacheSubdirs=" + allowedCacheSubdirs +
          ",baseDirCounts.subdirs=" + baseDirCounts.subdirs);
      // purge trigger: the directory's size exceeds local.cache.size, or its
      // subdirectory count exceeds mapreduce.tasktracker.cache.local.numberdirectories
      if (allowedCacheSize < baseDirCounts.size ||
          allowedCacheSubdirs < baseDirCounts.subdirs) {
        CacheDir tcc = new CacheDir();
        tcc.size = baseDirCounts.size - allowedCacheSizeCleanupGoal;
        tcc.subdirs = baseDirCounts.subdirs - allowedCacheSubdirsCleanupGoal;
        toBeCleanedBaseDir.put(baseDir.getKey(), tcc);  // build the map of base directories to clean
      }
    }
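The trigger check and the "amount to reclaim" arithmetic above can be replayed with plain numbers. The following is a sketch, not Hadoop code: PurgeTriggerSketch, DirCounts and planCleanup are hypothetical names, String keys replace Path, and the directory names and usage figures are invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mirrors the trigger condition and subtraction shown above.
class PurgeTriggerSketch {
    // Minimal stand-in for CacheDir: bytes used and number of subdirectories.
    static final class DirCounts {
        final long size;
        final long subdirs;
        DirCounts(long size, long subdirs) {
            this.size = size;
            this.subdirs = subdirs;
        }
    }

    // Returns, per base directory over either limit, how much should be reclaimed.
    static Map<String, DirCounts> planCleanup(Map<String, DirCounts> usage,
                                              long allowedSize, long allowedSubdirs,
                                              long sizeGoal, long subdirsGoal) {
        Map<String, DirCounts> toClean = new LinkedHashMap<>();
        for (Map.Entry<String, DirCounts> e : usage.entrySet()) {
            DirCounts c = e.getValue();
            if (allowedSize < c.size || allowedSubdirs < c.subdirs) {
                toClean.put(e.getKey(),
                        new DirCounts(c.size - sizeGoal, c.subdirs - subdirsGoal));
            }
        }
        return toClean;
    }

    public static void main(String[] args) {
        Map<String, DirCounts> usage = new LinkedHashMap<>();
        usage.put("/data/1/mapred/local", new DirCounts(11000000000L, 120L)); // over size limit
        usage.put("/data/2/mapred/local", new DirCounts(5000000000L, 300L));  // under both limits
        Map<String, DirCounts> plan =
                planCleanup(usage, 10737418240L, 10000L, 10200547328L, 9500L);
        for (Map.Entry<String, DirCounts> e : plan.entrySet()) {
            System.out.println(e.getKey() + ": free " + e.getValue().size
                    + " bytes, " + e.getValue().subdirs + " subdirs");
        }
    }
}
```

With these invented figures only the first directory is selected, with 799452672 bytes to reclaim. Its subdirectory figure comes out negative (-9380), which presumably just means nothing needs to be reclaimed on that dimension.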
The actual deletion:
    // do the deletion, after releasing the global lock
    for (CacheStatus cacheStatus : toBeDeletedCache) {  // iterate over the cache entries to delete
      cacheStatus.lock.lock();  // acquire the lock of the entry being deleted
      try {
        Path localizedDir = cacheStatus.getLocalizedUniqueDir();
        if (cacheStatus.user == null) {
          TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir);
          try {
            localFs.delete(localizedDir, true);  // public cache: delete via FileSystem.delete
          } catch (IOException e) {
            TrackerDistributedCacheManager.LOG.warn(
                "Could not delete distributed cache empty directory "
                + localizedDir, e);
          }
        } else {
          TrackerDistributedCacheManager.LOG.info(
              "Deleted path " + localizedDir + " as " + cacheStatus.user);
          String base = cacheStatus.getBaseDir().toString();
          String userDir = TaskTracker.getUserDir(cacheStatus.user);
          int skip = base.length() + 1 + userDir.length() + 1;
          String relative = localizedDir.toString().substring(skip);
          taskController.deleteAsUser(cacheStatus.user, relative);  // private cache: delete via TaskController.deleteAsUser
        }
        deleteCacheInfoUpdate(cacheStatus);
      } finally {
        cacheStatus.lock.unlock();
      }
    }
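The skip arithmetic in the private-cache branch is easy to misread, so here is a small worked sketch. It assumes the localized path has the shape base + "/" + userDir + "/" + rest, which is what the two "+ 1" terms imply. The class and method names and the example paths (including the "taskTracker/alice" user directory) are hypothetical.

```java
// Illustrative only: reproduces the substring arithmetic used before deleteAsUser.
class RelativePathSketch {
    // Strips "<baseDir>/<userDir>/" from the front of localizedDir.
    static String relativeToUserDir(String baseDir, String userDir, String localizedDir) {
        // One extra character for each of the two path separators.
        int skip = baseDir.length() + 1 + userDir.length() + 1;
        return localizedDir.substring(skip);
    }

    public static void main(String[] args) {
        String base = "/data/1/mapred/local";        // hypothetical base directory
        String userDir = "taskTracker/alice";        // hypothetical per-user directory
        String rest = "distcache/12345/lookup.dat";  // hypothetical cache entry
        String localized = base + "/" + userDir + "/" + rest;
        System.out.println(relativeToUserDir(base, userDir, localized));
    }
}
```

The result is the path relative to the user's directory, which is the form passed to deleteAsUser in the code above.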