MapReduce Custom File Data Input

MapReduce's default input format is TextInputFormat, which reads the input one line at a time and passes each line to the mapper. To change that behaviour you have to write your own InputFormat. The code below is essentially a copy of TextInputFormat with the record-delivery logic changed: the readLine call inside nextKeyValue() is swapped for a custom readItem.
► Creating the InputFormat
Instead of returning the stock LineRecordReader, createRecordReader() returns our own CustomLineRecordReader. Then, in the driver, set job.setInputFormatClass(MaxInputFormat.class) (a minimal driver sketch follows the class below) and the job will use our code to read the input and to decide what is handed to the mapper on each call.
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MaxInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new CustomLineRecordReader();
    }
}
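For completeness, a driver might wire this in roughly as follows. This is only a sketch: the class name MaxDriver, the mapper ItemMapper (sketched further down), the output types, and the Hadoop 2.x-style Job.getInstance call are assumptions for illustration, not part of the original code.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "custom input demo");
        job.setJarByClass(MaxDriver.class);
        job.setInputFormatClass(MaxInputFormat.class); // use the custom InputFormat above
        job.setMapperClass(ItemMapper.class);          // hypothetical mapper, sketched later
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}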
► The custom reader: CustomLineRecordReader
The body is mostly the same as TextInputFormat; the only addition is the item handling in readItem. For the test data used here the splitting could just as well be done in the mapper, with no custom InputFormat at all (a sketch of that mapper-only approach appears after the sample input at the end of this post); the rewrite is purely a learning exercise.
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class CustomLineRecordReader extends RecordReader<LongWritable, Text> {
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = new LongWritable();
    private Text value = new Text();
    private static final Log LOG = LogFactory.getLog(CustomLineRecordReader.class);
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        // maximum permitted length of a single record
        Configuration job = context.getConfiguration();
        maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
        // genericSplit describes the byte range of the file this reader should process
        FileSplit split = (FileSplit) genericSplit;
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath(); // the target file to read
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        fileIn.seek(start); // seek to the start position of this split
        in = new LineReader(fileIn, job);
        if (start != 0) { // not at the start of the file: skip the partial first line (the previous split handles it)
            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start; // record the starting position
    }
    public boolean nextKeyValue() throws IOException {
        key.set(pos); // the key is the byte offset in the file where this record starts
        int newSize = 0;
        while (pos < end) {
            // original TextInputFormat behaviour: read one whole line into value
            //newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            // custom behaviour: read one item per call instead
            newSize = readItem(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            // no more data: stop
            if (newSize == 0) {
                break;
            }
            // start position of the next record
            pos += newSize;
            //System.out.printf("[pos:%d end:%d total:%d] v:%s\n",key.get(),pos,end, value);
            if (newSize < maxLineLength) {
                break; // within the single-record limit, so deliver it: return true (key/value found)
            }
            // this record exceeds the permitted length: log it and skip it
            LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
        if (newSize == 0) { // the end of the split has been reached
            key = null;
            value = null;
            return false;
        } else { // tell Hadoop that a key/value pair is ready
            return true;
        }
    }
    // custom part: build the key/value that is handed to the mapper on each call
    ArrayList<String> valueBuffer = new ArrayList<String>(); // buffers the items of the current line
    int lineLength = 0; // total length of the data read so far
    Text tmpLine = new Text();
    private int readItem(Text text, int maxLineLength, int maxBytesToConsume) throws IOException {
        text.clear();
        int firstItem = 1;
        if (valueBuffer.isEmpty()) {
            // each line looks like: 1950-12-04 7999,1950-12-04 7999,1950-12-04 7999
            pos = lineLength; // adjust the recorded position
            int len = in.readLine(tmpLine, maxLineLength, maxBytesToConsume);
            if (len > 0) { // items within a line are separated by commas
                lineLength += len;
                String[] aryStr = tmpLine.toString().split("[,]");
                for (String s : aryStr) {
                    valueBuffer.add(s);
                }
            }
            firstItem = 0;
        }
        if (!valueBuffer.isEmpty()) {
            text.set(valueBuffer.get(0));
            valueBuffer.remove(0);
        }
        //System.out.printf("[i:%d]\n",text.getLength()+firstItem);
        return text.getLength() + firstItem;
    }
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
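With this reader in place, each map() call receives one comma-separated item rather than a whole line, with the key set to the (approximate) byte offset tracked by CustomLineRecordReader. A minimal mapper sketch under that assumption; the name ItemMapper and the date/value parsing are illustrative only, not part of the original post:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: each call gets one item such as "1950-12-04 7999";
// the key is the byte offset supplied by CustomLineRecordReader.
public class ItemMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().trim().split(" ");
        if (parts.length == 2) {
            // emit (date, reading) for whatever aggregation the reducer performs
            context.write(new Text(parts[0]), new IntWritable(Integer.parseInt(parts[1])));
        }
    }
}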
► Input file contents
1950-12-04 7999,1950-12-04 7999,1950-12-04 7999,1950-12-04 7999,1950-12-04 7110
1950-12-10 8999,1950-12-04 7999,1950-12-04 7999,1950-12-04 7999,1950-12-04 7111
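As noted earlier, for input shaped like the lines above a custom InputFormat is not strictly necessary: the default TextInputFormat can hand whole lines to the mapper and the comma split can happen there. A rough sketch of that alternative (LineSplitMapper is a hypothetical name; the per-item handling mirrors ItemMapper above):
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical alternative: keep TextInputFormat and do the comma split in the mapper
// instead of in a custom RecordReader.
public class LineSplitMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text line, Context context)
            throws IOException, InterruptedException {
        // one whole line arrives per call; break it into items here
        for (String item : line.toString().split(",")) {
            String[] parts = item.trim().split(" ");
            if (parts.length == 2) {
                context.write(new Text(parts[0]), new IntWritable(Integer.parseInt(parts[1])));
            }
        }
    }
}
The only difference from the custom-reader version is where the split happens; the rest of the job stays the same.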