MapReduce 自訂檔案輸出格式(1)

1 篇文章 / 0 new
author
MapReduce 自訂檔案輸出格式(1)
MapReduce 預設的輸出格式為 TextOutputFormat (輸出檔名為 part-r-00000), 若想變更 part 名稱可直接設定參數 mapreduce.output.basename 即可.

但若一reduce 想依傳入的資料分類輸出在不同的檔案內, 則需要自己提供 OutputFormat, 較簡易的方式則可直接繼承 FileOutputFormat 來達成
► 建立自訂格式
/**
 * Output format that decides, per record, which file the record is written to.
 *
 * <p>If the {@code mapreduce.output.basename} configuration property is set, its value is used
 * as the base name for every record; otherwise the string form of the record key is used.
 */
public class MaxOutputFormat extends CustomOutputFormat<Text, IntWritable> {
    /**
     * Builds the output file name for one record.
     *
     * @param key   record key; its string form is the base name when none is configured
     * @param value record value (not consulted here)
     * @param job   task attempt context supplying the configuration
     * @return the value returned by {@code getUniqueFile} for the chosen base name
     */
    @Override
    protected String customFileName(Text key, IntWritable value, TaskAttemptContext job) {
        // A base name configured on the job takes precedence over the key.
        final String configuredName = job.getConfiguration().get("mapreduce.output.basename");
        final String fileBase;
        if (configuredName != null) {
            fileBase = configuredName;
        } else {
            fileBase = key.toString();
        }
        // Base name + task information (e.g. -r-00000), with an empty extension.
        return getUniqueFile(job, fileBase, "");
    }
}
然後在 driver 內指定 job.setOutputFormatClass(MaxOutputFormat.class); 這樣就可依需求自定輸出檔名.

CustomOutputFormat 建立可動態指定檔名的類別
主要需實作 FileOutputFormat.getRecordWriter, 原預設格式 TextOutputFormat 僅為單一檔案寫入模式, 這裡改由 MultiRecordWriter 讓其可同時進行多檔案的寫入維護, 藉此達成單一檔案輸出或多檔案輸出的能力. 並經由 customFileName 來取得產生的檔案名稱.
public abstract class CustomOutputFormat<K extends WritableComparable<?>, V extends Writable> extends FileOutputFormat<K, V> {
    protected abstract String customFileName(K key, V value, TaskAttemptContext job);
    // 藉由調用 customFileName 來取得自訂格式的檔案名稱
 
    private RecordWriter<K, V> writer = null;
    // 實作  abstract FileOutputFormat.RecordWriter
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job);
        }
        return writer;
    }
    // 可維護多檔案的 RecordWriter 類別
    public class MultiRecordWriter extends RecordWriter<K, V> {
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;//RecordWriter的暫存
        private TaskAttemptContext job = null;
        private Path workPath = null;//任務輸出目錄
        public MultiRecordWriter(TaskAttemptContext job) throws IOException {
            super();
            this.job = job;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
            //取得任務輸出路徑
            OutputCommitter committer = getOutputCommitter(job);
            if (committer instanceof FileOutputCommitter) {
                this.workPath = ((FileOutputCommitter) committer).getWorkPath();
            } else {
                Path outputPath = getOutputPath(job);
                if (outputPath == null) {
                    throw new IOException("job 沒有定義輸出路徑");
                }
                this.workPath = outputPath;
            }
        }
        @Override //實作 abstract RecordWriter.close
        public void close(TaskAttemptContext context) throws IOException,InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }
        @Override //實作 abstract RecordWriter.write
        public void write(K key, V value) throws IOException,InterruptedException {            
            String baseName = customFileName(key, value, job);//得到輸出檔名
            //從暫存取出
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {//若無則新建
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);//加入暫存
            }
            rw.write(key, value);
        }
        // 系統暫存檔輸出路徑 ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "/t");// 資料分隔符號
            RecordWriter<K, V> recordWriter = null;
            if ( getCompressOutput(job) ) {// 使用壓縮
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName    + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
LineRecordWriter
此部分是直接從 TextOutputFormat.LineRecordWriter 內抽出獨立出來, 主要作用為每筆資料的實際寫入/檔案關閉 等動作.
/**
 * Record writer that emits one line per record: key, separator, value, newline.
 *
 * <p>Keys and values that are {@code null} or {@code NullWritable} are omitted; the separator is
 * written only when both are present. Text is encoded as UTF-8.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    // Character encoding used for every byte conversion in this class.
    private static final String ENCODING = "UTF-8";
    // Encoded line terminator appended after each record.
    private static final byte[] LINE_BREAK;
    static {
        try {
            LINE_BREAK = "\n".getBytes(ENCODING);
        } catch (UnsupportedEncodingException e) {
            throw new IllegalArgumentException("can't find " + ENCODING + " encoding");
        }
    }

    protected DataOutputStream out;
    private final byte[] separatorBytes;

    /**
     * @param out               stream the records are written to
     * @param keyValueSeparator text placed between key and value
     */
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        final byte[] encoded;
        try {
            encoded = keyValueSeparator.getBytes(ENCODING);
        } catch (UnsupportedEncodingException e) {
            throw new IllegalArgumentException("can't find " + ENCODING + " encoding");
        }
        this.separatorBytes = encoded;
    }

    /** Creates a writer that separates key and value with a tab. */
    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    /** Writes one key or value: Text bytes directly, anything else via toString(). */
    private void writeObject(Object o) throws IOException {
        if (!(o instanceof Text)) {
            out.write(o.toString().getBytes(ENCODING));
            return;
        }
        final Text text = (Text) o;
        // Only the first getLength() bytes of the array returned by getBytes() are written.
        out.write(text.getBytes(), 0, text.getLength());
    }

    /**
     * Writes the record as a single line. Does nothing when both key and value are absent.
     */
    public synchronized void write(K key, V value) throws IOException {
        final boolean hasKey = key != null && !(key instanceof NullWritable);
        final boolean hasValue = value != null && !(value instanceof NullWritable);
        if (!hasKey && !hasValue) {
            return;
        }
        if (hasKey) {
            writeObject(key);
        }
        if (hasKey && hasValue) {
            out.write(separatorBytes);
        }
        if (hasValue) {
            writeObject(value);
        }
        out.write(LINE_BREAK);
    }

    /** Closes the underlying stream. */
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}

from http://www.infoq.com/cn/articles/HadoopOutputFormat
關鍵字: 
Free Web Hosting