MapReduce 基本運作

1 篇文章 / 0 new
author
MapReduce 基本運作
整個運算基本有三程序所組成, 運用常見範例來了解 MapReduce 運作最基本流程, 首先從整個程序的進入點開始

MapReduce Driver 整個程序作業配置與啟動
public class WordCount {
    public static void main(String[] args) throws Exception  {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Word Count");
        //工作配置
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);//指定讀入數據產生資料集的 Mapper 類別
        job.setCombinerClass(MyReducer.class);//非必要, 用來做前置處理以減少資訊量
        job.setReducerClass(MyReducer.class);//指定 資料集處理的 Reducer 類別
        job.setOutputKeyClass(Text.class);//指定 key 類型
        job.setOutputValueClass(IntWritable.class);//指定 value 類型
        FileInputFormat.setInputPaths(job, new Path("/data.txt"));//指定資料源,若為目錄則表示要處理該目錄下所有檔案資料
        FileOutputFormat.setOutputPath(job, new Path("/out"));//指定最終資料的輸出目錄, 該目錄不可以先存在
        System.exit(job.waitForCompletion(true) ? 0 : 1);//執行
    }
}
WordCount 交由hadoop運作後, job 則自動讀取 data.txt 的內容然後逐行交給 mapper 處理.
假設 data.txt 內容如下
a b c d
a c d


Mapper 接收由 Job 逐一傳入的每行資料
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    // 實作 map
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        //資料處理作為
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);//資料存入key=value
        }
    };
}
MyMapper.map 將接收到的資料如下, 因檔案共兩行所以會傳入兩次. key 通常為該筆資料在檔案的起始位置, value 則為檔案該行的資料內容.
key=0 values=[a b c d]
key=9 values=[a c d]
map 處理後資料寫入 context, 內容為
[a,1], [b,1], [c,1], [d,1], [a,1], [c,1], [d,1]


Reducer Job 將 mapper 產生的資料集以相同 key 為集合組成 values(Iterable)後, 逐一交給 reduce 處理
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    // 實作 reduce
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {//Iterable 只要取出後資料即消失
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);//結果寫入 part-r-xxxxx 檔內
    };
}
MyReducer.reduce 則會接收到如下資料, 雖然map共寫入7筆資料, 但傳給reduce時會將相同 key 組成資料集再傳入, 如此可節省傳遞次數加快處理, 所以reduce僅接收到四筆資料. (注意 key 預設並無排序),
key=a values=[1,1]
key=b values=[1]
key=c values=[1,1]
key=d values=[1,1]
reduce 處理後資料寫入 context, 內容為
[a,2], [b,1], [c,2], [d,2]
完成整個處理作業


►整個 MapReduce 資料處理概念圖
►其他
增加輸入檔案過濾機制 FileInputFormat.setInputPathFilter
關鍵字: 
Free Web Hosting