Apache Nutch 1.3 學習筆記四(Generate)


 

1. Generate的作用

   Inject之后就是Generate,這個方法主要是從CrawlDb中產生一個Fetch可以抓取的url集合(fetchlist),再結合一定的過濾條件,它的命令行如下:
   

 

  1. bin/nutch generate  
  2.    Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter] [-noNorm][-maxNumSegments num]  


   
參數說明:
   * crawldb: crawldb
的相對路徑
   * segments: segments
的相對路徑
   * force:  
這個主要是對目錄進行加鎖用的配置,如果為true,當目標鎖文件存在的,會認為是有效的,但如果為false,當目標文件存在時,就就會拋出IOException
   * topN:
這里表示產生TopNurl
   * numFetchers:
這里是指GenerateMP任務要幾個Reducer節點,也就是要幾個輸出文件,這個配置會影響到FetcherMap個數。
   * numDays:
這里是表示當前的日期,是在對url過濾中用到的
   * noFilter:
這里表示是否對url進行過濾
   * noNorm:
這里表示是否以url進行規格化
   * maxNumSegments:
這里表示segment的最大個數
   
Nutch 1.3 版本中,支持在一次Generate為多個segment產生相應的fetchlists,而IP地址的解析只針對那些准備被抓取的url,在一個segment中,所有url都以IP,domain或者host來分類。

2. Generate源代碼分析

   generate可能主要分成三部分,

  1.    + 第一部分是產生要抓取的url子集,進行相應的過濾和規格化操作
  2.    + 第二部分是讀取上面產生的url子集,生成多個segment
  3.    + 第三部分是更新crawldb數據庫,以保證下一次Generate不會包含相同的url

   2.1 第一部分,產生url子集分析

   這里主要是一個MP任務,用於產生相應的url抓取集合,主要代碼如下:
  

 

  1. // map to inverted subset due for fetch, sort by score  
  2.    JobConf job = new NutchJob(getConf());  
  3.    job.setJobName("generate: select from " + dbDir);  
  4.     
  5.     
  6.    // 如果用戶沒有設置numFetchers這個值,那就默認為Map的個數  
  7.    if (numLists == -1) { // for politeness make  
  8.      numLists = job.getNumMapTasks(); // a partition per fetch task  
  9.    }  
  10. // 如果MapReduce的設置為local,那就產生一個輸出文件  
  11. // NOTE:這里partition也是Hadoop中的一個概念,就是在Map后,它會對每一個key進行partition操作,看這個key會映射到哪一個reduce上,  
  12. // 所以相同keyvalue就會聚合到這個reduce節點上  
  13.    if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {  
  14.      // override  
  15.      LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");  
  16.      numLists = 1;  
  17.    }  
  18.    job.setLong(GENERATOR_CUR_TIME, curTime);  
  19.    // record real generation time  
  20.    long generateTime = System.currentTimeMillis();  
  21.    job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);  
  22.    job.setLong(GENERATOR_TOP_N, topN);  
  23.    job.setBoolean(GENERATOR_FILTER, filter);  
  24.    job.setBoolean(GENERATOR_NORMALISE, norm);  
  25.    job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);  
  26.     
  27.     
  28.    // 配置輸入路徑  
  29.    FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));  
  30.    job.setInputFormat(SequenceFileInputFormat.class);  // 配置CrawlDb的輸入格式  
  31.     
  32.     
  33.    // 配置Mapper,PartitionerReducer,這里都是Selector,因為它繼承了這三個抽象接口  
  34.    job.setMapperClass(Selector.class);                   
  35.    job.setPartitionerClass(Selector.class);  
  36.    job.setReducerClass(Selector.class);  
  37.     
  38.     
  39.    FileOutputFormat.setOutputPath(job, tempDir);  
  40. // 配置輸出格式  
  41.    job.setOutputFormat(SequenceFileOutputFormat.class);  
  42. // 配置輸出的<key,value>的類型<FloatWritable,SelectorEntry>  
  43.    job.setOutputKeyClass(FloatWritable.class);  
  44. // 因為Map的輸出會按key來排序,所以這里擴展了一個排序比較方法  
  45.    job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);  
  46.    job.setOutputValueClass(SelectorEntry.class);  
  47. // 設置輸出格式,這個類繼承自OutputFormat,如果用戶要擴展自己的OutputFormat,那必須繼承自這個抽象接口  
  48.    job.setOutputFormat(GeneratorOutputFormat.class);  
  49.     
  50.     
  51.    try {  
  52.      JobClient.runJob(job);   // 提交任務  
  53.    } catch (IOException e) {  
  54.      throw e;  
  55.    }  



