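This note looks at the class behind distributed cache purging at task initialization, TrackerDistributedCacheManager, and how it is started.
In 4.6.0, distributed cache purge behavior is controlled by four parameters.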
    local.cache.size                                      # default 10737418240
    mapreduce.tasktracker.cache.local.numberdirectories   # default 10000
    mapreduce.tasktracker.cache.local.keep.pct            # default 0.95
    mapreduce.tasktracker.distributedcache.checkperiod    # default 1 minute
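To make the role of the four parameters concrete, here is a minimal standalone sketch of how they could be turned into purge thresholds. It is not the Hadoop code: `PurgeThresholds` is a hypothetical class, `java.util.Properties` stands in for the Hadoop configuration object, and two things are assumptions: that the cleanup goal is the limit multiplied by keep.pct (suggested by the `allowedCacheSizeCleanupGoal` and `allowedCacheSubdirsCleanupGoal` variable names used later), and that the check period is expressed in milliseconds.

```java
import java.util.Properties;

/** Hypothetical holder for the purge thresholds; a sketch, not the Hadoop class. */
final class PurgeThresholds {
    final long allowedCacheSize;               // upper limit in bytes per local dir
    final long allowedCacheSubdirs;            // upper limit on subdirectories per local dir
    final long allowedCacheSizeCleanupGoal;    // size to shrink to once a purge starts
    final long allowedCacheSubdirsCleanupGoal; // subdir count to shrink to once a purge starts
    final long checkPeriodMillis;              // interval between cleanup checks

    PurgeThresholds(Properties conf) {
        allowedCacheSize = Long.parseLong(
            conf.getProperty("local.cache.size", "10737418240"));
        allowedCacheSubdirs = Long.parseLong(
            conf.getProperty("mapreduce.tasktracker.cache.local.numberdirectories", "10000"));
        double keepPct = Double.parseDouble(
            conf.getProperty("mapreduce.tasktracker.cache.local.keep.pct", "0.95"));
        // Assumption: the cleanup goal is the limit scaled by keep.pct.
        allowedCacheSizeCleanupGoal = (long) (allowedCacheSize * keepPct);
        allowedCacheSubdirsCleanupGoal = (long) (allowedCacheSubdirs * keepPct);
        // Assumption: the period is in milliseconds (1 minute = 60000).
        checkPeriodMillis = Long.parseLong(
            conf.getProperty("mapreduce.tasktracker.distributedcache.checkperiod", "60000"));
    }
}
```

Under these assumptions, a purge that starts when a directory exceeds its limit aims to bring it back down to keep.pct of that limit rather than emptying it.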
When an instance of TrackerDistributedCacheManager is created, it also initializes an instance of the CleanupThread class.
    this.cleanupThread = new CleanupThread(conf);
    public void startCleanupThread() {
      this.cleanupThread.start();
    }

    (Called from the initialize method of the TaskTracker class:
      // Initialize DistributedCache
      this.distributedCacheManager = new TrackerDistributedCacheManager(
          this.fConf, taskController);
      this.distributedCacheManager.startCleanupThread();
    )
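CleanupThread is an inner class of TrackerDistributedCacheManager:
It works by starting a thread that periodically triggers the checkAndCleanup method of the BaseDirManager class, so the current process is not blocked. The interval is controlled by mapreduce.tasktracker.distributedcache.checkperiod.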
    public void run() {
      while (running) {
        try {
          Thread.sleep(cleanUpCheckPeriod);
          baseDirManager.checkAndCleanup(); // invoke checkAndCleanup
        } catch (IOException e) {
          LOG.error("Exception in DistributedCache CleanupThread.", e);
        } catch (InterruptedException e) {
          LOG.info("Cleanup...", e);
          // To force us to exit cleanly
          running = false;
        } catch (Throwable t) {
          exitTaskTracker(t);
        }
      }
    }
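The sleep-then-work loop can be tried outside Hadoop with a small standalone sketch. `PeriodicCleaner` and `stopCleaner` are hypothetical names introduced here, a plain `Runnable` stands in for `baseDirManager.checkAndCleanup()`, and marking the thread as a daemon is a choice of this sketch rather than something the excerpt shows.

```java
/** Hypothetical stand-in for CleanupThread: runs a task every periodMillis until interrupted. */
final class PeriodicCleaner extends Thread {
    private final long periodMillis;
    private final Runnable cleanupTask;
    private volatile boolean running = true;

    PeriodicCleaner(long periodMillis, Runnable cleanupTask) {
        super("periodic-cleaner");
        this.periodMillis = periodMillis;
        this.cleanupTask = cleanupTask;
        // Sketch-only choice: a forgotten cleaner should not keep the JVM alive.
        setDaemon(true);
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(periodMillis); // wait one period before each check
                cleanupTask.run();          // stands in for checkAndCleanup()
            } catch (InterruptedException e) {
                running = false;            // interrupt is the signal to exit cleanly
            } catch (RuntimeException e) {
                // A failed pass is logged and the loop continues with the next period.
                System.err.println("cleanup pass failed: " + e);
            }
        }
    }

    /** Ask the loop to stop and wake it up if it is sleeping. */
    void stopCleaner() {
        running = false;
        interrupt();
    }
}
```

Because the work happens on its own thread, the caller returns immediately after `start()`; calling `stopCleaner()` interrupts the sleep so the thread exits without waiting out the remaining period.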
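BaseDirManager is also an inner class of TrackerDistributedCacheManager. It controls deletion of distributed cache entries and updates the bookkeeping state after each deletion:
Its checkAndCleanup method is implemented as follows: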
    Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
    HashMap<Path, CacheDir> toBeCleanedBaseDir = new HashMap<Path, CacheDir>();
    .........
    // iterate over the size and file count of each mapred.local.dir directory
    for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
      CacheDir baseDirCounts = baseDir.getValue();
      LOG.debug(baseDir.getKey() + ": allowedCacheSize=" + allowedCacheSize +
          ",baseDirCounts.size=" + baseDirCounts.size +
          ",allowedCacheSubdirs=" + allowedCacheSubdirs +
          ",baseDirCounts.subdirs=" + baseDirCounts.subdirs);
      // purge trigger: local.cache.size is smaller than the directory's size, or
      // mapreduce.tasktracker.cache.local.numberdirectories is smaller than the
      // number of subdirectories under it
      if (allowedCacheSize < baseDirCounts.size ||
          allowedCacheSubdirs < baseDirCounts.subdirs) {
        CacheDir tcc = new CacheDir();
        tcc.size = baseDirCounts.size - allowedCacheSizeCleanupGoal;
        tcc.subdirs = baseDirCounts.subdirs - allowedCacheSubdirsCleanupGoal;
        toBeCleanedBaseDir.put(baseDir.getKey(), tcc); // build the HashMap of directories to clean
      }
    }
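The trigger condition and the "amount to free" arithmetic can be checked with plain numbers. The following is a minimal standalone sketch of the same selection logic; `PurgeSelection`, `Usage`, and `selectForCleanup` are hypothetical names, and `String` keys replace Hadoop's `Path` so the code runs on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical re-statement of the selection step in checkAndCleanup. */
final class PurgeSelection {
    /** Per-directory counters, mirroring the size/subdirs fields in the excerpt. */
    static final class Usage {
        final long size;
        final long subdirs;
        Usage(long size, long subdirs) { this.size = size; this.subdirs = subdirs; }
    }

    /**
     * Returns, for every directory over either limit, how much must be freed
     * to reach the cleanup goals (usage minus goal, per dimension).
     */
    static Map<String, Usage> selectForCleanup(Map<String, Usage> usageByDir,
                                               long allowedSize, long allowedSubdirs,
                                               long sizeGoal, long subdirsGoal) {
        Map<String, Usage> toClean = new LinkedHashMap<>();
        for (Map.Entry<String, Usage> e : usageByDir.entrySet()) {
            Usage u = e.getValue();
            // Same trigger as the excerpt: either limit exceeded.
            if (allowedSize < u.size || allowedSubdirs < u.subdirs) {
                toClean.put(e.getKey(),
                    new Usage(u.size - sizeGoal, u.subdirs - subdirsGoal));
            }
        }
        return toClean;
    }
}
```

For example, with a size limit of 1000, a size goal of 950, a subdirectory limit of 10 and a goal of 9, a directory holding 1200 bytes in 5 subdirectories is selected with 250 bytes to free. Its subdirectory figure comes out as -4, which simply means that dimension is already under its goal.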
The actual deletion:
    // do the deletion, after releasing the global lock
    for (CacheStatus cacheStatus : toBeDeletedCache) { // iterate over the caches to delete
      cacheStatus.lock.lock(); // acquire the lock of the entry being deleted
      try {
        Path localizedDir = cacheStatus.getLocalizedUniqueDir();
        if (cacheStatus.user == null) {
          TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir);
          try {
            localFs.delete(localizedDir, true); // public cache: call FileSystem's delete method
          } catch (IOException e) {
            TrackerDistributedCacheManager.LOG.warn(
                "Could not delete distributed cache empty directory " + localizedDir, e);
          }
        } else {
          TrackerDistributedCacheManager.LOG.info(
              "Deleted path " + localizedDir + " as " + cacheStatus.user);
          String base = cacheStatus.getBaseDir().toString();
          String userDir = TaskTracker.getUserDir(cacheStatus.user);
          int skip = base.length() + 1 + userDir.length() + 1;
          String relative = localizedDir.toString().substring(skip);
          // private cache: call TaskController's deleteAsUser method
          taskController.deleteAsUser(cacheStatus.user, relative);
        }
        deleteCacheInfoUpdate(cacheStatus);
      } finally {
        cacheStatus.lock.unlock();
      }
    }
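The `skip` arithmetic in the private branch strips the base directory, a separator, the user directory, and another separator, leaving the path that is passed to `deleteAsUser`. A minimal sketch of that computation follows; `RelativeCachePath` and `relativeTo` are hypothetical names, the separator is assumed to be `/`, and the user directory value used in the example is illustrative rather than the actual result of `TaskTracker.getUserDir`.

```java
/** Hypothetical helper reproducing the "skip" computation from the deletion loop. */
final class RelativeCachePath {
    /**
     * Strips "baseDir/userDir/" from the front of localizedDir.
     * Equivalent to substring(base.length() + 1 + userDir.length() + 1),
     * with an added check that the prefix really matches.
     */
    static String relativeTo(String baseDir, String userDir, String localizedDir) {
        String prefix = baseDir + "/" + userDir + "/";
        if (!localizedDir.startsWith(prefix)) {
            throw new IllegalArgumentException(
                localizedDir + " is not under " + prefix);
        }
        return localizedDir.substring(prefix.length());
    }
}
```

With a base of `/data/1/mapred/local`, a user directory of `taskTracker/alice`, and a localized directory of `/data/1/mapred/local/taskTracker/alice/distcache/123_abc`, the result is `distcache/123_abc`. The prefix check is an addition of this sketch; the excerpt relies on the lengths alone.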