MapReduce 輸出資料排序

1 篇文章 / 0 new
author
MapReduce 輸出資料排序
MR預設輸出時為以 key 作排序後輸出, 但若相同 key 其 value 並不會進行排序作業, 若需要連同 value 一起排序則需要自己進行, 以下是方法之一, 利用自訂排序類(需繼承WritableComparator)然後透過 job.setSortComparatorClass / job.setGroupingComparatorClass 指定, 但排序對像一樣是 key. 另須注意, 雖然兩個都是ComparatorClass但影響的後續動作卻不一樣
Sort Driver
public class MaxSortOutput extends Configured implements Tool {
    //for setSortComparatorClass 純粹資料排序
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() { super(DataPair.class, true); }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            return ((DataPair)w1).compareTo((DataPair) w2);
        }
    }
    //for setGroupingComparatorClass
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() { super(DataPair.class, true); }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            return ((DataPair)w1).compareKeyTo((DataPair) w2);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "Sort Output");
        job.setJarByClass(getClass());
        FileInputFormat.addInputPaths(job, "/in");
        FileOutputFormat.setOutputPath(job, new Path("/out"));
        job.setMapperClass(MyMapper.class);
        /*//將map寫入的資料進行分群,Partitioner數受ReduceTasks數量影響
        job.setNumReduceTasks(2);
        job.setPartitionerClass(DataPartitioner.class); */
        job.setSortComparatorClass(KeyComparator.class); //排序 map寫入的資料
        job.setGroupingComparatorClass(GroupComparator.class); //當回傳值不是 0 時則該筆資料會傳遞給 reduce
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(DataPair.class);
        job.setOutputValueClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MaxSortOutput(), args));
    }
    //分群
    public static class DataPartitioner extends Partitioner<DataPair, NullWritable> {        
        @Override
        public int getPartition(DataPair key, NullWritable value, int numPartitions) {
            return key.getKey() % numPartitions;//numPartitions值受job.setNumReduceTasks所影響
        }
    }
}
► 運作程序依序為 MyMapper -> DataPartitioner -> KeyComparator -> GroupComparator -> MyReducer
DataPartitioner(Partitioner)
用來自訂分群原則, 但可分的群組數受 ReduceTasks 數量所影響, 透過適當的分群可達成不同的排序目標. 另當對大量資料進行分群時, 若各群的數量能夠平均相對整體運作效能較佳. 至於分群原則除自行規劃外, 可透過取樣方法來獲得好的分群方式, hadoop 提供的取樣類有
RandomSampler : 亂數取得資料進行分群計算, 效率較低
IntervalSampler : 固定間距取樣, 適合已排序資料
SplitSampler : 從頭取出 n 個資料進行分群規劃, 因是從頭讀取資料所以不適合已排序資料, 效率較高
KeyComparator(SortComparator)
作用純粹是資料排序, 排序的資料範圍則受 分群 規則所影響, 僅對各自群內資料進行排序
GroupComparator(GroupingComparator)
雖然一樣繼承WritableComparator但實際的影響行為比較像 篩選 因為當傳回值為 0 則資料並不會傳給reducer, 傳回1,-1則會交給reducer處理, 且對資料排序無影響
data flow (from O'reilly - Hadoop.The.Definitive.Guide)
就個人覺得 Partitioner 在A作用, SortComparator與GroupingComparator作用在B或C.比較合理. 至於實際情況因尚未研究過核心程式細節所以就不確定了.
Map / Reducer
public class MyMapper extends Mapper<LongWritable, Text, DataPair, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(0, 4);
        int data = Integer.parseInt(line.substring(11, 15));
        //轉由自訂類 DataPair 儲存資料後當key來傳遞,主要目的為能進行兩個欄位進行排序
        context.write(new DataPair(Integer.parseInt(year), data), NullWritable.get());
    }
}
public class MyReducer extends Reducer<DataPair, NullWritable, DataPair, NullWritable> {
    @Override
    protected void reduce(DataPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
► 資料封裝, 用來將原本的 key, value 放入自訂類以便於排序處理
public class DataPair implements WritableComparable<DataPair> {
    private int key;
    private int value;
    public DataPair() {}
    public DataPair(int key, int value) {
        this.key = key;
        this.value = value;
    }
    @Override
    public int compareTo(DataPair ip) {
        int cmp = compare(key, ip.key);
        if (cmp != 0) { return cmp; }
        return compare(value, ip.value);
    }
    public int compareKeyTo(DataPair ip) { return compare(key, ip.key); }
    private int compare(int a, int b) { return (a < b ? -1 : (a == b ? 0 : 1)); }
    public int getKey() { return key; }
    public int getValue() { return value; }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(key);
        out.writeInt(value);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        key = in.readInt();
        value = in.readInt();
    }
    @Override
    public int hashCode() { return key * 10000 + value; }
    @Override
    public boolean equals(Object o) {
        if (o instanceof DataPair) {
            DataPair ip = (DataPair) o;
            return key == ip.key && value == ip.value;
        }
        return false;
    }    
    @Override //輸出格式
    public String toString() { return key + " " + value; }
}
關鍵字: 
Free Web Hosting