下面主要分析一下Selector這個類,它使用了多重繼承,同時實現了三個接口,Mapper,Partitioner,Reducer

* 下面是SelectorMapper的分析

 這里的Map主要做了幾件事:

  • 如果有filter設置,先對url進行過濾
  • 通過FetchSchedule查看當前url是不達到了抓取的時間,沒有達到抓取時間的就過濾掉
  • 計算新的排序分數,根據url的當前分數,這里調用了ScoringFiltersgeneratorSortValue方法
  • 對上一步產生的分數進行過濾,當這個分數小於一定的閥值時,對url進行過濾
  • 收集所有沒有被過濾的url信息,輸出為<FloatWritable,SelectorEntry>類型,這里的key就是第三步計算出來的分數,  Map的輸出會調用DecreasingFloatComparator方法來對這個key進行排序

* Selector中的Partition方法主要是調用了URLPartition來進行相應的分塊操作

 這里會首先根據urlhashCode來進行partition,如果用戶設置了根據domain或者ip來進行partition,那這里會根據用戶的配置來
 
進行相應的partition操作,最后調用如下方法來得到一個映射的reduceID
 (hashCode & Integer.MAX_VALUE) % numReduceTasks;

* Selector中的Reducer操作主要是收集沒有被過濾的url,每個reducerurl個數不會超過limit個數,這個limit是通過如下公式計算的

 

  1. limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) / job.getNumReduceTasks();  

 GENERATOR_TOP_N是用戶定義的,reducer的個數也是用戶定義的。
 
在一個reducer任務中,如果收集的url個數超過了這個limit,那就新開一個segment,這里的segment也有一個上限,就是用戶設置的maxNumSegments, 當新開的segment個數大小這個maxNumSegment時,url就會被過濾掉。
 
這里urlsegment中的分布有兩個情況,一種是當沒有設置GENERATOR_MAX_COUNT這個參數時,每一個segment中所包含的url個數不超過limit上限,segmetn中對urlhost個數沒有限制,而segment個數的上限為maxNumSegments這個變量的值,這個變量是通過設置GENERATOR_MAX_NUM_SEGMENTS這個參數得到的,默認為1,所以說默認只產生一個segment; 而當設置了GENERATOR_MAX_COUNT的時候,每一個segment中所包含的urlhost的個數的上限就是這個maxCount的值,也就是說每一個segment所包含的同一個hosturl的個數不能超過maxCount這個值,當超過這個值后,就把這個url放到下一個segment中去。 
 
舉個簡單的例子,如果Reducer中收到10url,而現在maxNumSegments2limit5,也就是說一個segment最多放5url,那這時如果用第一種設置的話,那0-4url會放在第一個segment中,5-9url會放在第二個segment,這樣的話,兩個segment都放了5url;但如果用第二種方法,這里設置的maxCount4,但我們這里的10urlhost分成2類,也就是說0-4url屬於同一個host1, 5-9url屬於host2,那這里會把0-4個中的前4url放在segment1中,host1的第5url放在segmetn2中,而host2中的5-8url會放在segment1中,而第9個網頁會放在segment2中,因為這里的maxCount設置為4,也就是說在每一個segment中,一個host所對應的url不能超過4,所以這里的segment1放了8url,而segment2放了2url,這里會現出不均勻的情況。


*
有沒有注意到這里的OutputFormat使用了GenerateOutputFormat,它擴展了MultipleSequenceFileOutputFormat,重寫了generateFileNameForKeyValue這個方法,就是對不同的segment生成不同的目錄名,生成規則如下
  

 

  1. "fetchlist-" + value.segnum.toString() + "/" + name;  

 

2.2 第二部分是讀取上面產生的url子集,生成多個segment,主要代碼如下:

     

 

  1. // read the subdirectories generated in the temp  
  2.    // output and turn them into segments  
  3.    List<Path> generatedSegments = new ArrayList<Path>();  
  4.     
  5.     
  6.    FileStatus[] status = fs.listStatus(tempDir);  // 這里讀取上面生成的多個fetchlistsegment  
  7.    try {  
  8.      for (FileStatus stat : status) {  
  9.        Path subfetchlist = stat.getPath();  
  10.        if (!subfetchlist.getName().startsWith("fetchlist-")) continue;   // 過濾不是以fetchlist-開頭的文件  
  11.        // start a new partition job for this segment  
  12.        Path newSeg = partitionSegment(fs, segments, subfetchlist, numLists);   // segment進行Partition操作,產生一個新的目錄  
  13.        generatedSegments.add(newSeg);  
  14.      }  
  15.    } catch (Exception e) {  
  16.      LOG.warn("Generator: exception while partitioning segments, exiting ...");  
  17.      fs.delete(tempDir, true);  
  18.      return null;  
  19.    }  
  20.     
  21.     
  22.    if (generatedSegments.size() == 0) {  
  23.      LOG.warn("Generator: 0 records selected for fetching, exiting ...");  
  24.      LockUtil.removeLockFile(fs, lock);  
  25.      fs.delete(tempDir, true);  
  26.      return null;  
  27.    }  

 

  * 下面主要對這個partitionSegment函數進行分析,看看到底做了些什么

 

  1.    // invert again, partition by host/domain/IP, sort by url hash  
  2. // 從代碼的注釋中我們可以看到,這里主要是對urlhost/domain/IP進行分類  
  3. // NOTE:這里的分類就是Partition的意思,就是相同host或者是domain或者是IPurl發到同一台機器上  
  4. // 這里主要是通過URLPartitioner來做的,具體是按哪一個來分類,是通用參數來配置的,這里有PARTITION_MODE_DOMAINPARTITION_MODE_IP  
  5. // 來配置,默認是按UrlhashCode來分。  
  6.     if (LOG.isInfoEnabled()) {  
  7.         LOG.info("Generator: Partitioning selected urls for politeness.");  
  8.     }  
  9.     Path segment = new Path(segmentsDir, generateSegmentName()); // 也是在segmentDir目錄產生一個新的目錄,以當前時間命名  
  10.     Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME); // 在上面的目錄下再生成一個特定的crawl_generate目錄  
  11.     
  12.     
  13.     LOG.info("Generator: segment: " + segment);  
  14. 下面又用一個MP任務來做  
  15.     NutchJob job = new NutchJob(getConf());  
  16.     job.setJobName("generate: partition " + segment);  
  17.     job.setInt("partition.url.seed", new Random().nextInt()); // 這里產生一個Partition的隨機數  
  18.     
  19.     
  20.     FileInputFormat.addInputPath(job, inputDir);                // 輸入目錄名  
  21.   job.setInputFormat(SequenceFileInputFormat.class);          // 輸入文件格式  
  22.     
  23.     
  24.     job.setMapperClass(SelectorInverseMapper.class);            // 輸入的Mapper,主要是過濾原來的key,使用url來做為新的key  
  25.     job.setMapOutputKeyClass(Text.class);                       // Mapperkey輸出類型,這里就是url的類型  
  26.     job.setMapOutputValueClass(SelectorEntry.class);            // Mappervalue的輸出類型,這里還是原因的SelectorEntry類型  
  27.     job.setPartitionerClass(URLPartitioner.class);              // 這里的key(url)Partition使用這個類來做,這個類前面有說明  
  28.     job.setReducerClass(PartitionReducer.class);                // 這里的Reducer類,  
  29.     job.setNumReduceTasks(numLists);                            // 這里配置工作的Reducer的個數,也就是生成幾個相應的輸出文件  
  30.     
  31.     
  32.     FileOutputFormat.setOutputPath(job, output);                // 配置輸出路徑  
  33.   job.setOutputFormat(SequenceFileOutputFormat.class);      // 配置輸出格式  
  34.     job.setOutputKeyClass(Text.class);                          // 配置輸出的keyvalue的類型  
  35.     job.setOutputValueClass(CrawlDatum.class);                  // 注意這里返回的類型為<Text,CrawlDatum>  
  36.     job.setOutputKeyComparatorClass(HashComparator.class);      // 這里定義控制key排序的比較方法  
  37.     JobClient.runJob(job);                                      // 提交任務  
  38.     return segment;   

 

2.3 第三部分是更新crawldb數據庫,以保證下一次Generate不會包含相同的url,這個是可以配置的,主要代碼如下:

 

  1. if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {   // 判斷是否要把狀態更新到原來的數據庫中  
  2.         // update the db from tempDir  
  3.         Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"  
  4.          + System.currentTimeMillis());  
  5.     
  6.     
  7.         job = new NutchJob(getConf()); // 生成MP任務的配置  
  8.         job.setJobName("generate: updatedb " + dbDir);  
  9.         job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);  
  10.     // 加上面生成的所有segment的路徑做為輸入  
  11.         for (Path segmpaths : generatedSegments) { // add each segment dir to input path  
  12.         Path subGenDir = new Path(segmpaths, CrawlDatum.GENERATE_DIR_NAME);  
  13.         FileInputFormat.addInputPath(job, subGenDir);  
  14.         }  
  15.         // add current crawldb to input path  
  16.     // 把數據庫的路徑也做為輸入  
  17.         FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));  
  18.         job.setInputFormat(SequenceFileInputFormat.class);          // 定義了輸入格式  
  19.         job.setMapperClass(CrawlDbUpdater.class);                   // 定義了MapperReducer方法  
  20.         job.setReducerClass(CrawlDbUpdater.class);  
  21.         job.setOutputFormat(MapFileOutputFormat.class);             // 定義了輸出格式  
  22.         job.setOutputKeyClass(Text.class);                          // 定義了輸出的keyvalue的類型  
  23.         job.setOutputValueClass(CrawlDatum.class);  
  24.         FileOutputFormat.setOutputPath(job, tempDir2);              // 定義了臨時輸出目錄  
  25.         try {  
  26.           JobClient.runJob(job);  
  27.           CrawlDb.install(job, dbDir);                              // 刪除原來的數據庫,把上面的臨時輸出目錄重命名為真正的數據目錄名  
  28.         } catch (IOException e) {  
  29.           LockUtil.removeLockFile(fs, lock);  
  30.           fs.delete(tempDir, true);  
  31.           fs.delete(tempDir2, true);  
  32.           throw e;  
  33.         }  
  34.         fs.delete(tempDir2, true);  
  35.     }     

 

* 下面我們來看一下CrawlDbUpdater類做了些什么,它實現了MapperReducer的接口,接口說明如下

它是用來更新CrawlDb數據庫,以保證下一次Generate不會包含相同的url
它的map函數很簡單,只是收集相應的<key,value>操作,沒有做其它操作,下面我們來看一下它的reduce方法做了些什么

 

  1. genTime.set(0L);  
  2.             while (values.hasNext()) { // 這里遍歷相同urlCrawlDatum  
  3.         CrawlDatum val = values.next();  
  4.         if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {   // 判斷當前url是否已經被generate  
  5.                 LongWritable gt = (LongWritable) val.getMetaData().get(  
  6.                 Nutch.WRITABLE_GENERATE_TIME_KEY);                              // 得到Generate的時間   
  7.                 genTime.set(gt.get());  
  8.                 if (genTime.get() != generateTime) {    // 還沒看明白這里是什么意思,一種情況會產生不相同,當這個url已經被generate一次,這里被第二次generate,所以會產生時間不同  
  9.                 orig.set(val);  
  10.                 genTime.set(0L);  
  11.                 continue;       // 不知道這里為什么要加continue,加與不加應該沒什么區別  
  12.                 }  
  13.             } else {  
  14.                 orig.set(val);  
  15.             }  
  16.             }  
  17.             if (genTime.get() != 0L) {        // NOTE:想想這里什么時候genTime0,當這個url被過濾掉,或者沒有符合Generate要求,或者分數小於相應的閥值時  
  18.         orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);  // 設置新的Generate時間  
  19.             }  
  20.             output.collect(key, orig);  

 

3. 總結

這里大概介紹了一下Generate的流程,其中大量用到了MapReduce任務,還有大量的配置,要深入理解還需要去自己實踐來加深理解。

 

作者:http://blog.csdn.net/amuseme_lu

 

 


 

 

 

相關文章閱讀及免費下載:

 

Apache Nutch 1.3 學習筆記目錄

 

Apache Nutch 1.3 學習筆記一

 

Apache Nutch 1.3 學習筆記二

 

Apache Nutch 1.3 學習筆記三(Inject)

 

Apache Nutch 1.3 學習筆記三(Inject CrawlDB Reader)

 

Apache Nutch 1.3 學習筆記四(Generate)

 

Apache Nutch 1.3 學習筆記四(SegmentReader分析)

 

Apache Nutch 1.3 學習筆記五(FetchThread)

 

Apache Nutch 1.3 學習筆記五(Fetcher流程)

 

Apache Nutch 1.3 學習筆記六(ParseSegment)

 

Apache Nutch 1.3 學習筆記七(CrawlDb - updatedb)

 

Apache Nutch 1.3 學習筆記八(LinkDb)

 

Apache Nutch 1.3 學習筆記九(SolrIndexer)

 

Apache Nutch 1.3 學習筆記十(Ntuch 插件機制簡單介紹)

 

Apache Nutch 1.3 學習筆記十(插件擴展)

 

Apache Nutch 1.3 學習筆記十(插件機制分析)

 

Apache Nutch 1.3 學習筆記十一(頁面評分機制 OPIC)

 

Apache Nutch 1.3 學習筆記十一(頁面評分機制 LinkRank 介紹)

 

Apache Nutch 1.3 學習筆記十二(Nutch 2.0 的主要變化)

 

更多《Apache Nutch文檔》,盡在開卷有益360 http://www.docin.com/book_360

 


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com