Java+大數據開發——Hadoop集群環境搭建(二)


1. MAPREDUCE使用

mapreducehadoop中的分布式運算編程框架,只要按照其編程規范,只需要編寫少量的業務邏輯代碼即可實現一個強大的海量數據並發處理程序

 

2. Demo開發——wordcount

2.1需求

從大量(比如T級別)文本文件中,統計出每一個單詞出現的總次數。

 

2.2mapreduce 實現思路

Map階段:

a) HDFS的源數據文件中逐行讀取數據

b) 將每一行數據切分出單詞

c) 為每一個單詞構造一個鍵值對(單詞,1)

d) 將鍵值對發送給reduce

 

Reduce階段

a) 接收map階段輸出的單詞鍵值對

b) 將相同單詞的鍵值對匯聚成一組

c) 對每一組,遍歷組中的所有“值”,累加求和,即得到每一個單詞的總次數

d) (單詞,總次數)輸出到HDFS的文件中

 

2.3具體編碼實現

(1)定義一個mapper

 

//首先要定義四個泛型的類型
//keyin: LongWritable valuein: Text
//keyout: Text valueout:IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//map方法的生命周期: 框架每傳一行數據就被調用一次
//key : 這一行的起始點在文件中的偏移量
//value: 這一行的內容
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到一行數據轉換為string
String line = value.toString();
//將這一行切分出各個單詞
String[] words = line.split(" ");
//遍歷數組,輸出<單詞,1>
for(String word:words){
context.write(
new Text(word), new IntWritable(1));
}
}
}

 

(2)定義一個reducer

 

//生命周期:框架每傳遞進來一個kv 組,reduce方法被調用一次
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//定義一個計數器
int count = 0;
//遍歷這一組kv的所有v,累加到count中
for(IntWritable value:values){
count
+= value.get();
}
context.write(key,
new IntWritable(count));
}
}

 

(3)定義一個主類,用來描述job並提交job

 

public class WordCountRunner {
//把業務邏輯相關的信息(哪個是mapper,哪個是reducer,要處理的數據在哪里,輸出的結果放哪里。。。。。。)描述成一個job對象
//把這個描述好的job提交給集群去運行
public static void main(String[] args) throws Exception {
Configuration conf
= new Configuration();
Job wcjob
= Job.getInstance(conf);
//指定我這個job所在的jar包
// wcjob.setJar("/home/hadoop/wordcount.jar");
wcjob.setJarByClass(WordCountRunner.class);

wcjob.setMapperClass(WordCountMapper.
class);
wcjob.setReducerClass(WordCountReducer.
class);
//設置我們的業務邏輯Mapper類的輸出key和value的數據類型
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(IntWritable.
class);
//設置我們的業務邏輯Reducer類的輸出key和value的數據類型
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(IntWritable.
class);
//指定要處理的數據所在的位置
FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");
//指定處理完成之后的結果所保存的位置
FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));

//向yarn集群提交這個job
boolean res = wcjob.waitForCompletion(true);
System.exit(res
?0:1);
}

 

3.程序打包運行

 1. 將程序打包

 2. 准備輸入數據

vi  /home/hadoop/test.txt

Hello tom
Hello jim
Hello ketty
Hello world
Ketty tom

hdfs上創建輸入數據文件夾

hadoop   fs  mkdir  -p  /wordcount/input

words.txt上傳到hdfs

hadoop  fs  –put  /home/hadoop/words.txt  /wordcount/input

3. 將程序jar包上傳到集群的任意一台服務器上

4. 使用命令啟動執行wordcount程序jar

$ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver /wordcount/input /wordcount/out

5. 查看執行結果

$ hadoop fs –cat /wordcount/out/part-r-00000

 

作者:傑瑞教育
出處:http://www.cnblogs.com/jerehedu/ 
版權聲明:本文版權歸傑瑞教育技有限公司和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。
技術咨詢:JRedu技術交流

 


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
  © 2014-2022 ITdaan.com 联系我们: