MapReduce's default output format is TextOutputFormat (output files are named part-r-00000). To change the "part" prefix, simply set the mapreduce.output.basename parameter.
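A minimal sketch of that setting in a driver (the class and job names here are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class BasenameDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Replace the default "part" prefix; reducer outputs become
        // report-r-00000, report-r-00001, ...
        conf.set("mapreduce.output.basename", "report");
        Job job = Job.getInstance(conf, "basename-demo");
        // ... configure mapper, reducer, input and output paths as usual ...
    }
}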
If, however, a single reducer should split its output across different files according to the incoming data, you have to provide your own OutputFormat; the easiest route is to subclass FileOutputFormat.
► Creating the custom format
Extend the CustomOutputFormat base class (defined in the next section) and override customFileName to decide each record's output file name:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MaxOutputFormat extends CustomOutputFormat<Text, IntWritable> {
    @Override
    protected String customFileName(Text key, IntWritable value, TaskAttemptContext job) {
        // Define the output file name however your job requires
        String baseName = job.getConfiguration().get("mapreduce.output.basename"); // name from the system parameter, if set
        String outName = (baseName == null) ? key.toString() : baseName;
        return getUniqueFile(job, outName, ""); // base name + task info (-r-00000)
    }
}

Then, in the driver, register it with job.setOutputFormatClass(MaxOutputFormat.class); the output file names now follow your own rules.
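A complete driver sketch might look like this (the class name and path handling are assumptions for illustration, not from the original article):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "custom-output-demo");
        job.setJarByClass(MaxDriver.class);
        // set your Mapper/Reducer classes here
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(MaxOutputFormat.class); // the custom format
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}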
► CustomOutputFormat: a class that picks the output file name dynamically
The main work is implementing FileOutputFormat's RecordWriter. The default TextOutputFormat only ever writes to a single file; here a MultiRecordWriter maintains several open files at once, so the format can produce either a single output file or many, obtaining each file's name through customFileName.
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class CustomOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {

    // Subclasses supply the custom file name for each record
    protected abstract String customFileName(K key, V value, TaskAttemptContext job);

    private RecordWriter<K, V> writer = null;

    // Implements the abstract FileOutputFormat.getRecordWriter
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job);
        }
        return writer;
    }

    // A RecordWriter that can maintain multiple open files at once
    public class MultiRecordWriter extends RecordWriter<K, V> {
        private HashMap<String, RecordWriter<K, V>> recordWriters = null; // cache of open RecordWriters
        private TaskAttemptContext job = null;
        private Path workPath = null; // task output directory

        public MultiRecordWriter(TaskAttemptContext job) throws IOException {
            super();
            this.job = job;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
            // Resolve the task output path
            OutputCommitter committer = getOutputCommitter(job);
            if (committer instanceof FileOutputCommitter) {
                this.workPath = ((FileOutputCommitter) committer).getWorkPath();
            } else {
                Path outputPath = getOutputPath(job);
                if (outputPath == null) {
                    throw new IOException("The job has no output path defined");
                }
                this.workPath = outputPath;
            }
        }

        @Override // implements the abstract RecordWriter.close
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override // implements the abstract RecordWriter.write
        public void write(K key, V value) throws IOException, InterruptedException {
            String baseName = customFileName(key, value, job); // obtain the output file name
            // Look it up in the cache
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) { // create a new writer if absent
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw); // add it to the cache
            }
            rw.write(key, value);
        }

        // Temporary output path: ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t"); // key/value separator
            RecordWriter<K, V> recordWriter = null;
            if (getCompressOutput(job)) { // compression enabled
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
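To see the per-key fan-out in action, a reducer such as the following (an assumed example, not part of the original article) combined with MaxOutputFormat writes each key's maximum to its own file, e.g. apple-r-00000 and banana-r-00000, when no mapreduce.output.basename is set:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            max = Math.max(max, v.get());
        }
        // Each distinct key ends up in its own output file via MultiRecordWriter
        context.write(key, new IntWritable(max));
    }
}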
► LineRecordWriter
This class is lifted straight out of TextOutputFormat.LineRecordWriter as a standalone class; it handles the actual writing of each record and the closing of the file.
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8"; // character encoding
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8); // line terminator
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    // Constructor taking the output stream and the key/value separator
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    // Write the MapReduce key and value to the output stream in line format
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
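A quick standalone check of the line format (illustrative; close() ignores its context argument, so passing null is safe here):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class LineRecordWriterDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        LineRecordWriter<Text, IntWritable> writer =
                new LineRecordWriter<Text, IntWritable>(new DataOutputStream(buffer));
        writer.write(new Text("max"), new IntWritable(42)); // writes "max\t42\n"
        writer.close(null);
        System.out.print(buffer.toString("UTF-8"));
    }
}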
from http://www.infoq.com/cn/articles/HadoopOutputFormat